"""Generic Node base class for all workers that run on hosts."""
-import errno
import inspect
import os
import random
-import signal
-import sys
-import time
-import eventlet
-import greenlet
from oslo.config import cfg
from cinder import context
from cinder.openstack.common import log as logging
from cinder.openstack.common import loopingcall
from cinder.openstack.common import rpc
+from cinder.openstack.common import service
from cinder import version
from cinder import wsgi
help='IP address for OpenStack Volume API to listen'),
cfg.IntOpt('osapi_volume_listen_port',
default=8776,
- help='port for os volume api to listen'), ]
+ help='port for os volume api to listen'),
+ cfg.IntOpt('osapi_volume_workers',
+ help='Number of workers for OpenStack Volume API service'), ]
CONF = cfg.CONF
CONF.register_opts(service_opts)
-class SignalExit(SystemExit):
- def __init__(self, signo, exccode=1):
- super(SignalExit, self).__init__(exccode)
- self.signo = signo
-
-
-class Launcher(object):
- """Launch one or more services and wait for them to complete."""
-
- def __init__(self):
- """Initialize the service launcher.
-
- :returns: None
-
- """
- self._services = []
-
- @staticmethod
- def run_server(server):
- """Start and wait for a server to finish.
-
- :param server: Server to run and wait for.
- :returns: None
-
- """
- server.start()
- server.wait()
-
- def launch_server(self, server):
- """Load and start the given server.
-
- :param server: The server you would like to start.
- :returns: None
-
- """
- gt = eventlet.spawn(self.run_server, server)
- self._services.append(gt)
-
- def stop(self):
- """Stop all services which are currently running.
-
- :returns: None
-
- """
- for service in self._services:
- service.kill()
-
- def wait(self):
- """Waits until all services have been stopped, and then returns.
-
- :returns: None
-
- """
- def sigterm(sig, frame):
- LOG.audit(_("SIGTERM received"))
- # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
- # shuts down the service. This does not yet handle eventlet
- # threads.
- raise KeyboardInterrupt
-
- signal.signal(signal.SIGTERM, sigterm)
-
- for service in self._services:
- try:
- service.wait()
- except greenlet.GreenletExit:
- pass
-
-
-class ServerWrapper(object):
- def __init__(self, server, workers):
- self.server = server
- self.workers = workers
- self.children = set()
- self.forktimes = []
- self.failed = False
-
-
-class ProcessLauncher(object):
- def __init__(self):
- self.children = {}
- self.sigcaught = None
- self.totalwrap = 0
- self.failedwrap = 0
- self.running = True
- rfd, self.writepipe = os.pipe()
- self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
-
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
-
- def _handle_signal(self, signo, frame):
- self.sigcaught = signo
- self.running = False
-
- # Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
- def _pipe_watcher(self):
- # This will block until the write end is closed when the parent
- # dies unexpectedly
- self.readpipe.read()
-
- LOG.info(_('Parent process has died unexpectedly, exiting'))
-
- sys.exit(1)
-
- def _child_process(self, server):
- # Setup child signal handlers differently
- def _sigterm(*args):
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- raise SignalExit(signal.SIGTERM)
-
- signal.signal(signal.SIGTERM, _sigterm)
- # Block SIGINT and let the parent send us a SIGTERM
- # signal.signal(signal.SIGINT, signal.SIG_IGN)
- # This differs from the behavior in nova in that we don't ignore this
- # It allows the non-wsgi services to be terminated properly
- signal.signal(signal.SIGINT, _sigterm)
-
- # Reopen the eventlet hub to make sure we don't share an epoll
- # fd with parent and/or siblings, which would be bad
- eventlet.hubs.use_hub()
-
- # Close write to ensure only parent has it open
- os.close(self.writepipe)
- # Create greenthread to watch for parent to close pipe
- eventlet.spawn(self._pipe_watcher)
-
- # Reseed random number generator
- random.seed()
-
- launcher = Launcher()
- launcher.run_server(server)
-
- def _start_child(self, wrap):
- if len(wrap.forktimes) > wrap.workers:
- # Limit ourselves to one process a second (over the period of
- # number of workers * 1 second). This will allow workers to
- # start up quickly but ensure we don't fork off children that
- # die instantly too quickly.
- if time.time() - wrap.forktimes[0] < wrap.workers:
- LOG.info(_('Forking too fast, sleeping'))
- time.sleep(1)
-
- wrap.forktimes.pop(0)
-
- wrap.forktimes.append(time.time())
-
- pid = os.fork()
- if pid == 0:
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.server)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.server.stop()
-
- os._exit(status)
-
- LOG.info(_('Started child %d'), pid)
-
- wrap.children.add(pid)
- self.children[pid] = wrap
-
- return pid
-
- def launch_server(self, server, workers=1):
- wrap = ServerWrapper(server, workers)
- self.totalwrap = self.totalwrap + 1
- LOG.info(_('Starting %d workers'), wrap.workers)
- while (self.running and len(wrap.children) < wrap.workers
- and not wrap.failed):
- self._start_child(wrap)
-
- def _wait_child(self):
- try:
- # Don't block if no child processes have exited
- pid, status = os.waitpid(0, os.WNOHANG)
- if not pid:
- return None
- except OSError as exc:
- if exc.errno not in (errno.EINTR, errno.ECHILD):
- raise
- return None
-
- code = 0
- if os.WIFSIGNALED(status):
- sig = os.WTERMSIG(status)
- LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
- {'pid': pid, 'sig': sig})
- else:
- code = os.WEXITSTATUS(status)
- LOG.info(_('Child %(pid)d exited with status %(code)d'),
- {'pid': pid, 'code': code})
-
- if pid not in self.children:
- LOG.warning(_('pid %d not in child list'), pid)
- return None
-
- wrap = self.children.pop(pid)
- wrap.children.remove(pid)
- if 2 == code:
- wrap.failed = True
- self.failedwrap = self.failedwrap + 1
- LOG.info(_('_wait_child %d'), self.failedwrap)
- if self.failedwrap == self.totalwrap:
- self.running = False
- return wrap
-
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
- while self.running:
- wrap = self._wait_child()
- if not wrap:
- # Yield to other threads if no children have exited
- # Sleep for a short time to avoid excessive CPU usage
- # (see bug #1095346)
- eventlet.greenthread.sleep(.01)
- continue
-
- LOG.info(_('wait wrap.failed %s'), wrap.failed)
- while (self.running and len(wrap.children) < wrap.workers
- and not wrap.failed):
- self._start_child(wrap)
-
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
-
- for pid in self.children:
- try:
- os.kill(pid, signal.SIGTERM)
- except OSError as exc:
- if exc.errno != errno.ESRCH:
- raise
-
- # Wait for children to die
- if self.children:
- LOG.info(_('Waiting on %d children to exit'), len(self.children))
- while self.children:
- wrap = self._wait_child()
- if not wrap:
- eventlet.greenthread.sleep(.01)
- continue
-
-
-class Service(object):
+class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
service_name=None, *args, **kwargs):
+ super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
- super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
pass
self.timers = []
+ super(Service, self).stop()
+
def wait(self):
for x in self.timers:
try:
self.app = self.loader.load_app(name)
self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
self.port = getattr(CONF, '%s_listen_port' % name, 0)
+ self.workers = getattr(CONF, '%s_workers' % name, None)
+ if self.workers < 1:
+ LOG.warn(_("Value of config option %(name)s_workers must be "
+ "integer greater than 1. Input value ignored.") %
+ {'name': name})
+ # Reset workers to default
+ self.workers = None
self.basic_config_check()
self.server = wsgi.Server(name,
self.app,
self.server.wait()
+def process_launcher():
+ return service.ProcessLauncher()
+
+
# NOTE(vish): the global launcher is to maintain the existing
# functionality of calling service.serve +
# service.wait
_launcher = None
-def serve(*servers):
+def serve(server, workers=None):
global _launcher
- if not _launcher:
- _launcher = Launcher()
- for server in servers:
- _launcher.launch_server(server)
+ if _launcher:
+ raise RuntimeError(_('serve() can only be called once'))
+
+ _launcher = service.launch(server, workers=workers)
def wait():