]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Enable multi-process for API service
authorZhiteng Huang <zhithuang@ebaysf.com>
Wed, 8 Jan 2014 06:50:29 +0000 (14:50 +0800)
committerZhiteng Huang <zhithuang@ebaysf.com>
Thu, 23 Jan 2014 16:28:55 +0000 (00:28 +0800)
Due to the limitations of the Python interpreter, the API service of
Cinder can't really utilize the underlying multi-core architecture
even when libraries like eventlet are used. To make the API service
much more scalable, we adopt the multi-process (worker) mode that has
long been used in Glance/Swift/Nova.

The default behavior isn't changed with this patch, Cinder API
service will still run in one process (default value of
osapi_volume_workers is None).

Implementation-wise, a good portion of cinder/service.py has been
removed because that content has been merged into the Oslo version of
the service module.  cinder/wsgi.py is also updated to adapt to the
change of running multiple WSGI servers in separate processes.

Implement bp: multi-process-api-service

DocImpact: 'New config option osapi_volume_workers is used to specify
the number of API service workers (OS processes) to launch for the
Cinder API service.  Setting this config option to a proper value
(e.g. osapi_volume_workers = number of CPU cores/threads of the
machine) can greatly improve the total throughput of the API service
[the number of API requests that can be handled per second].'

Also removed outdated comments in bin/cinder-api, since the bug they
referred to [1] has been fixed in eventlet 0.9.13.

[1] https://bitbucket.org/eventlet/eventlet/issue/92/eventletgreen-override-of-oswaitpid

Change-Id: I8361d0dc0d43040e48634ff1aee1324e5e0af466

bin/cinder-all
bin/cinder-api
bin/cinder-backup
bin/cinder-volume
cinder/service.py
cinder/tests/test_service.py
cinder/wsgi.py
etc/cinder/cinder.conf.sample

index 6c3762d0e1ab1fc0f6effaa7d569e6fd754d6616..a3b2fcf7f70d8f82c7036244e7e1257d4f0bb3b5 100755 (executable)
@@ -62,17 +62,17 @@ if __name__ == '__main__':
     LOG = logging.getLogger('cinder.all')
 
     utils.monkey_patch()
-    servers = []
+    launcher = service.process_launcher()
     # cinder-api
     try:
-        servers.append(service.WSGIService('osapi_volume'))
+        server = service.WSGIService('osapi_volume')
+        launcher.launch_service(server, workers=server.workers or 1)
     except (Exception, SystemExit):
         LOG.exception(_('Failed to load osapi_volume'))
 
     for binary in ['cinder-volume', 'cinder-scheduler', 'cinder-backup']:
         try:
-            servers.append(service.Service.create(binary=binary))
+            launcher.launch_service(service.Service.create(binary=binary))
         except (Exception, SystemExit):
             LOG.exception(_('Failed to load %s'), binary)
-    service.serve(*servers)
-    service.wait()
+    launcher.wait()
index 5d4ea2615da5ea1238006ead21b1151518ffee79..953575c9effe57d50d0280a61795bb159b26d77a 100755 (executable)
 
 """Starter script for Cinder OS API."""
 
-# NOTE(jdg): If we port over multi worker code from Nova
-# we'll need to set monkey_patch(os=False), unless
-# eventlet is updated/released to fix the root issue
-
 import eventlet
 eventlet.monkey_patch()
 
@@ -54,6 +50,8 @@ if __name__ == '__main__':
          version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
+
+    launcher = service.process_launcher()
     server = service.WSGIService('osapi_volume')
-    service.serve(server)
-    service.wait()
+    launcher.launch_service(server, workers=server.workers or 1)
+    launcher.wait()
index c1555987c06c767b4add36b518e99fd823a48c6a..2b5a634cca2cfd4e8b16632120dcf78ff7638af2 100755 (executable)
@@ -54,7 +54,6 @@ if __name__ == '__main__':
          version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
-    launcher = service.ProcessLauncher()
     server = service.Service.create(binary='cinder-backup')
-    launcher.launch_server(server)
-    launcher.wait()
+    service.serve(server)
+    service.wait()
index c461c1ef7db9dfabc48ef209707e734ac29785d4..0a04626c3ba2898134de736d6eb7e6b4aa7550ab 100755 (executable)
@@ -58,18 +58,17 @@ if __name__ == '__main__':
          version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
-    if os.name == 'nt':
-        launcher = service
-        launcher.launch_server = service.serve
-    else:
-        launcher = service.ProcessLauncher()
+    # Note(zhiteng): Since Windows (os='nt') has already ignored monkey
+    # patching 'os' module, there is no need to treat it differently
+    # when creating launcher.
+    launcher = service.process_launcher()
     if CONF.enabled_backends:
         for backend in CONF.enabled_backends:
             host = "%s@%s" % (CONF.host, backend)
             server = service.Service.create(host=host,
                                             service_name=backend)
