By default spawn one additional rpc worker to process
state report queue.
State report queue will also be processed by regular
rpc workers, but in case these workers are busy with
processing heavy requests, state reports queue will
automatically be consumed by dedicated rpc workers.
This change applies to ML2 plugin only.
Other plugins should implement start_rpc_state_reports_listener
to enable additional rpc workers.
Change-Id: I5f8df6a478f7c82382049274b34b07109eeafbdb
Closes-Bug: #
1505217
# the default value is equal to the number of CPUs available.
# api_workers = <number of CPUs>
-# Number of separate RPC worker processes to spawn. If not specified or < 1,
-# a single RPC worker process is spawned by the parent process.
+# Number of separate RPC worker processes to spawn.
# rpc_workers = 1
+# Number of separate RPC worker processes for processing state report queue.
+# Increasing this parameter makes sense when neutron-server handles
+# hundreds of agents.
+# rpc_state_report_workers = 1
+
# Timeout for client connections socket operations. If an
# incoming connection is idle for this number of seconds it
# will be closed. A value of '0' means wait forever. (integer
"""
raise NotImplementedError()
+ def start_rpc_state_reports_listener(self):
+ """Start the RPC listeners consuming state reports queue.
+
+ This optional method creates rpc consumer for REPORTS queue only.
+
+ .. note:: this method is optional, as it was not part of the originally
+ defined plugin API.
+ """
+ raise NotImplementedError()
+
def rpc_workers_supported(self):
"""Return whether the plugin supports multiple RPC workers.
return (self.__class__.start_rpc_listeners !=
NeutronPluginBaseV2.start_rpc_listeners)
+ def rpc_state_report_workers_supported(self):
+ """Return whether the plugin supports state report RPC workers.
+
+ .. note:: this method is optional, as it was not part of the originally
+ defined plugin API.
+ """
+ return (self.__class__.start_rpc_state_reports_listener !=
+ NeutronPluginBaseV2.start_rpc_state_reports_listener)
+
def get_workers(self):
"""Returns a collection NeutronWorker instances
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
+ # process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
return self.conn.consume_in_threads()
+ def start_rpc_state_reports_listener(self):
+ self.conn_reports = n_rpc.create_connection(new=True)
+ self.conn_reports.create_consumer(topics.REPORTS,
+ [agents_db.AgentExtRpcCallback()],
+ fanout=False)
+ return self.conn_reports.consume_in_threads()
+
def _filter_nets_provider(self, context, networks, filters):
return [network
for network in networks
cfg.IntOpt('rpc_workers',
default=1,
help=_('Number of RPC worker processes for service')),
+ cfg.IntOpt('rpc_state_report_workers',
+ default=1,
+ help=_('Number of RPC worker processes dedicated to state '
+ 'reports queue')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
config.reset_service()
+class RpcReportsWorker(RpcWorker):
+ start_listeners_method = 'start_rpc_state_reports_listener'
+
+
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
try:
# passing service plugins only, because core plugin is among them
rpc = RpcWorker(service_plugins)
-
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+ if (cfg.CONF.rpc_state_report_workers > 0 and
+ plugin.rpc_state_report_workers_supported()):
+ rpc_state_rep = RpcReportsWorker([plugin])
+ LOG.debug('using launcher for state reports rpc, workers=%s',
+ cfg.CONF.rpc_state_report_workers)
+ launcher.launch_service(
+ rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
+
return launcher
except Exception:
with excutils.save_and_reraise_exception():
get_plugin.return_value = self.plugin
CONF.set_override("rpc_workers", workers)
+ # not interested in state report workers specifically
+ CONF.set_override("rpc_state_report_workers", 0)
launcher = service.serve_rpc()
launcher.wait()