From 9f6bd17703b7286be9e7d439d15f4dec2774e13a Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Mon, 15 Jun 2015 22:52:28 -0500 Subject: [PATCH] Add support for PluginWorker and Process creation notification There are several cases where plugin initialization should be handled after neutron-server forks API/RPC workers. For example, starting a client connection to an SDN controller before forking copies the fd of the socket to the child process, but then you have multiple processes trying to read/write the same socket connection. It is also useful for a plugin to be able to do something in only one process, regardless of how many workers are forked. One example would be handling syncing from an external system to the neutron database. This patch does 3 things: 1) Treats rpc_workers=0 as = 1. This simplifies the code for handling notification that forking has completed. In the existing code, calling the notification in the Worker object's start() method would happen twice in the case where both api and rpc workers were 0, despite there being only one process. An earlier patch already changed the default api_workers to be the number of processors. 2) Adds notification of forking via the callbacks mechanism. Plugins can subscribe to resources.PROCESS, event.AFTER_CREATE and do any post-fork initialization that needs to be done for every spawned process. 3) Adds core/service plugin calls to get_workers() which defaults to returning (). Plugins that need additional processes to spawn should just return an iterable of NeutronWorkers that will be spawned in their own process. DocImpact Closes-Bug: #1463129 Change-Id: Ib99954678c2b4f32f486b537979d446aafbea07b --- etc/neutron.conf | 16 ++---- neutron/callbacks/resources.py | 1 + neutron/manager.py | 5 ++ neutron/neutron_plugin_base_v2.py | 9 ++++ neutron/plugins/ml2/driver_api.py | 8 +++ neutron/plugins/ml2/managers.py | 6 +++ neutron/plugins/ml2/plugin.py | 3 ++ neutron/server/__init__.py | 4 ++ neutron/service.py | 49 ++++++++++++------- .../services/l3_router/l3_router_plugin.py | 4 +- neutron/services/service_base.py | 4 ++ neutron/tests/functional/test_server.py | 39 +++++++++++++++ neutron/worker.py | 40 +++++++++++++++ neutron/wsgi.py | 4 +- 14 files changed, 160 insertions(+), 32 deletions(-) create mode 100644 neutron/worker.py diff --git a/etc/neutron.conf b/etc/neutron.conf index 3cca29c2b..716746292 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -301,19 +301,13 @@ # ========== end of items for VLAN trunking networks ========== # =========== WSGI parameters related to the API server ============== -# Number of separate worker processes to spawn. A value of 0 runs the -# worker thread in the current process. Greater than 0 launches that number of -# child processes as workers. The parent process manages them. If not -# specified, the default value is equal to the number of CPUs available to -# achieve best performance. +# Number of separate API worker processes to spawn. If not specified or < 1, +# the default value is equal to the number of CPUs available. # api_workers = -# Number of separate RPC worker processes to spawn. The default, 0, runs the -# worker thread in the current process. Greater than 0 launches that number of -# child processes as RPC workers. The parent process manages them. -# This feature is experimental until issues are addressed and testing has been -# enabled for various plugins for compatibility. -# rpc_workers = 0 +# Number of separate RPC worker processes to spawn. If not specified or < 1, +# a single RPC worker process is spawned by the parent process. +# rpc_workers = 1 # Timeout for client connections socket operations. If an # incoming connection is idle for this number of seconds it diff --git a/neutron/callbacks/resources.py b/neutron/callbacks/resources.py index 1544fe5a4..60c218ad3 100644 --- a/neutron/callbacks/resources.py +++ b/neutron/callbacks/resources.py @@ -12,6 +12,7 @@ # String literals representing core resources. PORT = 'port' +PROCESS = 'process' ROUTER = 'router' ROUTER_GATEWAY = 'router_gateway' ROUTER_INTERFACE = 'router_interface' diff --git a/neutron/manager.py b/neutron/manager.py index 0e3a16cb2..7a174507f 100644 --- a/neutron/manager.py +++ b/neutron/manager.py @@ -246,3 +246,8 @@ class NeutronManager(object): service_plugins = cls.get_instance().service_plugins return dict((x, weakref.proxy(y)) for x, y in six.iteritems(service_plugins)) + + @classmethod + def get_unique_service_plugins(cls): + service_plugins = cls.get_instance().service_plugins + return tuple(weakref.proxy(x) for x in set(service_plugins.values())) diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 374dd19e7..79a85c4c8 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object): """ return (self.__class__.start_rpc_listeners != NeutronPluginBaseV2.start_rpc_listeners) + + def get_workers(self): + """Returns a collection NeutronWorker instances + + If a plugin needs to define worker processes outside of API/RPC workers + then it will override this and return a collection of NeutronWorker + instances + """ + return () diff --git a/neutron/plugins/ml2/driver_api.py b/neutron/plugins/ml2/driver_api.py index c54ab1ba3..db25c8d5f 100644 --- a/neutron/plugins/ml2/driver_api.py +++ b/neutron/plugins/ml2/driver_api.py @@ -888,6 +888,14 @@ class MechanismDriver(object): """ pass + def get_workers(self): + """Get any NeutronWorker instances that should have their own process + + Any driver that needs to run processes separate from the API or RPC + workers, can return a sequence of NeutronWorker instances. + """ + return () + @six.add_metaclass(abc.ABCMeta) class ExtensionDriver(object): diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 40179dfe2..a194aed0a 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager): return False return True + def get_workers(self): + workers = [] + for driver in self.ordered_mech_drivers: + workers += driver.obj.get_workers() + return workers + class ExtensionManager(stevedore.named.NamedExtensionManager): """Manage extension drivers using drivers.""" diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 4cdf98a40..d3f42dcf8 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -1539,3 +1539,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if port: return port.id return device + + def get_workers(self): + return self.mechanism_manager.get_workers() diff --git a/neutron/server/__init__.py b/neutron/server/__init__.py index c6c72e284..688b75421 100644 --- a/neutron/server/__init__.py +++ b/neutron/server/__init__.py @@ -50,6 +50,10 @@ def main(): else: rpc_thread = pool.spawn(neutron_rpc.wait) + plugin_workers = service.start_plugin_workers() + for worker in plugin_workers: + pool.spawn(worker.wait) + # api and rpc should die together. When one dies, kill the other. rpc_thread.link(lambda gt: api_thread.kill()) api_thread.link(lambda gt: rpc_thread.kill()) diff --git a/neutron/service.py b/neutron/service.py index 6b1eee248..856551ffb 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,6 +32,7 @@ from neutron import context from neutron.db import api as session from neutron.i18n import _LE, _LI from neutron import manager +from neutron import worker from neutron import wsgi @@ -44,7 +45,7 @@ service_opts = [ 'If not specified, the default is equal to the number ' 'of CPUs available for best performance.')), cfg.IntOpt('rpc_workers', - default=0, + default=1, help=_('Number of RPC worker processes for service')), cfg.IntOpt('periodic_fuzzy_delay', default=5, @@ -108,13 +109,28 @@ def serve_wsgi(cls): return service -class RpcWorker(common_service.ServiceBase): +def start_plugin_workers(): + launchers = [] + # NOTE(twilson) get_service_plugins also returns the core plugin + for plugin in manager.NeutronManager.get_unique_service_plugins(): + # TODO(twilson) Instead of defaulting here, come up with a good way to + # share a common get_workers default between NeutronPluginBaseV2 and + # ServicePluginBase + for plugin_worker in getattr(plugin, 'get_workers', tuple)(): + launcher = common_service.ProcessLauncher(cfg.CONF) + launcher.launch_service(plugin_worker) + launchers.append(launcher) + return launchers + + +class RpcWorker(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin self._servers = [] def start(self): + super(RpcWorker, self).start() self._servers = self._plugin.start_rpc_listeners() def wait(self): @@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase): def serve_rpc(): plugin = manager.NeutronManager.get_plugin() + if cfg.CONF.rpc_workers < 1: + cfg.CONF.set_override('rpc_workers', 1) + # If 0 < rpc_workers then start_rpc_listeners would be called in a # subprocess and we cannot simply catch the NotImplementedError. It is # simpler to check this up front by testing whether the plugin supports @@ -164,22 +183,14 @@ def serve_rpc(): try: rpc = RpcWorker(plugin) - if cfg.CONF.rpc_workers < 1: - LOG.debug('starting rpc directly, workers=%s', - cfg.CONF.rpc_workers) - rpc.start() - return rpc - else: - # dispose the whole pool before os.fork, otherwise there will - # be shared DB connections in child processes which may cause - # DB errors. - LOG.debug('using launcher for rpc, workers=%s', - cfg.CONF.rpc_workers) - session.dispose() - launcher = common_service.ProcessLauncher(cfg.CONF, - wait_interval=1.0) - launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) - return launcher + # dispose the whole pool before os.fork, otherwise there will + # be shared DB connections in child processes which may cause + # DB errors. + LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers) + session.dispose() + launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) + launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + return launcher except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE('Unrecoverable error: please check log for ' @@ -188,7 +199,7 @@ def serve_rpc(): def _get_api_workers(): workers = cfg.CONF.api_workers - if workers is None: + if not workers: workers = processutils.get_worker_count() return workers diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 4c4e96ae5..85daa4ea1 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db from neutron.db import l3_hascheduler_db from neutron.plugins.common import constants from neutron.quota import resource_registry +from neutron.services import service_base -class L3RouterPlugin(common_db_mixin.CommonDbMixin, +class L3RouterPlugin(service_base.ServicePluginBase, + common_db_mixin.CommonDbMixin, extraroute_db.ExtraRoute_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin, l3_gwmode_db.L3_NAT_db_mixin, diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py index acae7a0f5..3ae9f6329 100644 --- a/neutron/services/service_base.py +++ b/neutron/services/service_base.py @@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface): """Return string description of the plugin.""" pass + def get_workers(self): + """Returns a collection of NeutronWorkers""" + return () + def load_drivers(service_type, plugin): """Loads drivers for specific service. diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 8f81f6849..48891bb7b 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -27,6 +27,7 @@ import psutil from neutron.agent.linux import utils from neutron import service from neutron.tests import base +from neutron import worker from neutron import wsgi @@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer): def test_restart_rpc_on_sighup_multiple_workers(self): self._test_restart_service_on_sighup(service=self._serve_rpc, workers=2) + + +class TestPluginWorker(TestNeutronServer): + """Ensure that a plugin returning Workers spawns workers""" + + def setUp(self): + super(TestPluginWorker, self).setUp() + self.setup_coreplugin(TARGET_PLUGIN) + self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True) + self.plugin = self._plugin_patcher.start() + + def _start_plugin(self, workers=0): + with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp: + gp.return_value = self.plugin + launchers = service.start_plugin_workers() + for launcher in launchers: + launcher.wait() + + def test_start(self): + class FakeWorker(worker.NeutronWorker): + def start(self): + pass + + def wait(self): + pass + + def stop(self): + pass + + def reset(self): + pass + + # Make both ABC happy and ensure 'self' is correct + FakeWorker.reset = self._fake_reset + workers = [FakeWorker()] + self.plugin.return_value.get_workers.return_value = workers + self._test_restart_service_on_sighup(service=self._start_plugin, + workers=len(workers)) diff --git a/neutron/worker.py b/neutron/worker.py new file mode 100644 index 000000000..c49d742f6 --- /dev/null +++ b/neutron/worker.py @@ -0,0 +1,40 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_service import service + +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources + + +class NeutronWorker(service.ServiceBase): + """Partial implementation of the ServiceBase ABC + + Subclasses will still need to add the other abstractmethods defined in + service.ServiceBase. See oslo_service for more details. + + If a plugin needs to handle synchornization with the Neutron database and + do this only once instead of in every API worker, for instance, it would + define a NeutronWorker class and the plugin would have get_workers return + an array of NeutronWorker instnaces. For example: + class MyPlugin(...): + def get_workers(self): + return [MyPluginWorker()] + + class MyPluginWorker(NeutronWorker): + def start(self): + super(MyPluginWorker, self).start() + do_sync() + """ + def start(self): + registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) diff --git a/neutron/wsgi.py b/neutron/wsgi.py index cb302d306..dacbadf8e 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -44,6 +44,7 @@ from neutron.common import exceptions as exception from neutron import context from neutron.db import api from neutron.i18n import _LE, _LI +from neutron import worker socket_opts = [ cfg.IntOpt('backlog', @@ -102,7 +103,7 @@ def encode_body(body): return body -class WorkerService(common_service.ServiceBase): +class WorkerService(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, service, application): self._service = service @@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase): self._server = None def start(self): + super(WorkerService, self).start() # When api worker is stopped it kills the eventlet wsgi server which # internally closes the wsgi server socket object. This server socket # object becomes not usable which leads to "Bad file descriptor" -- 2.45.2