]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Creates multiple worker processes for API server
authorCarl Baldwin <carl.baldwin@hp.com>
Fri, 12 Jul 2013 20:45:38 +0000 (20:45 +0000)
committerCarl Baldwin <carl.baldwin@hp.com>
Thu, 19 Sep 2013 18:52:13 +0000 (18:52 +0000)
This change to the WSGI code uses openstack.common.service to create
multiple worker processes to handle API load.  The main process will
start up a configurable (workers=??) number of child processes which
will all listen on the bind port.  The main process becomes the parent
and manages the children.  The parent is not a worker.

Backwards compatibility is preserved by setting api_workers to 0, the
default.  In this case, no separate worker processes are spawned and
the worker threads run in the main process.

Implement blueprint multi-workers-for-api-server

Change-Id: Iffa76041d0055840ccca852814b0e71f17a950ac

etc/neutron.conf
neutron/service.py
neutron/tests/unit/test_wsgi.py
neutron/wsgi.py

index 5abf669175794d9340537f4401b0a9013f2c5a49..e685af9065aaec930ee8db8ff1dd2b80276483c9 100644 (file)
@@ -239,6 +239,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
 # ===========  end of items for agent scheduler extension =====
 
 # =========== WSGI parameters related to the API server ==============
+# Number of separate worker processes to spawn.  The default, 0, runs the
+# worker thread in the current process.  Greater than 0 launches that number of
+# child processes as workers.  The parent process manages them.
+# api_workers = 0
 # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
 # starting API server. Not supported on OS X.
 # tcp_keepidle = 600
index cd88a6015685e16e951c71139a63b3e67af8ce7c..640b0a9300b40aaea426c5e87de6dd7f4948cf0c 100644 (file)
@@ -36,6 +36,9 @@ service_opts = [
     cfg.IntOpt('periodic_interval',
                default=40,
                help=_('Seconds between running periodic tasks')),
+    cfg.IntOpt('api_workers',
+               default=0,
+               help=_('Number of separate worker processes for service')),
     cfg.IntOpt('periodic_fuzzy_delay',
                default=5,
                help=_('Range of seconds to randomly delay when starting the '
@@ -111,7 +114,8 @@ def _run_wsgi(app_name):
         LOG.error(_('No known API applications configured.'))
         return
     server = wsgi.Server("Neutron")
-    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
+    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
+                 workers=cfg.CONF.api_workers)
     # Dump all option values here after all options are parsed
     cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
     LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"),
index a8be556289530ec391299d05729265851f5de519..52e946a9cb6750b4f90d9ab7ebc5675a11bdf580 100644 (file)
@@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase):
         server.stop()
         server.wait()
 
+    @mock.patch('neutron.wsgi.ProcessLauncher')
+    def test_start_multiple_workers(self, ProcessLauncher):
+        launcher = ProcessLauncher.return_value
+
+        server = wsgi.Server("test_multiple_processes")
+        server.start(None, 0, host="127.0.0.1", workers=2)
+        launcher.running = True
+        launcher.launch_service.assert_called_once_with(server._server,
+                                                        workers=2)
+
+        server.stop()
+        self.assertFalse(launcher.running)
+
+        server.wait()
+        launcher.wait.assert_called_once_with()
+
     def test_start_random_port_with_ipv6(self):
         server = wsgi.Server("test_random_port")
         server.start(None, 0, host="::1")
index bf48af9fa0f848042828baf40ce0e01a8b9bb4c7..908272df37d06647a4b5b12bee27649473354d06 100644 (file)
@@ -37,9 +37,11 @@ import webob.exc
 from neutron.common import constants
 from neutron.common import exceptions as exception
 from neutron import context
+from neutron.openstack.common.db.sqlalchemy import session
 from neutron.openstack.common import gettextutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
+from neutron.openstack.common.service import ProcessLauncher
 
 socket_opts = [
     cfg.IntOpt('backlog',
@@ -82,12 +84,39 @@ def run_server(application, port):
     eventlet.wsgi.server(sock, application)
 
 
+class WorkerService(object):
+    """Wraps a worker to be handled by ProcessLauncher"""
+    def __init__(self, service, application):
+        self._service = service
+        self._application = application
+        self._server = None
+
+    def start(self):
+        # We may have just forked from parent process.  A quick disposal of the
+        # existing sql connections avoids producting 500 errors later when they
+        # are discovered to be broken.
+        session.get_engine(sqlite_fk=True).pool.dispose()
+        self._server = self._service.pool.spawn(self._service._run,
+                                                self._application,
+                                                self._service._socket)
+
+    def wait(self):
+        self._service.pool.waitall()
+
+    def stop(self):
+        if isinstance(self._server, eventlet.greenthread.GreenThread):
+            self._server.kill()
+            self._server = None
+
+
 class Server(object):
     """Server class to manage multiple WSGI sockets and applications."""
 
     def __init__(self, name, threads=1000):
         self.pool = eventlet.GreenPool(threads)
         self.name = name
+        self._launcher = None
+        self._server = None
 
     def _get_socket(self, host, port, backlog):
         bind_addr = (host, port)
@@ -166,7 +195,7 @@ class Server(object):
 
         return sock
 
-    def start(self, application, port, host='0.0.0.0'):
+    def start(self, application, port, host='0.0.0.0', workers=0):
         """Run a WSGI server with the given application."""
         self._host = host
         self._port = port
@@ -175,7 +204,14 @@ class Server(object):
         self._socket = self._get_socket(self._host,
                                         self._port,
                                         backlog=backlog)
-        self._server = self.pool.spawn(self._run, application, self._socket)
+        if workers < 1:
+            # For the case where only one process is required.
+            self._server = self.pool.spawn(self._run, application,
+                                           self._socket)
+        else:
+            self._launcher = ProcessLauncher()
+            self._server = WorkerService(self, application)
+            self._launcher.launch_service(self._server, workers=workers)
 
     @property
     def host(self):
@@ -186,12 +222,19 @@ class Server(object):
         return self._socket.getsockname()[1] if self._socket else self._port
 
     def stop(self):
-        self._server.kill()
+        if self._launcher:
+            # The process launcher does not support stop or kill.
+            self._launcher.running = False
+        else:
+            self._server.kill()
 
     def wait(self):
         """Wait until all servers have completed running."""
         try:
-            self.pool.waitall()
+            if self._launcher:
+                self._launcher.wait()
+            else:
+                self.pool.waitall()
         except KeyboardInterrupt:
             pass