There are several cases where plugin initialization should be
handled after neutron-server forks API/RPC workers. For example,
starting a client connection to an SDN controller before forking
copies the fd of the socket to the child process, but then you have
multiple processes trying to read/write the same socket connection.
It is also useful for a plugin to be able to do something in only
one process, regardless of how many workers are forked. One example
would be handling syncing from an external system to the neutron
database.
This patch does 3 things:
1) Treats rpc_workers=0 as rpc_workers=1. This simplifies the code for
handling notification that forking has completed. In the
existing code, calling the notification in the Worker object's
start() method would happen twice in the case where both api
and rpc workers were 0, despite there being only one process.
An earlier patch already changed the default api_workers to be
the number of processors.
2) Adds notification of forking via the callbacks mechanism.
Plugins can subscribe to resources.PROCESS, events.AFTER_CREATE
and do any post-fork initialization that needs to be done for
every spawned process.
3) Adds core/service plugin calls to get_workers() which defaults
to returning (). Plugins that need additional processes to spawn
should just return an iterable of NeutronWorkers, each of which will be
spawned in its own process.
DocImpact
Closes-Bug: #1463129
Change-Id: Ib99954678c2b4f32f486b537979d446aafbea07b
# ========== end of items for VLAN trunking networks ==========
# =========== WSGI parameters related to the API server ==============
-# Number of separate worker processes to spawn. A value of 0 runs the
-# worker thread in the current process. Greater than 0 launches that number of
-# child processes as workers. The parent process manages them. If not
-# specified, the default value is equal to the number of CPUs available to
-# achieve best performance.
+# Number of separate API worker processes to spawn. If not specified or < 1,
+# the default value is equal to the number of CPUs available.
# api_workers = <number of CPUs>
-# Number of separate RPC worker processes to spawn. The default, 0, runs the
-# worker thread in the current process. Greater than 0 launches that number of
-# child processes as RPC workers. The parent process manages them.
-# This feature is experimental until issues are addressed and testing has been
-# enabled for various plugins for compatibility.
-# rpc_workers = 0
+# Number of separate RPC worker processes to spawn. If not specified or < 1,
+# a single RPC worker process is spawned by the parent process.
+# rpc_workers = 1
# Timeout for client connections socket operations. If an
# incoming connection is idle for this number of seconds it
# String literals representing core resources.
PORT = 'port'
+PROCESS = 'process'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'
service_plugins = cls.get_instance().service_plugins
return dict((x, weakref.proxy(y))
for x, y in six.iteritems(service_plugins))
+
+ @classmethod
+ def get_unique_service_plugins(cls):
+ service_plugins = cls.get_instance().service_plugins
+ return tuple(weakref.proxy(x) for x in set(service_plugins.values()))
"""
return (self.__class__.start_rpc_listeners !=
NeutronPluginBaseV2.start_rpc_listeners)
+
+ def get_workers(self):
+ """Returns a collection of NeutronWorker instances
+
+ If a plugin needs to define worker processes outside of API/RPC workers
+ then it will override this and return a collection of NeutronWorker
+ instances
+ """
+ return ()
"""
pass
+ def get_workers(self):
+ """Get any NeutronWorker instances that should have their own process
+
+ Any driver that needs to run processes separate from the API or RPC
+ workers can return a sequence of NeutronWorker instances.
+ """
+ return ()
+
@six.add_metaclass(abc.ABCMeta)
class ExtensionDriver(object):
return False
return True
+ def get_workers(self):
+ workers = []
+ for driver in self.ordered_mech_drivers:
+ workers += driver.obj.get_workers()
+ return workers
+
class ExtensionManager(stevedore.named.NamedExtensionManager):
"""Manage extension drivers using drivers."""
if port:
return port.id
return device
+
+ def get_workers(self):
+ return self.mechanism_manager.get_workers()
else:
rpc_thread = pool.spawn(neutron_rpc.wait)
+ plugin_workers = service.start_plugin_workers()
+ for worker in plugin_workers:
+ pool.spawn(worker.wait)
+
# api and rpc should die together. When one dies, kill the other.
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())
from neutron.db import api as session
from neutron.i18n import _LE, _LI
from neutron import manager
+from neutron import worker
from neutron import wsgi
'If not specified, the default is equal to the number '
'of CPUs available for best performance.')),
cfg.IntOpt('rpc_workers',
- default=0,
+ default=1,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
return service
-class RpcWorker(common_service.ServiceBase):
+def start_plugin_workers():
+ launchers = []
+ # NOTE(twilson) get_service_plugins also returns the core plugin
+ for plugin in manager.NeutronManager.get_unique_service_plugins():
+ # TODO(twilson) Instead of defaulting here, come up with a good way to
+ # share a common get_workers default between NeutronPluginBaseV2 and
+ # ServicePluginBase
+ for plugin_worker in getattr(plugin, 'get_workers', tuple)():
+ launcher = common_service.ProcessLauncher(cfg.CONF)
+ launcher.launch_service(plugin_worker)
+ launchers.append(launcher)
+ return launchers
+
+
+class RpcWorker(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
self._servers = []
def start(self):
+ super(RpcWorker, self).start()
self._servers = self._plugin.start_rpc_listeners()
def wait(self):
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
+ if cfg.CONF.rpc_workers < 1:
+ cfg.CONF.set_override('rpc_workers', 1)
+
# If 0 < rpc_workers then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin supports
try:
rpc = RpcWorker(plugin)
- if cfg.CONF.rpc_workers < 1:
- LOG.debug('starting rpc directly, workers=%s',
- cfg.CONF.rpc_workers)
- rpc.start()
- return rpc
- else:
- # dispose the whole pool before os.fork, otherwise there will
- # be shared DB connections in child processes which may cause
- # DB errors.
- LOG.debug('using launcher for rpc, workers=%s',
- cfg.CONF.rpc_workers)
- session.dispose()
- launcher = common_service.ProcessLauncher(cfg.CONF,
- wait_interval=1.0)
- launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
- return launcher
+ # dispose the whole pool before os.fork, otherwise there will
+ # be shared DB connections in child processes which may cause
+ # DB errors.
+ LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
+ session.dispose()
+ launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
+ launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+ return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
def _get_api_workers():
workers = cfg.CONF.api_workers
- if workers is None:
+ if not workers:
workers = processutils.get_worker_count()
return workers
from neutron.db import l3_hascheduler_db
from neutron.plugins.common import constants
from neutron.quota import resource_registry
+from neutron.services import service_base
-class L3RouterPlugin(common_db_mixin.CommonDbMixin,
+class L3RouterPlugin(service_base.ServicePluginBase,
+ common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
"""Return string description of the plugin."""
pass
+ def get_workers(self):
+ """Returns a collection of NeutronWorkers"""
+ return ()
+
def load_drivers(service_type, plugin):
"""Loads drivers for specific service.
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
+from neutron import worker
from neutron import wsgi
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
workers=2)
+
+
+class TestPluginWorker(TestNeutronServer):
+ """Ensure that a plugin returning Workers spawns workers"""
+
+ def setUp(self):
+ super(TestPluginWorker, self).setUp()
+ self.setup_coreplugin(TARGET_PLUGIN)
+ self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
+ self.plugin = self._plugin_patcher.start()
+
+ def _start_plugin(self, workers=0):
+ with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
+ gp.return_value = self.plugin
+ launchers = service.start_plugin_workers()
+ for launcher in launchers:
+ launcher.wait()
+
+ def test_start(self):
+ class FakeWorker(worker.NeutronWorker):
+ def start(self):
+ pass
+
+ def wait(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def reset(self):
+ pass
+
+ # Make both ABC happy and ensure 'self' is correct
+ FakeWorker.reset = self._fake_reset
+ workers = [FakeWorker()]
+ self.plugin.return_value.get_workers.return_value = workers
+ self._test_restart_service_on_sighup(service=self._start_plugin,
+ workers=len(workers))
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_service import service
+
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
+
+
+class NeutronWorker(service.ServiceBase):
+ """Partial implementation of the ServiceBase ABC
+
+ Subclasses will still need to add the other abstractmethods defined in
+ service.ServiceBase. See oslo_service for more details.
+
+ If a plugin needs to handle synchronization with the Neutron database and
+ do this only once instead of in every API worker, for instance, it would
+ define a NeutronWorker class and the plugin would have get_workers return
+ an array of NeutronWorker instances. For example:
+ class MyPlugin(...):
+ def get_workers(self):
+ return [MyPluginWorker()]
+
+ class MyPluginWorker(NeutronWorker):
+ def start(self):
+ super(MyPluginWorker, self).start()
+ do_sync()
+ """
+ def start(self):
+ registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
from neutron import context
from neutron.db import api
from neutron.i18n import _LE, _LI
+from neutron import worker
socket_opts = [
cfg.IntOpt('backlog',
return body
-class WorkerService(common_service.ServiceBase):
+class WorkerService(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application):
self._service = service
self._server = None
def start(self):
+ super(WorkerService, self).start()
# When api worker is stopped it kills the eventlet wsgi server which
# internally closes the wsgi server socket object. This server socket
# object becomes not usable which leads to "Bad file descriptor"