From 4576ca73eae6e2ec554b4e6785b9fd7ef310ac91 Mon Sep 17 00:00:00 2001 From: Zhiteng Huang Date: Wed, 8 Jan 2014 14:50:29 +0800 Subject: [PATCH] Enable multi-process for API service Due to the limits of the Python interpreter, the API service of Cinder can't really utilize the underlying multi-core architecture even when libraries like eventlet are used. To make the API service much more scalable, we adopt the multi-process (worker) mode that has long been used in Glance/Swift/Nova. The default behavior isn't changed with this patch: the Cinder API service will still run in one process (the default value of osapi_volume_workers is None). Implementation-wise, a good portion of cinder/service.py has been removed because that content has been merged into the Oslo version of the service module. cinder/wsgi.py is also updated to adopt the change for multiple WSGI servers running in separate processes. Implement bp: multi-process-api-service DocImpact: 'New config option osapi_volume_workers is used to specify the number of API service workers (OS processes) to launch for the Cinder API service. Setting this config option to a proper value (e.g. osapi_volume_workers = # of CPU cores/threads of the machine) can greatly improve the total throughput of the API service [# of API requests that can be handled per second].' 
Also removed out-dated comments in bin/cinder-api due to the fact that this bug [1] has been fixed in eventlet 0.9.13 [1] https://bitbucket.org/eventlet/eventlet/issue/92/eventletgreen-override-of-oswaitpid Change-Id: I8361d0dc0d43040e48634ff1aee1324e5e0af466 --- bin/cinder-all | 10 +- bin/cinder-api | 10 +- bin/cinder-backup | 5 +- bin/cinder-volume | 13 +- cinder/service.py | 299 +++------------------------------- cinder/tests/test_service.py | 15 -- cinder/wsgi.py | 24 +-- etc/cinder/cinder.conf.sample | 4 + 8 files changed, 59 insertions(+), 321 deletions(-) diff --git a/bin/cinder-all b/bin/cinder-all index 6c3762d0e..a3b2fcf7f 100755 --- a/bin/cinder-all +++ b/bin/cinder-all @@ -62,17 +62,17 @@ if __name__ == '__main__': LOG = logging.getLogger('cinder.all') utils.monkey_patch() - servers = [] + launcher = service.process_launcher() # cinder-api try: - servers.append(service.WSGIService('osapi_volume')) + server = service.WSGIService('osapi_volume') + launcher.launch_service(server, workers=server.workers or 1) except (Exception, SystemExit): LOG.exception(_('Failed to load osapi_volume')) for binary in ['cinder-volume', 'cinder-scheduler', 'cinder-backup']: try: - servers.append(service.Service.create(binary=binary)) + launcher.launch_service(service.Service.create(binary=binary)) except (Exception, SystemExit): LOG.exception(_('Failed to load %s'), binary) - service.serve(*servers) - service.wait() + launcher.wait() diff --git a/bin/cinder-api b/bin/cinder-api index 5d4ea2615..953575c9e 100755 --- a/bin/cinder-api +++ b/bin/cinder-api @@ -17,10 +17,6 @@ """Starter script for Cinder OS API.""" -# NOTE(jdg): If we port over multi worker code from Nova -# we'll need to set monkey_patch(os=False), unless -# eventlet is updated/released to fix the root issue - import eventlet eventlet.monkey_patch() @@ -54,6 +50,8 @@ if __name__ == '__main__': version=version.version_string()) logging.setup("cinder") utils.monkey_patch() + + launcher = service.process_launcher() 
server = service.WSGIService('osapi_volume') - service.serve(server) - service.wait() + launcher.launch_service(server, workers=server.workers or 1) + launcher.wait() diff --git a/bin/cinder-backup b/bin/cinder-backup index c1555987c..2b5a634cc 100755 --- a/bin/cinder-backup +++ b/bin/cinder-backup @@ -54,7 +54,6 @@ if __name__ == '__main__': version=version.version_string()) logging.setup("cinder") utils.monkey_patch() - launcher = service.ProcessLauncher() server = service.Service.create(binary='cinder-backup') - launcher.launch_server(server) - launcher.wait() + service.serve(server) + service.wait() diff --git a/bin/cinder-volume b/bin/cinder-volume index c461c1ef7..0a04626c3 100755 --- a/bin/cinder-volume +++ b/bin/cinder-volume @@ -58,18 +58,17 @@ if __name__ == '__main__': version=version.version_string()) logging.setup("cinder") utils.monkey_patch() - if os.name == 'nt': - launcher = service - launcher.launch_server = service.serve - else: - launcher = service.ProcessLauncher() + # Note(zhiteng): Since Windows (os='nt') has already ignored monkey + # patching 'os' module, there is no need to treat it differently + # when creating launcher. 
+ launcher = service.process_launcher() if CONF.enabled_backends: for backend in CONF.enabled_backends: host = "%s@%s" % (CONF.host, backend) server = service.Service.create(host=host, service_name=backend) - launcher.launch_server(server) + launcher.launch_service(server) else: server = service.Service.create(binary='cinder-volume') - launcher.launch_server(server) + launcher.launch_service(server) launcher.wait() diff --git a/cinder/service.py b/cinder/service.py index 5e8b20b95..1199779d0 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -18,16 +18,10 @@ """Generic Node base class for all workers that run on hosts.""" -import errno import inspect import os import random -import signal -import sys -import time -import eventlet -import greenlet from oslo.config import cfg from cinder import context @@ -37,6 +31,7 @@ from cinder.openstack.common import importutils from cinder.openstack.common import log as logging from cinder.openstack.common import loopingcall from cinder.openstack.common import rpc +from cinder.openstack.common import service from cinder import version from cinder import wsgi @@ -60,274 +55,15 @@ service_opts = [ help='IP address for OpenStack Volume API to listen'), cfg.IntOpt('osapi_volume_listen_port', default=8776, - help='port for os volume api to listen'), ] + help='port for os volume api to listen'), + cfg.IntOpt('osapi_volume_workers', + help='Number of workers for OpenStack Volume API service'), ] CONF = cfg.CONF CONF.register_opts(service_opts) -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super(SignalExit, self).__init__(exccode) - self.signo = signo - - -class Launcher(object): - """Launch one or more services and wait for them to complete.""" - - def __init__(self): - """Initialize the service launcher. - - :returns: None - - """ - self._services = [] - - @staticmethod - def run_server(server): - """Start and wait for a server to finish. - - :param server: Server to run and wait for. 
- :returns: None - - """ - server.start() - server.wait() - - def launch_server(self, server): - """Load and start the given server. - - :param server: The server you would like to start. - :returns: None - - """ - gt = eventlet.spawn(self.run_server, server) - self._services.append(gt) - - def stop(self): - """Stop all services which are currently running. - - :returns: None - - """ - for service in self._services: - service.kill() - - def wait(self): - """Waits until all services have been stopped, and then returns. - - :returns: None - - """ - def sigterm(sig, frame): - LOG.audit(_("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. - raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) - - for service in self._services: - try: - service.wait() - except greenlet.GreenletExit: - pass - - -class ServerWrapper(object): - def __init__(self, server, workers): - self.server = server - self.workers = workers - self.children = set() - self.forktimes = [] - self.failed = False - - -class ProcessLauncher(object): - def __init__(self): - self.children = {} - self.sigcaught = None - self.totalwrap = 0 - self.failedwrap = 0 - self.running = True - rfd, self.writepipe = os.pipe() - self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') - - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) - - def _handle_signal(self, signo, frame): - self.sigcaught = signo - self.running = False - - # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - - def _pipe_watcher(self): - # This will block until the write end is closed when the parent - # dies unexpectedly - self.readpipe.read() - - LOG.info(_('Parent process has died unexpectedly, exiting')) - - sys.exit(1) - - def _child_process(self, server): - # 
Setup child signal handlers differently - def _sigterm(*args): - signal.signal(signal.SIGTERM, signal.SIG_DFL) - raise SignalExit(signal.SIGTERM) - - signal.signal(signal.SIGTERM, _sigterm) - # Block SIGINT and let the parent send us a SIGTERM - # signal.signal(signal.SIGINT, signal.SIG_IGN) - # This differs from the behavior in nova in that we don't ignore this - # It allows the non-wsgi services to be terminated properly - signal.signal(signal.SIGINT, _sigterm) - - # Reopen the eventlet hub to make sure we don't share an epoll - # fd with parent and/or siblings, which would be bad - eventlet.hubs.use_hub() - - # Close write to ensure only parent has it open - os.close(self.writepipe) - # Create greenthread to watch for parent to close pipe - eventlet.spawn(self._pipe_watcher) - - # Reseed random number generator - random.seed() - - launcher = Launcher() - launcher.run_server(server) - - def _start_child(self, wrap): - if len(wrap.forktimes) > wrap.workers: - # Limit ourselves to one process a second (over the period of - # number of workers * 1 second). This will allow workers to - # start up quickly but ensure we don't fork off children that - # die instantly too quickly. - if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_('Forking too fast, sleeping')) - time.sleep(1) - - wrap.forktimes.pop(0) - - wrap.forktimes.append(time.time()) - - pid = os.fork() - if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. 
- status = 0 - try: - self._child_process(wrap.server) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.server.stop() - - os._exit(status) - - LOG.info(_('Started child %d'), pid) - - wrap.children.add(pid) - self.children[pid] = wrap - - return pid - - def launch_server(self, server, workers=1): - wrap = ServerWrapper(server, workers) - self.totalwrap = self.totalwrap + 1 - LOG.info(_('Starting %d workers'), wrap.workers) - while (self.running and len(wrap.children) < wrap.workers - and not wrap.failed): - self._start_child(wrap) - - def _wait_child(self): - try: - # Don't block if no child processes have exited - pid, status = os.waitpid(0, os.WNOHANG) - if not pid: - return None - except OSError as exc: - if exc.errno not in (errno.EINTR, errno.ECHILD): - raise - return None - - code = 0 - if os.WIFSIGNALED(status): - sig = os.WTERMSIG(status) - LOG.info(_('Child %(pid)d killed by signal %(sig)d'), - {'pid': pid, 'sig': sig}) - else: - code = os.WEXITSTATUS(status) - LOG.info(_('Child %(pid)d exited with status %(code)d'), - {'pid': pid, 'code': code}) - - if pid not in self.children: - LOG.warning(_('pid %d not in child list'), pid) - return None - - wrap = self.children.pop(pid) - wrap.children.remove(pid) - if 2 == code: - wrap.failed = True - self.failedwrap = self.failedwrap + 1 - LOG.info(_('_wait_child %d'), self.failedwrap) - if self.failedwrap == self.totalwrap: - self.running = False - return wrap - - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - while self.running: - wrap = self._wait_child() - if not wrap: - # Yield to other threads if no children have exited - # Sleep for a short time to avoid excessive CPU usage - # (see bug #1095346) - 
eventlet.greenthread.sleep(.01) - continue - - LOG.info(_('wait wrap.failed %s'), wrap.failed) - while (self.running and len(wrap.children) < wrap.workers - and not wrap.failed): - self._start_child(wrap) - - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) - - for pid in self.children: - try: - os.kill(pid, signal.SIGTERM) - except OSError as exc: - if exc.errno != errno.ESRCH: - raise - - # Wait for children to die - if self.children: - LOG.info(_('Waiting on %d children to exit'), len(self.children)) - while self.children: - wrap = self._wait_child() - if not wrap: - eventlet.greenthread.sleep(.01) - continue - - -class Service(object): +class Service(service.Service): """Service object for binaries running on hosts. A service takes a manager and enables rpc by listening to queues based @@ -338,6 +74,7 @@ class Service(object): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, service_name=None, *args, **kwargs): + super(Service, self).__init__() self.host = host self.binary = binary self.topic = topic @@ -349,7 +86,6 @@ class Service(object): self.report_interval = report_interval self.periodic_interval = periodic_interval self.periodic_fuzzy_delay = periodic_fuzzy_delay - super(Service, self).__init__(*args, **kwargs) self.saved_args, self.saved_kwargs = args, kwargs self.timers = [] @@ -476,6 +212,8 @@ class Service(object): pass self.timers = [] + super(Service, self).stop() + def wait(self): for x in self.timers: try: @@ -538,6 +276,13 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0") self.port = getattr(CONF, '%s_listen_port' % name, 0) + self.workers = getattr(CONF, '%s_workers' % name, None) + if self.workers < 1: + LOG.warn(_("Value of config option %(name)s_workers must be " + "integer greater than 1. 
Input value ignored.") % + {'name': name}) + # Reset workers to default + self.workers = None self.basic_config_check() self.server = wsgi.Server(name, self.app, @@ -612,18 +357,22 @@ class WSGIService(object): self.server.wait() +def process_launcher(): + return service.ProcessLauncher() + + # NOTE(vish): the global launcher is to maintain the existing # functionality of calling service.serve + # service.wait _launcher = None -def serve(*servers): +def serve(server, workers=None): global _launcher - if not _launcher: - _launcher = Launcher() - for server in servers: - _launcher.launch_server(server) + if _launcher: + raise RuntimeError(_('serve() can only be called once')) + + _launcher = service.launch(server, workers=workers) def wait(): diff --git a/cinder/tests/test_service.py b/cinder/tests/test_service.py index 60457217d..2291502c2 100644 --- a/cinder/tests/test_service.py +++ b/cinder/tests/test_service.py @@ -214,18 +214,3 @@ class TestWSGIService(test.TestCase): CONF.set_override('report_interval', 10) service.WSGIService("test_service") self.assertEqual(CONF.service_down_time, 25) - - -class TestLauncher(test.TestCase): - - def setUp(self): - super(TestLauncher, self).setUp() - self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything()) - self.service = service.WSGIService("test_service") - - def test_launch_app(self): - self.assertEqual(0, self.service.port) - launcher = service.Launcher() - launcher.launch_server(self.service) - self.assertEqual(0, self.service.port) - launcher.stop() diff --git a/cinder/wsgi.py b/cinder/wsgi.py index bc41b61ac..6a067aa6e 100644 --- a/cinder/wsgi.py +++ b/cinder/wsgi.py @@ -77,7 +77,7 @@ class Server(object): default_pool_size = 1000 def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + protocol=eventlet.wsgi.HttpProtocol, backlog=128): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. 
@@ -99,6 +99,13 @@ class Server(object): self._logger = logging.getLogger("eventlet.wsgi.server") self._wsgi_logger = logging.WritableLogger(self._logger) + if backlog < 1: + raise exception.InvalidInput( + reason='The backlog must be more than 1') + self._socket = self._get_socket(self._host, + self._port, + backlog=backlog) + def _get_socket(self, host, port, backlog): bind_addr = (host, port) # TODO(dims): eventlet's green dns/socket module does not actually @@ -197,13 +204,6 @@ class Server(object): :raises: cinder.exception.InvalidInput """ - if backlog < 1: - raise exception.InvalidInput( - reason='The backlog must be more than 1') - - self._socket = self._get_socket(self._host, - self._port, - backlog=backlog) self._server = eventlet.spawn(self._start) (self._host, self._port) = self._socket.getsockname()[0:2] LOG.info(_("Started %(name)s on %(host)s:%(port)s") % @@ -227,7 +227,10 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + if self._server is not None: + # Resize pool to stop new requests from being processed + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -238,7 +241,8 @@ class Server(object): """ try: - self._server.wait() + if self._server is not None: + self._server.wait() except greenlet.GreenletExit: LOG.info(_("WSGI server has stopped.")) diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index ea38194f9..1c2ba2b00 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -76,6 +76,10 @@ # port for os volume api to listen (integer value) #osapi_volume_listen_port=8776 +# Number of workers for OpenStack Volume API service (integer +# value) +#osapi_volume_workers= + # # Options defined in cinder.test -- 2.45.2