]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add support for PluginWorker and Process creation notification
authorTerry Wilson <twilson@redhat.com>
Tue, 16 Jun 2015 03:52:28 +0000 (22:52 -0500)
committerArmando Migliaccio <armamig@gmail.com>
Thu, 3 Sep 2015 06:40:50 +0000 (06:40 +0000)
There are several cases where plugin initialization should be
handled after neutron-server forks API/RPC workers. For example,
starting a client connection to an SDN controller before forking
copies the fd of the socket to the child process, but then you have
multiple processes trying to read/write the same socket connection.

It is also useful for a plugin to be able to do something in only
one process, regardless of how many workers are forked. One example
would be handling syncing from an external system to the neutron
database.

This patch does 3 things:
1) Treats rpc_workers=0 as = 1. This simplifies the code for
   handling notification that forking has completed. In the
   existing code, calling the notification in the Worker object's
   start() method would happen twice in the case where both api
   and rpc workers were 0, despite there being only one process.
   An earlier patch already changed the default api_workers to be
   the number of processors.
2) Adds notification of forking via the callbacks mechanism.
   Plugins can subscribe to resources.PROCESS, event.AFTER_CREATE
   and do any post-fork initialization that needs to be done for
   every spawned process.
3) Adds core/service plugin calls to get_workers() which defaults
   to returning (). Plugins that need additional processes to spawn
   should just return an iterable of NeutronWorkers that will be
   spawned in their own process.

DocImpact

Closes-Bug: #1463129
Change-Id: Ib99954678c2b4f32f486b537979d446aafbea07b

14 files changed:
etc/neutron.conf
neutron/callbacks/resources.py
neutron/manager.py
neutron/neutron_plugin_base_v2.py
neutron/plugins/ml2/driver_api.py
neutron/plugins/ml2/managers.py
neutron/plugins/ml2/plugin.py
neutron/server/__init__.py
neutron/service.py
neutron/services/l3_router/l3_router_plugin.py
neutron/services/service_base.py
neutron/tests/functional/test_server.py
neutron/worker.py [new file with mode: 0644]
neutron/wsgi.py

index 3cca29c2bf028474b328ee8608d8ac92b5742030..716746292b5a9cb65bf5b68508f61b86c37214be 100644 (file)
 # ========== end of items for VLAN trunking networks ==========
 
 # =========== WSGI parameters related to the API server ==============
-# Number of separate worker processes to spawn.  A value of 0 runs the
-# worker thread in the current process.  Greater than 0 launches that number of
-# child processes as workers.  The parent process manages them.  If not
-# specified, the default value is equal to the number of CPUs available to
-# achieve best performance.
+# Number of separate API worker processes to spawn. If not specified or < 1,
+# the default value is equal to the number of CPUs available.
 # api_workers = <number of CPUs>
 
-# Number of separate RPC worker processes to spawn.  The default, 0, runs the
-# worker thread in the current process.  Greater than 0 launches that number of
-# child processes as RPC workers.  The parent process manages them.
-# This feature is experimental until issues are addressed and testing has been
-# enabled for various plugins for compatibility.
-# rpc_workers = 0
+# Number of separate RPC worker processes to spawn. If not specified or < 1,
+# a single RPC worker process is spawned by the parent process.
+# rpc_workers = 1
 
 # Timeout for client connections socket operations. If an
 # incoming connection is idle for this number of seconds it
index 1544fe5a4b30460d898d8b2394f15bd157946c10..60c218ad378bca7d04d3856fd1ef9c996b84602e 100644 (file)
@@ -12,6 +12,7 @@
 
 # String literals representing core resources.
 PORT = 'port'
+PROCESS = 'process'
 ROUTER = 'router'
 ROUTER_GATEWAY = 'router_gateway'
 ROUTER_INTERFACE = 'router_interface'
index 0e3a16cb2edc2ccec76bc8026943d3f4df56b6f4..7a174507fddc8e5e75c796fc5d70f3b6580a442f 100644 (file)
@@ -246,3 +246,8 @@ class NeutronManager(object):
         service_plugins = cls.get_instance().service_plugins
         return dict((x, weakref.proxy(y))
                     for x, y in six.iteritems(service_plugins))
+
+    @classmethod
+    def get_unique_service_plugins(cls):
+        service_plugins = cls.get_instance().service_plugins
+        return tuple(weakref.proxy(x) for x in set(service_plugins.values()))
index 374dd19e7ef76bad424df28ceb336de502a04047..79a85c4c83eebd8c551290ebaa6959a870bc1e71 100644 (file)
@@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object):
         """
         return (self.__class__.start_rpc_listeners !=
                 NeutronPluginBaseV2.start_rpc_listeners)
+
+    def get_workers(self):
+        """Returns a collection NeutronWorker instances
+
+        If a plugin needs to define worker processes outside of API/RPC workers
+        then it will override this and return a collection of NeutronWorker
+        instances
+        """
+        return ()
index c54ab1ba35abadbb95303f8fd6025d27d63e1139..db25c8d5f12a2c16ed13407f80aa2116aac602d9 100644 (file)
@@ -888,6 +888,14 @@ class MechanismDriver(object):
         """
         pass
 
