]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Adds multiple RPC worker processes to neutron server
authorCarl Baldwin <carl.baldwin@hp.com>
Tue, 11 Feb 2014 05:55:29 +0000 (05:55 +0000)
committerThomas Goirand <thomas@goirand.fr>
Thu, 13 Mar 2014 07:20:42 +0000 (15:20 +0800)
blueprint multiple-rpc-workers

Co-Authored-By: Terry Wilson<twilson@redhat.com>
Change-Id: I51f2a52add6c11af905e6f1e6e45d31731ebbb5d

etc/neutron.conf
neutron/neutron_plugin_base_v2.py
neutron/openstack/common/rpc/amqp.py
neutron/plugins/ml2/plugin.py
neutron/server/__init__.py
neutron/service.py
neutron/tests/unit/ml2/test_port_binding.py
neutron/tests/unit/ml2/test_security_group.py

index e6f699ae404ee8b47856ef40b4d28ea21c12f30e..8aa5cb2879ace937a875927d5418e8db79cf07c3 100644 (file)
@@ -257,6 +257,11 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
 # child processes as workers.  The parent process manages them.
 # api_workers = 0
 
+# Number of separate RPC worker processes to spawn.  The default, 0, runs the
+# worker thread in the current process.  Greater than 0 launches that number of
+# child processes as RPC workers.  The parent process manages them.
+# rpc_workers = 0
+
 # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
 # starting API server. Not supported on OS X.
 # tcp_keepidle = 600
