From 5be613490deebf494f2ecbbcd8cb5dd0d6f5e1b3 Mon Sep 17 00:00:00 2001 From: Eugene Nikanorov Date: Wed, 23 Sep 2015 14:06:54 +0400 Subject: [PATCH] Consume service plugins queues in RPC workers. This patch adds all RPC workers to consumers of service plugins queues such as metering and l3-plugin. This is important for DVR-enabled deployments with hundreds of agents. Change-Id: I6fea7f409c91b25d2c35b038d6100fdfa85d1905 Closes-Bug: #1498844 --- neutron/service.py | 17 ++++++++++++----- neutron/services/l3_router/l3_router_plugin.py | 6 +++--- neutron/services/metering/metering_plugin.py | 9 +++++---- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/neutron/service.py b/neutron/service.py index 856551ffb..f70cdee6c 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -125,13 +125,17 @@ def start_plugin_workers(): class RpcWorker(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" - def __init__(self, plugin): - self._plugin = plugin + start_listeners_method = 'start_rpc_listeners' + + def __init__(self, plugins): + self._plugins = plugins self._servers = [] def start(self): - super(RpcWorker, self).start() - self._servers = self._plugin.start_rpc_listeners() + for plugin in self._plugins: + if hasattr(plugin, self.start_listeners_method): + servers = getattr(plugin, self.start_listeners_method)() + self._servers.extend(servers) def wait(self): try: @@ -164,6 +168,8 @@ class RpcWorker(worker.NeutronWorker): def serve_rpc(): plugin = manager.NeutronManager.get_plugin() + service_plugins = ( + manager.NeutronManager.get_service_plugins().values()) if cfg.CONF.rpc_workers < 1: cfg.CONF.set_override('rpc_workers', 1) @@ -181,7 +187,8 @@ def serve_rpc(): raise NotImplementedError() try: - rpc = RpcWorker(plugin) + # passing service plugins only, because core plugin is among them + rpc = RpcWorker(service_plugins) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 85daa4ea1..cca550d2b 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -58,7 +58,6 @@ class L3RouterPlugin(service_base.ServicePluginBase, @resource_registry.tracked_resources(router=l3_db.Router, floatingip=l3_db.FloatingIP) def __init__(self): - self.setup_rpc() self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) self.start_periodic_l3_agent_status_check() @@ -66,9 +65,10 @@ class L3RouterPlugin(service_base.ServicePluginBase, if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() + self.start_rpc_listeners() @log_helpers.log_method_call - def setup_rpc(self): + def start_rpc_listeners(self): # RPC support self.topic = topics.L3PLUGIN self.conn = n_rpc.create_connection(new=True) @@ -77,7 +77,7 @@ class L3RouterPlugin(service_base.ServicePluginBase, self.endpoints = [l3_rpc.L3RpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() + return self.conn.consume_in_threads() def get_plugin_type(self): return constants.L3_ROUTER_NAT diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 57789b784..6cbbabd47 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -27,14 +27,15 @@ class MeteringPlugin(metering_db.MeteringDbMixin): def __init__(self): super(MeteringPlugin, self).__init__() - self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] + self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() + self.start_rpc_listeners() + def start_rpc_listeners(self): + self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] self.conn = n_rpc.create_connection(new=True) self.conn.create_consumer( topics.METERING_PLUGIN, self.endpoints, fanout=False) - self.conn.consume_in_threads() - - self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() + return self.conn.consume_in_threads() def create_metering_label(self, context, metering_label): label = super(MeteringPlugin, self).create_metering_label( -- 2.45.2