+    def get_workers(self):
+        """Get any NeutronWorker instances that should have their own process
+
+        Any driver that needs to run processes separate from the API or RPC
+        workers, can return a sequence of NeutronWorker instances.
+        """
+        return ()
+
 
 @six.add_metaclass(abc.ABCMeta)
 class ExtensionDriver(object):
index 40179dfe25eac4a1c9c329638a01c29f407ea255..a194aed0a7c62dba9f0b4ac6b82b5e4e9f86cf6f 100644 (file)
@@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
                 return False
         return True
 
+    def get_workers(self):
+        workers = []
+        for driver in self.ordered_mech_drivers:
+            workers += driver.obj.get_workers()
+        return workers
+
 
 class ExtensionManager(stevedore.named.NamedExtensionManager):
     """Manage extension drivers using drivers."""
index 4cdf98a40e7c8f505cc7ea51f4e0cab3692f988e..d3f42dcf828a20ebc234b08431b3a68bf6aee7ca 100644 (file)
@@ -1539,3 +1539,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             if port:
                 return port.id
         return device
+
+    def get_workers(self):
+        return self.mechanism_manager.get_workers()
index c6c72e28422200033ec0e91e29cdda068438c381..688b7542142c9bea80a67dd816c8e906c6d33ca4 100644 (file)
@@ -50,6 +50,10 @@ def main():
         else:
             rpc_thread = pool.spawn(neutron_rpc.wait)
 
+            plugin_workers = service.start_plugin_workers()
+            for worker in plugin_workers:
+                pool.spawn(worker.wait)
+
             # api and rpc should die together.  When one dies, kill the other.
             rpc_thread.link(lambda gt: api_thread.kill())
             api_thread.link(lambda gt: rpc_thread.kill())
index 6b1eee248b19d3af10cc59a8eef695cad1c7cd8e..856551ffb96735ae0c4eb3e9bf682d72ac3e5372 100644 (file)
@@ -32,6 +32,7 @@ from neutron import context
 from neutron.db import api as session
 from neutron.i18n import _LE, _LI
 from neutron import manager
+from neutron import worker
 from neutron import wsgi
 
 
