]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Spawn dedicated rpc workers for state reports queue
authorEugene Nikanorov <enikanorov@mirantis.com>
Mon, 12 Oct 2015 12:21:02 +0000 (16:21 +0400)
committerEugene Nikanorov <enikanorov@mirantis.com>
Wed, 21 Oct 2015 13:55:25 +0000 (17:55 +0400)
By default spawn one additional rpc worker to process
state report queue.
State report queue will also be processed by regular
rpc workers, but in case these workers are busy with
processing heavy requests, state reports queue will
automatically be consumed by dedicated rpc workers.

This change applies to ML2 plugin only.
Other plugins should implement start_rpc_state_reports_listener
to enable additional rpc workers.

Change-Id: I5f8df6a478f7c82382049274b34b07109eeafbdb
Closes-Bug: #1505217

etc/neutron.conf
neutron/neutron_plugin_base_v2.py
neutron/plugins/ml2/plugin.py
neutron/service.py
neutron/tests/functional/test_server.py

index bac35bd2dc1fc948bb741be4f80e66e520b86a7b..f5c15fadf2017a35504d26e43c75d07c845b9755 100644 (file)
 # the default value is equal to the number of CPUs available.
 # api_workers = <number of CPUs>
 
-# Number of separate RPC worker processes to spawn. If not specified or < 1,
-# a single RPC worker process is spawned by the parent process.
+# Number of separate RPC worker processes to spawn.
 # rpc_workers = 1
 
+# Number of separate RPC worker processes for processing state report queue.
+# Increasing this parameter makes sense when neutron-server handles
+# hundreds of agents.
+# rpc_state_report_workers = 1
+
 # Timeout for client connections socket operations. If an
 # incoming connection is idle for this number of seconds it
 # will be closed. A value of '0' means wait forever. (integer
index 79a85c4c83eebd8c551290ebaa6959a870bc1e71..1a557519c08b242a4542ed28e22ec68ee7d9a690 100644 (file)
@@ -375,6 +375,16 @@ class NeutronPluginBaseV2(object):
         """
         raise NotImplementedError()
 
+    def start_rpc_state_reports_listener(self):
+        """Start the RPC listeners consuming state reports queue.
+
+        This optional method creates rpc consumer for REPORTS queue only.
+
+        .. note:: this method is optional, as it was not part of the originally
+                  defined plugin API.
+        """
+        raise NotImplementedError()
+
     def rpc_workers_supported(self):
         """Return whether the plugin supports multiple RPC workers.
 
@@ -390,6 +400,15 @@ class NeutronPluginBaseV2(object):
         return (self.__class__.start_rpc_listeners !=
                 NeutronPluginBaseV2.start_rpc_listeners)
 
+    def rpc_state_report_workers_supported(self):
+        """Return whether the plugin supports state report RPC workers.
+
+        .. note:: this method is optional, as it was not part of the originally
+                  defined plugin API.
+        """
+        return (self.__class__.start_rpc_state_reports_listener !=
+                NeutronPluginBaseV2.start_rpc_state_reports_listener)
+
     def get_workers(self):
         """Returns a collection NeutronWorker instances
 
index ae8c3f181f830dbd1e714a53f1628a98b31316d0..e7e0b9dc20139d92af0a591e8f17b27d72271808 100644 (file)
@@ -190,11 +190,19 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.topic = topics.PLUGIN
         self.conn = n_rpc.create_connection(new=True)
         self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
+        # process state reports despite dedicated rpc workers
         self.conn.create_consumer(topics.REPORTS,
                                   [agents_db.AgentExtRpcCallback()],
                                   fanout=False)
         return self.conn.consume_in_threads()
 
+    def start_rpc_state_reports_listener(self):
+        self.conn_reports = n_rpc.create_connection(new=True)
+        self.conn_reports.create_consumer(topics.REPORTS,
+                                          [agents_db.AgentExtRpcCallback()],
+                                          fanout=False)
+        return self.conn_reports.consume_in_threads()
+
     def _filter_nets_provider(self, context, networks, filters):
         return [network
                 for network in networks
index b211306a3313f4f0c201ee3dd4be8c24d10d62cc..8f03e1570c2e9a629f1097eedd801c0cbbf3dd06 100644 (file)
@@ -47,6 +47,10 @@ service_opts = [
     cfg.IntOpt('rpc_workers',
                default=1,
                help=_('Number of RPC worker processes for service')),
+    cfg.IntOpt('rpc_state_report_workers',
+               default=1,
+               help=_('Number of RPC worker processes dedicated to state '
+                      'reports queue')),
     cfg.IntOpt('periodic_fuzzy_delay',
                default=5,
                help=_('Range of seconds to randomly delay when starting the '
@@ -167,6 +171,10 @@ class RpcWorker(worker.NeutronWorker):
         config.reset_service()
 
 
+class RpcReportsWorker(RpcWorker):
+    start_listeners_method = 'start_rpc_state_reports_listener'
+
+
 def serve_rpc():
     plugin = manager.NeutronManager.get_plugin()
     service_plugins = (
@@ -190,7 +198,6 @@ def serve_rpc():
     try:
         # passing service plugins only, because core plugin is among them
         rpc = RpcWorker(service_plugins)
-
         # dispose the whole pool before os.fork, otherwise there will
         # be shared DB connections in child processes which may cause
         # DB errors.
@@ -198,6 +205,14 @@ def serve_rpc():
         session.dispose()
         launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
         launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+        if (cfg.CONF.rpc_state_report_workers > 0 and
+            plugin.rpc_state_report_workers_supported()):
+            rpc_state_rep = RpcReportsWorker([plugin])
+            LOG.debug('using launcher for state reports rpc, workers=%s',
+                      cfg.CONF.rpc_state_report_workers)
+            launcher.launch_service(
+                rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
+
         return launcher
     except Exception:
         with excutils.save_and_reraise_exception():
index e04b31b1fadb3fd6cbccb6855d644adcf5227871..c13c4a266cfbe2d51693f7085f26dd5a59743421 100644 (file)
@@ -239,6 +239,8 @@ class TestRPCServer(TestNeutronServer):
                 get_plugin.return_value = self.plugin
 
                 CONF.set_override("rpc_workers", workers)
+                # not interested in state report workers specifically
+                CONF.set_override("rpc_state_report_workers", 0)
 
                 launcher = service.serve_rpc()
                 launcher.wait()