]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Consume service plugins queues in RPC workers.
authorEugene Nikanorov <enikanorov@mirantis.com>
Wed, 23 Sep 2015 10:06:54 +0000 (14:06 +0400)
committerEugene Nikanorov <enikanorov@mirantis.com>
Fri, 2 Oct 2015 07:32:41 +0000 (11:32 +0400)
This patch adds all RPC workers to consumers of service
plugins queues such as metering and l3-plugin.
This is important for DVR-enabled deployments with hundreds
of agents.

Change-Id: I6fea7f409c91b25d2c35b038d6100fdfa85d1905
Closes-Bug: #1498844

neutron/service.py
neutron/services/l3_router/l3_router_plugin.py
neutron/services/metering/metering_plugin.py

index 856551ffb96735ae0c4eb3e9bf682d72ac3e5372..f70cdee6cd91558d82b9b36049d529d2105cd64c 100644 (file)
@@ -125,13 +125,17 @@ def start_plugin_workers():
 
 class RpcWorker(worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
-    def __init__(self, plugin):
-        self._plugin = plugin
+    start_listeners_method = 'start_rpc_listeners'
+
+    def __init__(self, plugins):
+        self._plugins = plugins
         self._servers = []
 
     def start(self):
-        super(RpcWorker, self).start()
-        self._servers = self._plugin.start_rpc_listeners()
+        for plugin in self._plugins:
+            if hasattr(plugin, self.start_listeners_method):
+                servers = getattr(plugin, self.start_listeners_method)()
+                self._servers.extend(servers)
 
     def wait(self):
         try:
@@ -164,6 +168,8 @@ class RpcWorker(worker.NeutronWorker):
 
 def serve_rpc():
     plugin = manager.NeutronManager.get_plugin()
+    service_plugins = (
+        manager.NeutronManager.get_service_plugins().values())
 
     if cfg.CONF.rpc_workers < 1:
         cfg.CONF.set_override('rpc_workers', 1)
@@ -181,7 +187,8 @@ def serve_rpc():
         raise NotImplementedError()
 
     try:
-        rpc = RpcWorker(plugin)
+        # passing service plugins only, because core plugin is among them
+        rpc = RpcWorker(service_plugins)
 
         # dispose the whole pool before os.fork, otherwise there will
         # be shared DB connections in child processes which may cause
index 85daa4ea1d3aed7760e7945c978ae3bb6051bddc..cca550d2b9c97a7a3f8f29bfbf1e570c61c28b55 100644 (file)
@@ -58,7 +58,6 @@ class L3RouterPlugin(service_base.ServicePluginBase,
     @resource_registry.tracked_resources(router=l3_db.Router,
                                          floatingip=l3_db.FloatingIP)
     def __init__(self):
-        self.setup_rpc()
         self.router_scheduler = importutils.import_object(
             cfg.CONF.router_scheduler_driver)
         self.start_periodic_l3_agent_status_check()
@@ -66,9 +65,10 @@ class L3RouterPlugin(service_base.ServicePluginBase,
         if 'dvr' in self.supported_extension_aliases:
             l3_dvrscheduler_db.subscribe()
         l3_db.subscribe()
+        self.start_rpc_listeners()
 
     @log_helpers.log_method_call
-    def setup_rpc(self):
+    def start_rpc_listeners(self):
         # RPC support
         self.topic = topics.L3PLUGIN
         self.conn = n_rpc.create_connection(new=True)
@@ -77,7 +77,7 @@ class L3RouterPlugin(service_base.ServicePluginBase,
         self.endpoints = [l3_rpc.L3RpcCallback()]
         self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
-        self.conn.consume_in_threads()
+        return self.conn.consume_in_threads()
 
     def get_plugin_type(self):
         return constants.L3_ROUTER_NAT
index 57789b784752b6a4ede62c12e1bd62f6c6f5b786..6cbbabd47af3d2202f9db404b5fb30df71e49087 100644 (file)
@@ -27,14 +27,15 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
     def __init__(self):
         super(MeteringPlugin, self).__init__()
 
-        self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
+        self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
+        self.start_rpc_listeners()
 
+    def start_rpc_listeners(self):
+        self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
         self.conn = n_rpc.create_connection(new=True)
         self.conn.create_consumer(
             topics.METERING_PLUGIN, self.endpoints, fanout=False)
-        self.conn.consume_in_threads()
-
-        self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
+        return self.conn.consume_in_threads()
 
     def create_metering_label(self, context, metering_label):
         label = super(MeteringPlugin, self).create_metering_label(