index 603f23e6dfdb814b395716e8b5e72b2bff451962..046ed1ae846f286a09adcb3f4e15f7cdf3c85b94 100644 (file)
@@ -324,3 +324,15 @@ class NeutronPluginBaseV2(object):
         :param id: UUID representing the port to delete.
         """
         pass
+
+    def start_rpc_listener(self):
+        """Start the rpc listener.
+
+        Most plugins start an RPC listener implicitly on initialization.  In
+        order to support multiple process RPC, the plugin needs to expose
+        control over when this is started.
+
+        .. note:: this method is optional, as it was not part of the originally
+                  defined plugin API.
+        """
+        raise NotImplementedError
index b74fad059850558d1ea95e7eaf6561473f34ebb2..a0fa8cef22aea4fbb2d4887635a2aade99765442 100644 (file)
@@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection):
                                            ack_on_error)
 
     def consume_in_thread(self):
-        self.connection.consume_in_thread()
+        return self.connection.consume_in_thread()
 
     def __getattr__(self, key):
         """Proxy all other calls to the Connection instance."""
index 0e07cff9a79d6db1a93e1d8c70e6aff651b61401..77032a6c1af4acda8d2070d6b9748b247b3c0671 100644 (file)
@@ -122,13 +122,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
             dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
         )
+
+    def start_rpc_listener(self):
         self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
         self.topic = topics.PLUGIN
         self.conn = c_rpc.create_connection(new=True)
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        self.conn.consume_in_thread()
+        return self.conn.consume_in_thread()
 
     def _process_provider_segment(self, segment):
         network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
index 8c3ebc66c60a97ad4426ad28df5d1cf105cc9fd6..d5177cb237b64515dc50d9f2839515d99ce3ffc6 100755 (executable)
@@ -27,8 +27,11 @@ from neutron.common import config
 from neutron import service
 
 from neutron.openstack.common import gettextutils
+from neutron.openstack.common import log as logging
 gettextutils.install('neutron', lazy=True)
 
+LOG = logging.getLogger(__name__)
+
 
 def main():
     eventlet.monkey_patch()
@@ -40,8 +43,25 @@ def main():
                    " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                    " the '--config-file' option!"))
     try:
-        neutron_service = service.serve_wsgi(service.NeutronApiService)
-        neutron_service.wait()
+        pool = eventlet.GreenPool()
+
+        neutron_api = service.serve_wsgi(service.NeutronApiService)
+        api_thread = pool.spawn(neutron_api.wait)
+
+        try:
+            neutron_rpc = service.serve_rpc()
+        except NotImplementedError:
+            LOG.info(_("RPC was already started in parent process by plugin."))
+        else:
+            rpc_thread = pool.spawn(neutron_rpc.wait)
+
+            # api and rpc should die together.  When one dies, kill the other.
+            rpc_thread.link(lambda gt: api_thread.kill())
+            api_thread.link(lambda gt: rpc_thread.kill())
+
+        pool.waitall()
+    except KeyboardInterrupt:
+        pass
     except RuntimeError as e:
         sys.exit(_("ERROR: %s") % e)
 
index 2444baca89ca8ec5a5c17e363ed68362c80267dd..f035156568641c5f50d6812591609b7f02549a0b 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import eventlet
 import inspect
 import logging as std_logging
 import os
@@ -23,11 +24,15 @@ from oslo.config import cfg
 from neutron.common import config
 from neutron.common import legacy
 from neutron import context
+from neutron import manager
+from neutron import neutron_plugin_base_v2
+from neutron.openstack.common.db.sqlalchemy import session
 from neutron.openstack.common import excutils
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
 from neutron.openstack.common.rpc import service
+from neutron.openstack.common.service import ProcessLauncher
 from neutron import wsgi
 
 
@@ -38,6 +43,9 @@ service_opts = [
     cfg.IntOpt('api_workers',
                default=0,
                help=_('Number of separate worker processes for service')),
+    cfg.IntOpt('rpc_workers',
+               default=0,
+               help=_('Number of RPC worker processes for service')),
     cfg.IntOpt('periodic_fuzzy_delay',
                default=5,
                help=_('Range of seconds to randomly delay when starting the '
@@ -108,6 +116,61 @@ def serve_wsgi(cls):
     return service
 
 
+class RpcWorker(object):
+    """Wraps a worker to be handled by ProcessLauncher"""
+    def __init__(self, plugin):
+        self._plugin = plugin
+        self._server = None
+
+    def start(self):
+        # We may have just forked from parent process.  A quick disposal of the
+        # existing sql connections avoids producing errors later when they are
+        # discovered to be broken.
+        session.get_engine(sqlite_fk=True).pool.dispose()
+        self._server = self._plugin.start_rpc_listener()
+
+    def wait(self):
+        if isinstance(self._server, eventlet.greenthread.GreenThread):
+            self._server.wait()
+
+    def stop(self):
+        if isinstance(self._server, eventlet.greenthread.GreenThread):
+            self._server.kill()
+            self._server = None
+
+
+def serve_rpc():
+    plugin = manager.NeutronManager.get_plugin()
+
+    # If 0 < rpc_workers then start_rpc_listener would be called in a
+    # subprocess and we cannot simply catch the NotImplementedError.  It is
+    # simpler to check this up front by testing whether the plugin overrides
+    # start_rpc_listener.
+    base = neutron_plugin_base_v2.NeutronPluginBaseV2
+    if plugin.__class__.start_rpc_listener == base.start_rpc_listener:
+        LOG.debug(_("Active plugin doesn't implement start_rpc_listener"))
+        if 0 < cfg.CONF.rpc_workers:
+            msg = _("'rpc_workers = %d' ignored because start_rpc_listener "
+                    "is not implemented.")
+            LOG.error(msg, cfg.CONF.rpc_workers)
+        raise NotImplementedError
+
+    try:
+        rpc = RpcWorker(plugin)
+
+        if cfg.CONF.rpc_workers < 1:
+            rpc.start()
+            return rpc
+        else:
+            launcher = ProcessLauncher(wait_interval=1.0)
+            launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+            return launcher
+    except Exception:
+        with excutils.save_and_reraise_exception():
+            LOG.exception(_('Unrecoverable error: please check log '
+                            'for details.'))
+
+
 def _run_wsgi(app_name):
     app = config.load_paste_app(app_name)
     if not app:
index 86e066892df2e20a419af5450dccea089af43428..d2c0cb1b7ca89185cde3ccd4fdf6d70ed9a4836f 100644 (file)
@@ -40,6 +40,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
         super(PortBindingTestCase, self).setUp(PLUGIN_NAME)
         self.port_create_status = 'DOWN'
         self.plugin = manager.NeutronManager.get_plugin()
+        self.plugin.start_rpc_listener()
 
     def _check_response(self, port, vif_type, has_port_filter, bound):
         self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
index ff0bc39f5e9b98f3d3a52b8a647537799ce5e043..34d71a71efef810f8febd7e7458757027e2ea79c 100644 (file)
@@ -51,6 +51,11 @@ class Ml2SecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
 class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
                             test_sg.TestSecurityGroups,
                             test_sg_rpc.SGNotificationTestMixin):
+    def setUp(self):
+        super(TestMl2SecurityGroups, self).setUp()
+        plugin = manager.NeutronManager.get_plugin()
+        plugin.start_rpc_listener()
+
     def test_security_group_get_port_from_device(self):
         with self.network() as n:
             with self.subnet(n):