-            launcher.launch_server(server)
+            launcher.launch_service(server)
     else:
         server = service.Service.create(binary='cinder-volume')
-        launcher.launch_server(server)
+        launcher.launch_service(server)
     launcher.wait()
index 5e8b20b9583970f5decaac84c0bd14ee60c3fe1d..1199779d04e9e3c25e8ed4af6258c7077e4575c7 100644 (file)
 """Generic Node base class for all workers that run on hosts."""
 
 
-import errno
 import inspect
 import os
 import random
-import signal
-import sys
-import time
 
-import eventlet
-import greenlet
 from oslo.config import cfg
 
 from cinder import context
@@ -37,6 +31,7 @@ from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import loopingcall
 from cinder.openstack.common import rpc
+from cinder.openstack.common import service
 from cinder import version
 from cinder import wsgi
 
@@ -60,274 +55,15 @@ service_opts = [
                help='IP address for OpenStack Volume API to listen'),
     cfg.IntOpt('osapi_volume_listen_port',
                default=8776,
-               help='port for os volume api to listen'), ]
+               help='port for os volume api to listen'),
+    cfg.IntOpt('osapi_volume_workers',
+               help='Number of workers for OpenStack Volume API service'), ]
 
 CONF = cfg.CONF
 CONF.register_opts(service_opts)
 
 
