From 678b431ba4be8ccf6e505b3dd8fb8e9465b82f83 Mon Sep 17 00:00:00 2001 From: Eugene Nikanorov Date: Mon, 12 Oct 2015 16:21:02 +0400 Subject: [PATCH] Spawn dedicated rpc workers for state reports queue By default spawn one additional rpc worker to process state report queue. State report queue will also be processed by regular rpc workers, but in case these workers are busy with processing heavy requests, state reports queue will automatically be consumed by dedicated rpc workers. This change applies to ML2 plugin only. Other plugins should implement start_rpc_state_reports_listener to enable additional rpc workers. Change-Id: I5f8df6a478f7c82382049274b34b07109eeafbdb Closes-Bug: #1505217 --- etc/neutron.conf | 8 ++++++-- neutron/neutron_plugin_base_v2.py | 19 +++++++++++++++++++ neutron/plugins/ml2/plugin.py | 8 ++++++++ neutron/service.py | 17 ++++++++++++++++- neutron/tests/functional/test_server.py | 2 ++ 5 files changed, 51 insertions(+), 3 deletions(-) diff --git a/etc/neutron.conf b/etc/neutron.conf index bac35bd2d..f5c15fadf 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -311,10 +311,14 @@ # the default value is equal to the number of CPUs available. # api_workers = -# Number of separate RPC worker processes to spawn. If not specified or < 1, -# a single RPC worker process is spawned by the parent process. +# Number of separate RPC worker processes to spawn. # rpc_workers = 1 +# Number of separate RPC worker processes for processing state report queue. +# Increasing this parameter makes sense when neutron-server handles +# hundreds of agents. +# rpc_state_report_workers = 1 + # Timeout for client connections socket operations. If an # incoming connection is idle for this number of seconds it # will be closed. A value of '0' means wait forever. (integer diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 79a85c4c8..1a557519c 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -375,6 +375,16 @@ class NeutronPluginBaseV2(object): """ raise NotImplementedError() + def start_rpc_state_reports_listener(self): + """Start the RPC listeners consuming state reports queue. + + This optional method creates rpc consumer for REPORTS queue only. + + .. note:: this method is optional, as it was not part of the originally + defined plugin API. + """ + raise NotImplementedError() + def rpc_workers_supported(self): """Return whether the plugin supports multiple RPC workers. @@ -390,6 +400,15 @@ class NeutronPluginBaseV2(object): return (self.__class__.start_rpc_listeners != NeutronPluginBaseV2.start_rpc_listeners) + def rpc_state_report_workers_supported(self): + """Return whether the plugin supports state report RPC workers. + + .. note:: this method is optional, as it was not part of the originally + defined plugin API. + """ + return (self.__class__.start_rpc_state_reports_listener != + NeutronPluginBaseV2.start_rpc_state_reports_listener) + def get_workers(self): """Returns a collection NeutronWorker instances diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index ae8c3f181..e7e0b9dc2 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -190,11 +190,19 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.topic = topics.PLUGIN self.conn = n_rpc.create_connection(new=True) self.conn.create_consumer(self.topic, self.endpoints, fanout=False) + # process state reports despite dedicated rpc workers self.conn.create_consumer(topics.REPORTS, [agents_db.AgentExtRpcCallback()], fanout=False) return self.conn.consume_in_threads() + def start_rpc_state_reports_listener(self): + self.conn_reports = n_rpc.create_connection(new=True) + self.conn_reports.create_consumer(topics.REPORTS, + [agents_db.AgentExtRpcCallback()], + fanout=False) + return self.conn_reports.consume_in_threads() + def _filter_nets_provider(self, context, networks, filters): return [network for network in networks diff --git a/neutron/service.py b/neutron/service.py index b211306a3..8f03e1570 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -47,6 +47,10 @@ service_opts = [ cfg.IntOpt('rpc_workers', default=1, help=_('Number of RPC worker processes for service')), + cfg.IntOpt('rpc_state_report_workers', + default=1, + help=_('Number of RPC worker processes dedicated to state ' + 'reports queue')), cfg.IntOpt('periodic_fuzzy_delay', default=5, help=_('Range of seconds to randomly delay when starting the ' @@ -167,6 +171,10 @@ class RpcWorker(worker.NeutronWorker): config.reset_service() +class RpcReportsWorker(RpcWorker): + start_listeners_method = 'start_rpc_state_reports_listener' + + def serve_rpc(): plugin = manager.NeutronManager.get_plugin() service_plugins = ( @@ -190,7 +198,6 @@ def serve_rpc(): try: # passing service plugins only, because core plugin is among them rpc = RpcWorker(service_plugins) - # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. @@ -198,6 +205,14 @@ def serve_rpc(): session.dispose() launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + if (cfg.CONF.rpc_state_report_workers > 0 and + plugin.rpc_state_report_workers_supported()): + rpc_state_rep = RpcReportsWorker([plugin]) + LOG.debug('using launcher for state reports rpc, workers=%s', + cfg.CONF.rpc_state_report_workers) + launcher.launch_service( + rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers) + return launcher except Exception: with excutils.save_and_reraise_exception(): diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index e04b31b1f..c13c4a266 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -239,6 +239,8 @@ class TestRPCServer(TestNeutronServer): get_plugin.return_value = self.plugin CONF.set_override("rpc_workers", workers) + # not interested in state report workers specifically + CONF.set_override("rpc_state_report_workers", 0) launcher = service.serve_rpc() launcher.wait() -- 2.45.2