from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common import service
from neutron import wsgi
LOG = logging.getLogger(__name__)
self._socket = eventlet.listen(file_socket,
family=socket.AF_UNIX,
backlog=backlog)
- if workers < 1:
- # For the case where only one process is required.
- self._server = self.pool.spawn_n(self._run, application,
- self._socket)
- else:
- # Minimize the cost of checking for child exit by extending the
- # wait interval past the default of 0.01s.
- self._launcher = service.ProcessLauncher(wait_interval=1.0)
- self._server = WorkerService(self, application)
- self._launcher.launch_service(self._server, workers=workers)
+
+ self._launch(application, workers=workers)
def _run(self, application, socket):
"""Start a WSGI service in a new green thread."""
help=_('Seconds between running periodic tasks')),
cfg.IntOpt('api_workers',
default=0,
- help=_('Number of separate worker processes for service')),
+ help=_('Number of separate API worker processes for service')),
cfg.IntOpt('rpc_workers',
default=0,
help=_('Number of RPC worker processes for service')),
def test_start(self):
mock_app = mock.Mock()
- with mock.patch.object(self.server, 'pool') as pool:
- self.server.start(mock_app, '/the/path', workers=0, backlog=128)
+ with mock.patch.object(self.server, '_launch') as launcher:
+ self.server.start(mock_app, '/the/path', workers=5, backlog=128)
self.eventlet.assert_has_calls([
mock.call.listen(
'/the/path',
backlog=128
)]
)
- pool.spawn_n.assert_called_once_with(
- self.server._run,
- mock_app,
- self.eventlet.listen.return_value
- )
-
- @mock.patch('neutron.openstack.common.service.ProcessLauncher')
- def test_start_multiple_workers(self, process_launcher):
- launcher = process_launcher.return_value
-
- mock_app = mock.Mock()
- self.server.start(mock_app, '/the/path', workers=2, backlog=128)
- launcher.running = True
- launcher.launch_service.assert_called_once_with(self.server._server,
- workers=2)
-
- self.server.stop()
- self.assertFalse(launcher.running)
-
- self.server.wait()
- launcher.wait.assert_called_once_with()
+ launcher.assert_called_once_with(mock_app, workers=5)
def test_run(self):
with mock.patch.object(agent, 'logging') as logging:
server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2)
- launcher.running = True
- launcher.launch_service.assert_called_once_with(server._server,
- workers=2)
+ launcher.launch_service.assert_called_once_with(mock.ANY, workers=2)
server.stop()
- self.assertFalse(launcher.running)
+ launcher.stop.assert_called_once_with()
server.wait()
launcher.wait.assert_called_once_with()
self._service._socket)
def wait(self):
- self._service.pool.waitall()
+ if isinstance(self._server, eventlet.greenthread.GreenThread):
+ self._server.wait()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.pool = eventlet.GreenPool(threads)
self.name = name
- self._launcher = None
self._server = None
def _get_socket(self, host, port, backlog):
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
+
+ self._launch(application, workers)
+
+ def _launch(self, application, workers=0):
+ service = WorkerService(self, application)
if workers < 1:
- # For the case where only one process is required.
- self._server = self.pool.spawn(self._run, application,
- self._socket)
+ # The API service should run in the current process.
+ self._server = service
+ service.start()
systemd.notify_once()
else:
+ # The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
- self._launcher = common_service.ProcessLauncher(wait_interval=1.0)
- self._server = WorkerService(self, application)
- self._launcher.launch_service(self._server, workers=workers)
+ self._server = common_service.ProcessLauncher(wait_interval=1.0)
+ self._server.launch_service(service, workers=workers)
@property
def host(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
- if self._launcher:
- # The process launcher does not support stop or kill.
- self._launcher.running = False
- else:
- self._server.kill()
+ self._server.stop()
def wait(self):
"""Wait until all servers have completed running."""
try:
- if self._launcher:
- self._launcher.wait()
- else:
- self.pool.waitall()
+ self._server.wait()
except KeyboardInterrupt:
pass