-class SignalExit(SystemExit):
-    def __init__(self, signo, exccode=1):
-        super(SignalExit, self).__init__(exccode)
-        self.signo = signo
-
-
-class Launcher(object):
-    """Launch one or more services and wait for them to complete."""
-
-    def __init__(self):
-        """Initialize the service launcher.
-
-        :returns: None
-
-        """
-        self._services = []
-
-    @staticmethod
-    def run_server(server):
-        """Start and wait for a server to finish.
-
-        :param server: Server to run and wait for.
-        :returns: None
-
-        """
-        server.start()
-        server.wait()
-
-    def launch_server(self, server):
-        """Load and start the given server.
-
-        :param server: The server you would like to start.
-        :returns: None
-
-        """
-        gt = eventlet.spawn(self.run_server, server)
-        self._services.append(gt)
-
-    def stop(self):
-        """Stop all services which are currently running.
-
-        :returns: None
-
-        """
-        for service in self._services:
-            service.kill()
-
-    def wait(self):
-        """Waits until all services have been stopped, and then returns.
-
-        :returns: None
-
-        """
-        def sigterm(sig, frame):
-            LOG.audit(_("SIGTERM received"))
-            # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
-            # shuts down the service. This does not yet handle eventlet
-            # threads.
-            raise KeyboardInterrupt
-
-        signal.signal(signal.SIGTERM, sigterm)
-
-        for service in self._services:
-            try:
-                service.wait()
-            except greenlet.GreenletExit:
-                pass
-
-
-class ServerWrapper(object):
-    def __init__(self, server, workers):
-        self.server = server
-        self.workers = workers
-        self.children = set()
-        self.forktimes = []
-        self.failed = False
-
-
-class ProcessLauncher(object):
-    def __init__(self):
-        self.children = {}
-        self.sigcaught = None
-        self.totalwrap = 0
-        self.failedwrap = 0
-        self.running = True
-        rfd, self.writepipe = os.pipe()
-        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
-
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
-
-    def _handle_signal(self, signo, frame):
-        self.sigcaught = signo
-        self.running = False
-
-        # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-
-    def _pipe_watcher(self):
-        # This will block until the write end is closed when the parent
-        # dies unexpectedly
-        self.readpipe.read()
-
-        LOG.info(_('Parent process has died unexpectedly, exiting'))
-
-        sys.exit(1)
-
-    def _child_process(self, server):
-        # Setup child signal handlers differently
-        def _sigterm(*args):
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            raise SignalExit(signal.SIGTERM)
-
-        signal.signal(signal.SIGTERM, _sigterm)
-        # Block SIGINT and let the parent send us a SIGTERM
-        # signal.signal(signal.SIGINT, signal.SIG_IGN)
-        # This differs from the behavior in nova in that we don't ignore this
-        # It allows the non-wsgi services to be terminated properly
-        signal.signal(signal.SIGINT, _sigterm)
-
-        # Reopen the eventlet hub to make sure we don't share an epoll
-        # fd with parent and/or siblings, which would be bad
-        eventlet.hubs.use_hub()
-
-        # Close write to ensure only parent has it open
-        os.close(self.writepipe)
-        # Create greenthread to watch for parent to close pipe
-        eventlet.spawn(self._pipe_watcher)
-
-        # Reseed random number generator
-        random.seed()
-
-        launcher = Launcher()
-        launcher.run_server(server)
-
-    def _start_child(self, wrap):
-        if len(wrap.forktimes) > wrap.workers:
-            # Limit ourselves to one process a second (over the period of
-            # number of workers * 1 second). This will allow workers to
-            # start up quickly but ensure we don't fork off children that
-            # die instantly too quickly.
-            if time.time() - wrap.forktimes[0] < wrap.workers:
-                LOG.info(_('Forking too fast, sleeping'))
-                time.sleep(1)
-
-            wrap.forktimes.pop(0)
-
-        wrap.forktimes.append(time.time())
-
-        pid = os.fork()
-        if pid == 0:
-            # NOTE(johannes): All exceptions are caught to ensure this
-            # doesn't fallback into the loop spawning children. It would
-            # be bad for a child to spawn more children.
-            status = 0
-            try:
-                self._child_process(wrap.server)
-            except SignalExit as exc:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT'}[exc.signo]
-                LOG.info(_('Caught %s, exiting'), signame)
-                status = exc.code
-            except SystemExit as exc:
-                status = exc.code
-            except BaseException:
-                LOG.exception(_('Unhandled exception'))
-                status = 2
-            finally:
-                wrap.server.stop()
-
-            os._exit(status)
-
-        LOG.info(_('Started child %d'), pid)
-
-        wrap.children.add(pid)
-        self.children[pid] = wrap
-
-        return pid
-
-    def launch_server(self, server, workers=1):
-        wrap = ServerWrapper(server, workers)
-        self.totalwrap = self.totalwrap + 1
-        LOG.info(_('Starting %d workers'), wrap.workers)
-        while (self.running and len(wrap.children) < wrap.workers
-               and not wrap.failed):
-            self._start_child(wrap)
-
-    def _wait_child(self):
-        try:
-            # Don't block if no child processes have exited
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if not pid:
-                return None
-        except OSError as exc:
-            if exc.errno not in (errno.EINTR, errno.ECHILD):
-                raise
-            return None
-
-        code = 0
-        if os.WIFSIGNALED(status):
-            sig = os.WTERMSIG(status)
-            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
-                     {'pid': pid, 'sig': sig})
-        else:
-            code = os.WEXITSTATUS(status)
-            LOG.info(_('Child %(pid)d exited with status %(code)d'),
-                     {'pid': pid, 'code': code})
-
-        if pid not in self.children:
-            LOG.warning(_('pid %d not in child list'), pid)
-            return None
-
-        wrap = self.children.pop(pid)
-        wrap.children.remove(pid)
-        if 2 == code:
-            wrap.failed = True
-            self.failedwrap = self.failedwrap + 1
-            LOG.info(_('_wait_child %d'), self.failedwrap)
-            if self.failedwrap == self.totalwrap:
-                self.running = False
-        return wrap
-
-    def wait(self):
-        """Loop waiting on children to die and respawning as necessary."""
-        while self.running:
-            wrap = self._wait_child()
-            if not wrap:
-                # Yield to other threads if no children have exited
-                # Sleep for a short time to avoid excessive CPU usage
-                # (see bug #1095346)
-                eventlet.greenthread.sleep(.01)
-                continue
-
-            LOG.info(_('wait wrap.failed %s'), wrap.failed)
-            while (self.running and len(wrap.children) < wrap.workers
-                   and not wrap.failed):
-                self._start_child(wrap)
-
-        if self.sigcaught:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
-            LOG.info(_('Caught %s, stopping children'), signame)
-
-        for pid in self.children:
-            try:
-                os.kill(pid, signal.SIGTERM)
-            except OSError as exc:
-                if exc.errno != errno.ESRCH:
-                    raise
-
-        # Wait for children to die
-        if self.children:
-            LOG.info(_('Waiting on %d children to exit'), len(self.children))
-            while self.children:
-                wrap = self._wait_child()
-                if not wrap:
-                    eventlet.greenthread.sleep(.01)
-                    continue
-
-
-class Service(object):
+class Service(service.Service):
     """Service object for binaries running on hosts.
 
     A service takes a manager and enables rpc by listening to queues based
@@ -338,6 +74,7 @@ class Service(object):
     def __init__(self, host, binary, topic, manager, report_interval=None,
                  periodic_interval=None, periodic_fuzzy_delay=None,
                  service_name=None, *args, **kwargs):
+        super(Service, self).__init__()
         self.host = host
         self.binary = binary
         self.topic = topic
@@ -349,7 +86,6 @@ class Service(object):
         self.report_interval = report_interval
         self.periodic_interval = periodic_interval
         self.periodic_fuzzy_delay = periodic_fuzzy_delay
-        super(Service, self).__init__(*args, **kwargs)
         self.saved_args, self.saved_kwargs = args, kwargs
         self.timers = []
 
@@ -476,6 +212,8 @@ class Service(object):
                 pass
         self.timers = []
 
+        super(Service, self).stop()
+
     def wait(self):
         for x in self.timers:
             try:
@@ -538,6 +276,13 @@ class WSGIService(object):
         self.app = self.loader.load_app(name)
         self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
         self.port = getattr(CONF, '%s_listen_port' % name, 0)
+        self.workers = getattr(CONF, '%s_workers' % name, None)
+        if self.workers < 1:
+            LOG.warn(_("Value of config option %(name)s_workers must be "
+                       "integer greater than 1.  Input value ignored.") %
+                     {'name': name})
+            # Reset workers to default
+            self.workers = None
         self.basic_config_check()
         self.server = wsgi.Server(name,
                                   self.app,
@@ -612,18 +357,22 @@ class WSGIService(object):
         self.server.wait()
 
 
+def process_launcher():
+    return service.ProcessLauncher()
+
+
 # NOTE(vish): the global launcher is to maintain the existing
 #             functionality of calling service.serve +
 #             service.wait
 _launcher = None
 
 
-def serve(*servers):
+def serve(server, workers=None):
     global _launcher
-    if not _launcher:
-        _launcher = Launcher()
-    for server in servers:
-        _launcher.launch_server(server)
+    if _launcher:
+        raise RuntimeError(_('serve() can only be called once'))
+
+    _launcher = service.launch(server, workers=workers)
 
 
 def wait():
index 60457217d47c562b83448a17ff4bfbe20f78ceb5..2291502c207cdb8447a576460e2873e2e9558740 100644 (file)
@@ -214,18 +214,3 @@ class TestWSGIService(test.TestCase):
         CONF.set_override('report_interval', 10)
         service.WSGIService("test_service")
         self.assertEqual(CONF.service_down_time, 25)
-
-
-class TestLauncher(test.TestCase):
-
-    def setUp(self):
-        super(TestLauncher, self).setUp()
-        self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything())
-        self.service = service.WSGIService("test_service")
-
-    def test_launch_app(self):
-        self.assertEqual(0, self.service.port)
-        launcher = service.Launcher()
-        launcher.launch_server(self.service)
-        self.assertEqual(0, self.service.port)
-        launcher.stop()
index bc41b61acd84ad073a5c1efb8c28674cbd1fa63a..6a067aa6e58598de494bf50334a2b2b262c68ebd 100644 (file)
@@ -77,7 +77,7 @@ class Server(object):
     default_pool_size = 1000
 
     def __init__(self, name, app, host=None, port=None, pool_size=None,
-                 protocol=eventlet.wsgi.HttpProtocol):
+                 protocol=eventlet.wsgi.HttpProtocol, backlog=128):
         """Initialize, but do not start, a WSGI server.
 
         :param name: Pretty name for logging.
