self._servers = self._plugin.start_rpc_listeners()
def wait(self):
+ try:
+ self._wait()
+ except Exception:
+ LOG.exception(_LE('done with wait'))
+ raise
+
+ def _wait(self):
+ LOG.debug('calling RpcWorker wait()')
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
+ LOG.debug('calling wait on %s', server)
server.wait()
+ else:
+ LOG.debug('NOT calling wait on %s', server)
+ LOG.debug('returning from RpcWorker wait()')
def stop(self):
+ LOG.debug('calling RpcWorker stop()')
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
+ LOG.debug('calling stop on %s', server)
server.stop()
@staticmethod
rpc = RpcWorker(plugin)
if cfg.CONF.rpc_workers < 1:
+ LOG.debug('starting rpc directly, workers=%s',
+ cfg.CONF.rpc_workers)
rpc.start()
return rpc
else:
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
+ LOG.debug('using launcher for rpc, workers=%s',
+ cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF,
wait_interval=1.0)