This patch adds all RPC workers to consumers of service
plugins queues such as metering and l3-plugin.
This is important for DVR-enabled deployments with hundreds
of agents.
Change-Id: I6fea7f409c91b25d2c35b038d6100fdfa85d1905
Closes-Bug: #
1498844
class RpcWorker(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
- def __init__(self, plugin):
- self._plugin = plugin
+ start_listeners_method = 'start_rpc_listeners'
+
+ def __init__(self, plugins):
+ self._plugins = plugins
self._servers = []
def start(self):
- super(RpcWorker, self).start()
- self._servers = self._plugin.start_rpc_listeners()
+ for plugin in self._plugins:
+ if hasattr(plugin, self.start_listeners_method):
+ servers = getattr(plugin, self.start_listeners_method)()
+ self._servers.extend(servers)
def wait(self):
try:
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
+ service_plugins = (
+ manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
raise NotImplementedError()
try:
- rpc = RpcWorker(plugin)
+ # passing service plugins only, because core plugin is among them
+ rpc = RpcWorker(service_plugins)
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
@resource_registry.tracked_resources(router=l3_db.Router,
floatingip=l3_db.FloatingIP)
def __init__(self):
- self.setup_rpc()
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check()
if 'dvr' in self.supported_extension_aliases:
l3_dvrscheduler_db.subscribe()
l3_db.subscribe()
+ self.start_rpc_listeners()
@log_helpers.log_method_call
- def setup_rpc(self):
+ def start_rpc_listeners(self):
# RPC support
self.topic = topics.L3PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
- self.conn.consume_in_threads()
+ return self.conn.consume_in_threads()
def get_plugin_type(self):
return constants.L3_ROUTER_NAT
def __init__(self):
super(MeteringPlugin, self).__init__()
- self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
+ self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
+ self.start_rpc_listeners()
+ def start_rpc_listeners(self):
+ self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(
topics.METERING_PLUGIN, self.endpoints, fanout=False)
- self.conn.consume_in_threads()
-
- self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
+ return self.conn.consume_in_threads()
def create_metering_label(self, context, metering_label):
label = super(MeteringPlugin, self).create_metering_label(