From: Carl Baldwin Date: Fri, 12 Jul 2013 20:45:38 +0000 (+0000) Subject: Creates multiple worker processes for API server X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=3d669cbd0e650853caed7ddaf1b6a5e40dba2501;p=openstack-build%2Fneutron-build.git Creates multiple worker processes for API server This change to the WSGI code uses openstack.common.service to create multiple worker processes to handle API load. The main process will start up a configurable (workers=??) number of child processes which will all listen on the bind port. The main process becomes the parent and manages the children. The parent is not a worker. Backwards compatibility is preserved by setting api_workers to 0, the default. In this case, no separate worker processes are spawned and the worker threads run in the main process. Implement blueprint multi-workers-for-api-server Change-Id: Iffa76041d0055840ccca852814b0e71f17a950ac --- diff --git a/etc/neutron.conf b/etc/neutron.conf index 5abf66917..e685af906 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -239,6 +239,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier # =========== end of items for agent scheduler extension ===== # =========== WSGI parameters related to the API server ============== +# Number of separate worker processes to spawn. The default, 0, runs the +# worker thread in the current process. Greater than 0 launches that number of +# child processes as workers. The parent process manages them. +# api_workers = 0 # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when # starting API server. Not supported on OS X. # tcp_keepidle = 600 diff --git a/neutron/service.py b/neutron/service.py index cd88a6015..640b0a930 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -36,6 +36,9 @@ service_opts = [ cfg.IntOpt('periodic_interval', default=40, help=_('Seconds between running periodic tasks')), + cfg.IntOpt('api_workers', + default=0, + help=_('Number of separate worker processes for service')), cfg.IntOpt('periodic_fuzzy_delay', default=5, help=_('Range of seconds to randomly delay when starting the ' @@ -111,7 +114,8 @@ def _run_wsgi(app_name): LOG.error(_('No known API applications configured.')) return server = wsgi.Server("Neutron") - server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host) + server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host, + workers=cfg.CONF.api_workers) # Dump all option values here after all options are parsed cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"), diff --git a/neutron/tests/unit/test_wsgi.py b/neutron/tests/unit/test_wsgi.py index a8be55628..52e946a9c 100644 --- a/neutron/tests/unit/test_wsgi.py +++ b/neutron/tests/unit/test_wsgi.py @@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase): server.stop() server.wait() + @mock.patch('neutron.wsgi.ProcessLauncher') + def test_start_multiple_workers(self, ProcessLauncher): + launcher = ProcessLauncher.return_value + + server = wsgi.Server("test_multiple_processes") + server.start(None, 0, host="127.0.0.1", workers=2) + launcher.running = True + launcher.launch_service.assert_called_once_with(server._server, + workers=2) + + server.stop() + self.assertFalse(launcher.running) + + server.wait() + launcher.wait.assert_called_once_with() + def test_start_random_port_with_ipv6(self): server = wsgi.Server("test_random_port") server.start(None, 0, host="::1") diff --git a/neutron/wsgi.py b/neutron/wsgi.py index bf48af9fa..908272df3 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -37,9 +37,11 @@ import webob.exc from neutron.common import constants from neutron.common import exceptions as exception from neutron import context +from neutron.openstack.common.db.sqlalchemy import session from neutron.openstack.common import gettextutils from neutron.openstack.common import jsonutils from neutron.openstack.common import log as logging +from neutron.openstack.common.service import ProcessLauncher socket_opts = [ cfg.IntOpt('backlog', @@ -82,12 +84,39 @@ def run_server(application, port): eventlet.wsgi.server(sock, application) +class WorkerService(object): + """Wraps a worker to be handled by ProcessLauncher""" + def __init__(self, service, application): + self._service = service + self._application = application + self._server = None + + def start(self): + # We may have just forked from parent process. A quick disposal of the + # existing sql connections avoids producting 500 errors later when they + # are discovered to be broken. + session.get_engine(sqlite_fk=True).pool.dispose() + self._server = self._service.pool.spawn(self._service._run, + self._application, + self._service._socket) + + def wait(self): + self._service.pool.waitall() + + def stop(self): + if isinstance(self._server, eventlet.greenthread.GreenThread): + self._server.kill() + self._server = None + + class Server(object): """Server class to manage multiple WSGI sockets and applications.""" def __init__(self, name, threads=1000): self.pool = eventlet.GreenPool(threads) self.name = name + self._launcher = None + self._server = None def _get_socket(self, host, port, backlog): bind_addr = (host, port) @@ -166,7 +195,7 @@ class Server(object): return sock - def start(self, application, port, host='0.0.0.0'): + def start(self, application, port, host='0.0.0.0', workers=0): """Run a WSGI server with the given application.""" self._host = host self._port = port @@ -175,7 +204,14 @@ class Server(object): self._socket = self._get_socket(self._host, self._port, backlog=backlog) - self._server = self.pool.spawn(self._run, application, self._socket) + if workers < 1: + # For the case where only one process is required. + self._server = self.pool.spawn(self._run, application, + self._socket) + else: + self._launcher = ProcessLauncher() + self._server = WorkerService(self, application) + self._launcher.launch_service(self._server, workers=workers) @property def host(self): @@ -186,12 +222,19 @@ class Server(object): return self._socket.getsockname()[1] if self._socket else self._port def stop(self): - self._server.kill() + if self._launcher: + # The process launcher does not support stop or kill. + self._launcher.running = False + else: + self._server.kill() def wait(self): """Wait until all servers have completed running.""" try: - self.pool.waitall() + if self._launcher: + self._launcher.wait() + else: + self.pool.waitall() except KeyboardInterrupt: pass