cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
+ cfg.IntOpt('api_workers',
+ default=0,
+ help=_('Number of separate worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
LOG.error(_('No known API applications configured.'))
return
server = wsgi.Server("Neutron")
- server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
+ server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
+ workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"),
server.stop()
server.wait()
+ @mock.patch('neutron.wsgi.ProcessLauncher')
+ def test_start_multiple_workers(self, ProcessLauncher):
+ launcher = ProcessLauncher.return_value
+
+ server = wsgi.Server("test_multiple_processes")
+ server.start(None, 0, host="127.0.0.1", workers=2)
+ launcher.running = True
+ launcher.launch_service.assert_called_once_with(server._server,
+ workers=2)
+
+ server.stop()
+ self.assertFalse(launcher.running)
+
+ server.wait()
+ launcher.wait.assert_called_once_with()
+
def test_start_random_port_with_ipv6(self):
server = wsgi.Server("test_random_port")
server.start(None, 0, host="::1")
from neutron.common import constants
from neutron.common import exceptions as exception
from neutron import context
+from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import gettextutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
+from neutron.openstack.common.service import ProcessLauncher
socket_opts = [
cfg.IntOpt('backlog',
eventlet.wsgi.server(sock, application)
+class WorkerService(object):
+ """Wraps a worker to be handled by ProcessLauncher"""
+ def __init__(self, service, application):
+ self._service = service
+ self._application = application
+ self._server = None
+
+ def start(self):
+ # We may have just forked from parent process. A quick disposal of the
+ # existing sql connections avoids producting 500 errors later when they
+ # are discovered to be broken.
+ session.get_engine(sqlite_fk=True).pool.dispose()
+ self._server = self._service.pool.spawn(self._service._run,
+ self._application,
+ self._service._socket)
+
+ def wait(self):
+ self._service.pool.waitall()
+
+ def stop(self):
+ if isinstance(self._server, eventlet.greenthread.GreenThread):
+ self._server.kill()
+ self._server = None
+
+
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, threads=1000):
self.pool = eventlet.GreenPool(threads)
self.name = name
+ self._launcher = None
+ self._server = None
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
return sock
- def start(self, application, port, host='0.0.0.0'):
+ def start(self, application, port, host='0.0.0.0', workers=0):
"""Run a WSGI server with the given application."""
self._host = host
self._port = port
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
- self._server = self.pool.spawn(self._run, application, self._socket)
+ if workers < 1:
+ # For the case where only one process is required.
+ self._server = self.pool.spawn(self._run, application,
+ self._socket)
+ else:
+ self._launcher = ProcessLauncher()
+ self._server = WorkerService(self, application)
+ self._launcher.launch_service(self._server, workers=workers)
@property
def host(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
- self._server.kill()
+ if self._launcher:
+ # The process launcher does not support stop or kill.
+ self._launcher.running = False
+ else:
+ self._server.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
- self.pool.waitall()
+ if self._launcher:
+ self._launcher.wait()
+ else:
+ self.pool.waitall()
except KeyboardInterrupt:
pass