@@ -99,6 +99,13 @@ class Server(object):
         self._logger = logging.getLogger("eventlet.wsgi.server")
         self._wsgi_logger = logging.WritableLogger(self._logger)
 
+        if backlog < 1:
+            raise exception.InvalidInput(
+                reason='The backlog must be more than 1')
+        self._socket = self._get_socket(self._host,
+                                        self._port,
+                                        backlog=backlog)
+
     def _get_socket(self, host, port, backlog):
         bind_addr = (host, port)
         # TODO(dims): eventlet's green dns/socket module does not actually
@@ -197,13 +204,6 @@ class Server(object):
         :raises: cinder.exception.InvalidInput
 
         """
-        if backlog < 1:
-            raise exception.InvalidInput(
-                reason='The backlog must be more than 1')
-
-        self._socket = self._get_socket(self._host,
-                                        self._port,
-                                        backlog=backlog)
         self._server = eventlet.spawn(self._start)
         (self._host, self._port) = self._socket.getsockname()[0:2]
         LOG.info(_("Started %(name)s on %(host)s:%(port)s") %
@@ -227,7 +227,10 @@ class Server(object):
 
         """
         LOG.info(_("Stopping WSGI server."))
-        self._server.kill()
+        if self._server is not None:
+            # Resize pool to stop new requests from being processed
+            self._pool.resize(0)
+            self._server.kill()
 
     def wait(self):
         """Block, until the server has stopped.
@@ -238,7 +241,8 @@ class Server(object):
 
         """
         try:
-            self._server.wait()
+            if self._server is not None:
+                self._server.wait()
         except greenlet.GreenletExit:
             LOG.info(_("WSGI server has stopped."))
 
index ea38194f9bec1d6f66cfe7678ec0bbc89fde8021..1c2ba2b004951720c50c9a264d44c090a0a97f68 100644 (file)
 # port for os volume api to listen (integer value)
 #osapi_volume_listen_port=8776
 
+# Number of workers for OpenStack Volume API service (integer
+# value)
+#osapi_volume_workers=<None>
+
 
 #
 # Options defined in cinder.test