@@ -44,7 +45,7 @@ service_opts = [
                       'If not specified, the default is equal to the number '
                       'of CPUs available for best performance.')),
     cfg.IntOpt('rpc_workers',
-               default=0,
+               default=1,
                help=_('Number of RPC worker processes for service')),
     cfg.IntOpt('periodic_fuzzy_delay',
                default=5,
@@ -108,13 +109,28 @@ def serve_wsgi(cls):
     return service
 
 
-class RpcWorker(common_service.ServiceBase):
+def start_plugin_workers():
+    launchers = []
+    # NOTE(twilson) get_service_plugins also returns the core plugin
+    for plugin in manager.NeutronManager.get_unique_service_plugins():
+        # TODO(twilson) Instead of defaulting here, come up with a good way to
+        # share a common get_workers default between NeutronPluginBaseV2 and
+        # ServicePluginBase
+        for plugin_worker in getattr(plugin, 'get_workers', tuple)():
+            launcher = common_service.ProcessLauncher(cfg.CONF)
+            launcher.launch_service(plugin_worker)
+            launchers.append(launcher)
+    return launchers
+
+
+class RpcWorker(worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
     def __init__(self, plugin):
         self._plugin = plugin
         self._servers = []
 
     def start(self):
+        super(RpcWorker, self).start()
         self._servers = self._plugin.start_rpc_listeners()
 
     def wait(self):
@@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase):
 def serve_rpc():
     plugin = manager.NeutronManager.get_plugin()
 
+    if cfg.CONF.rpc_workers < 1:
+        cfg.CONF.set_override('rpc_workers', 1)
+
     # If 0 < rpc_workers then start_rpc_listeners would be called in a
     # subprocess and we cannot simply catch the NotImplementedError.  It is
     # simpler to check this up front by testing whether the plugin supports
@@ -164,22 +183,14 @@ def serve_rpc():
     try:
         rpc = RpcWorker(plugin)
 
-        if cfg.CONF.rpc_workers < 1:
-            LOG.debug('starting rpc directly, workers=%s',
-                      cfg.CONF.rpc_workers)
-            rpc.start()
-            return rpc
-        else:
-            # dispose the whole pool before os.fork, otherwise there will
-            # be shared DB connections in child processes which may cause
-            # DB errors.
-            LOG.debug('using launcher for rpc, workers=%s',
-                      cfg.CONF.rpc_workers)
-            session.dispose()
-            launcher = common_service.ProcessLauncher(cfg.CONF,
-                                                      wait_interval=1.0)
-            launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
-            return launcher
+        # dispose the whole pool before os.fork, otherwise there will
+        # be shared DB connections in child processes which may cause
+        # DB errors.
+        LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
+        session.dispose()
+        launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
+        launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+        return launcher
     except Exception:
         with excutils.save_and_reraise_exception():
             LOG.exception(_LE('Unrecoverable error: please check log for '
@@ -188,7 +199,7 @@ def serve_rpc():
 
 def _get_api_workers():
     workers = cfg.CONF.api_workers
-    if workers is None:
+    if not workers:
         workers = processutils.get_worker_count()
     return workers
 
index 4c4e96ae5bfa031012736b0db42f8ad966bea605..85daa4ea1d3aed7760e7945c978ae3bb6051bddc 100644 (file)
@@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db
 from neutron.db import l3_hascheduler_db
 from neutron.plugins.common import constants
 from neutron.quota import resource_registry
+from neutron.services import service_base
 
 
-class L3RouterPlugin(common_db_mixin.CommonDbMixin,
+class L3RouterPlugin(service_base.ServicePluginBase,
+                     common_db_mixin.CommonDbMixin,
                      extraroute_db.ExtraRoute_db_mixin,
                      l3_hamode_db.L3_HA_NAT_db_mixin,
                      l3_gwmode_db.L3_NAT_db_mixin,
index acae7a0f50b2b4fef16469a51edb4e4fff8cfd1f..3ae9f6329f407cec6eeb063e132002e8f0250170 100644 (file)
@@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface):
         """Return string description of the plugin."""
         pass
 
+    def get_workers(self):
+        """Returns a collection of NeutronWorkers"""
+        return ()
+
 
 def load_drivers(service_type, plugin):
     """Loads drivers for specific service.
index 8f81f68495694477b7e7b8a6bc9d2ce724ba5f2e..48891bb7b99037343cdfd4203702c90b026eeb98 100644 (file)
@@ -27,6 +27,7 @@ import psutil
 from neutron.agent.linux import utils
 from neutron import service
 from neutron.tests import base
+from neutron import worker
 from neutron import wsgi
 
 
@@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer):
     def test_restart_rpc_on_sighup_multiple_workers(self):
         self._test_restart_service_on_sighup(service=self._serve_rpc,
                                              workers=2)
+
+
+class TestPluginWorker(TestNeutronServer):
+    """Ensure that a plugin returning Workers spawns workers"""
+
+    def setUp(self):
+        super(TestPluginWorker, self).setUp()
+        self.setup_coreplugin(TARGET_PLUGIN)
+        self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
+        self.plugin = self._plugin_patcher.start()
+
+    def _start_plugin(self, workers=0):
+        with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
+            gp.return_value = self.plugin
+            launchers = service.start_plugin_workers()
+            for launcher in launchers:
+                launcher.wait()
+
+    def test_start(self):
+        class FakeWorker(worker.NeutronWorker):
+            def start(self):
+                pass
+
+            def wait(self):
+                pass
+
+            def stop(self):
+                pass
+
+            def reset(self):
+                pass
+
+        # Make both ABC happy and ensure 'self' is correct
+        FakeWorker.reset = self._fake_reset
+        workers = [FakeWorker()]
+        self.plugin.return_value.get_workers.return_value = workers
+        self._test_restart_service_on_sighup(service=self._start_plugin,
+                                             workers=len(workers))
diff --git a/neutron/worker.py b/neutron/worker.py
new file mode 100644 (file)
index 0000000..c49d742
--- /dev/null
@@ -0,0 +1,40 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_service import service
+
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
+
+
+class NeutronWorker(service.ServiceBase):
+    """Partial implementation of the ServiceBase ABC
+
+    Subclasses will still need to add the other abstractmethods defined in
+    service.ServiceBase. See oslo_service for more details.
+
+    If a plugin needs to handle synchornization with the Neutron database and
+    do this only once instead of in every API worker, for instance, it would
+    define a NeutronWorker class and the plugin would have get_workers return
+    an array of NeutronWorker instnaces. For example:
+        class MyPlugin(...):
+            def get_workers(self):
+                return [MyPluginWorker()]
+
+        class MyPluginWorker(NeutronWorker):
+            def start(self):
+                super(MyPluginWorker, self).start()
+                do_sync()
+    """
+    def start(self):
+        registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
index cb302d306dc6abb9245a09d8f28b3e79ac671ce7..dacbadf8eafd4e5e26c4db41d855fc10c281c5a5 100644 (file)
@@ -44,6 +44,7 @@ from neutron.common import exceptions as exception
 from neutron import context
 from neutron.db import api
 from neutron.i18n import _LE, _LI
+from neutron import worker
 
 socket_opts = [
     cfg.IntOpt('backlog',
@@ -102,7 +103,7 @@ def encode_body(body):
     return body
 
 
-class WorkerService(common_service.ServiceBase):
+class WorkerService(worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
     def __init__(self, service, application):
         self._service = service
@@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase):
         self._server = None
 
     def start(self):
+        super(WorkerService, self).start()
         # When api worker is stopped it kills the eventlet wsgi server which
         # internally closes the wsgi server socket object. This server socket
         # object becomes not usable which leads to "Bad file descriptor"