From 9c2029a8019e04634acf26ee780b59f78e19331f Mon Sep 17 00:00:00 2001 From: Zhiteng Huang Date: Thu, 21 Nov 2013 17:02:23 +0800 Subject: [PATCH] Pull latest service module from Oslo Get latest service module from Oslo to prepare for multi-process API service implementation. Below are the commits included in this pull. Changes being pulled into in service module are: * e7bc8c9 2013-11-20 | Merge "os._exit in _start_child may cause unexpected exception" * 96a2d4e 2013-11-07 | os._exit in _start_child may cause unexpected exception * 1771a77 2013-11-05 | Adjust import order according to PEP8 imports rule * 3110c0f 2013-10-17 | Use multiprocessing.Event to ensure services have started * b5fba9e 2013-09-18 | Move comment in service.py to correct location * 11cc74f 2013-08-26 | Fixes issue with SUGHUP in services on Windows * 825ace5 2013-06-17 | Add service restart function in oslo-incubator * c935d1c 2013-07-16 | Merge "Allow launchers to be stopped multiple times" * dc8aa79 2013-07-08 | Allow launchers to be stopped multiple times * 1a2df89 2013-06-25 | Enable H302 hacking check * 52e857a 2013-06-19 | Ignore any exceptions from rpc.cleanup(). * 5518ad3 2013-05-16 | Add graceful service shutdown support to Launcher And these dependent modules - cinder/openstack/common/eventlet_backdoor.py * 1dcc747 2013-07-15 | Fix stylistic problems with help text * 1a2df89 2013-06-25 | Enable H302 hacking check * c7c55b2 2013-06-20 | Improve usability when backdoor_port is nonzero - cinder/openstack/common/gettextutils.py * 3970d46 2013-11-02 | Fix typos in oslo * 88db9c8 2013-10-03 | When translating if no locale is given use default locale - cinder/openstack/common/jsonutils.py * 3d7504b 2013-09-23 | Ensure that Message objects will be sent via RPC in unicode format * 1807d32 2013-08-22 | jsonutils: make types py3 compatible * bdef862 2013-08-22 | jsonutils: do not require xmlrpclib * ded9bd6 2013-08-04 | Make dependency on netaddr optional * 7b7566b 2013-06-25 | Add netaddr.IPAddress support to to_primitive() - cinder/openstack/common/local.py * cb2a2b6 2013-06-28 | Modify local.py to not be dependent on Eventlet * 547ab34 2013-03-11 | Fix Copyright Headers - Rename LLC to Foundation - cinder/openstack/common/log.py * a82e889 2013-11-14 | Merge "Do not name variables as builtins" * 2251cb5 2013-11-13 | Do not name variables as builtins * 25c5854 2013-11-13 | Adds admin_password as key to be sanitized when logging * cbfded9 2013-11-11 | Default iso8601 logging to WARN * 76b0cd1 2013-11-04 | Add mask password impl from other projects - cinder/openstack/common/loopingcall.py * 1a2df89 2013-06-25 | Enable H302 hacking check - cinder/openstack/common/threadgroup.py * 9d3c34b 2013-10-25 | Add a link method to Thread * 1a2df89 2013-06-25 | Enable H302 hacking check - cinder/openstack/common/timeutils.py * f3b5f17 2013-11-12 | Add helper method total_seconds in timeutils.py * 53ebd30 2013-10-18 | python3: use six.text_types for unicode() * 3bc6f79 2013-09-19 | Fix timeutils.set_override_time not defaulting to current wall time * af76064 2013-08-29 | Optimize timeutils.utcnow_ts() * df3f2ba 2013-07-26 | BaseException.message is deprecated since Python 2.6 * d28fa69 2013-06-27 | python3: Add python3 compatibility. Partial bp: multi-process-api-service Change-Id: Ifd25eae9eb2d6ae53bcf1665c3d5b7db4144433c --- cinder/openstack/common/eventlet_backdoor.py | 63 ++++- cinder/openstack/common/gettextutils.py | 22 +- cinder/openstack/common/jsonutils.py | 19 +- cinder/openstack/common/local.py | 13 +- cinder/openstack/common/log.py | 57 ++++- cinder/openstack/common/loopingcall.py | 2 +- cinder/openstack/common/service.py | 256 ++++++++++++++----- cinder/openstack/common/threadgroup.py | 10 +- cinder/openstack/common/timeutils.py | 31 ++- etc/cinder/cinder.conf.sample | 11 +- openstack-common.conf | 1 + 11 files changed, 386 insertions(+), 99 deletions(-) diff --git a/cinder/openstack/common/eventlet_backdoor.py b/cinder/openstack/common/eventlet_backdoor.py index 57b89ae91..d530cda37 100644 --- a/cinder/openstack/common/eventlet_backdoor.py +++ b/cinder/openstack/common/eventlet_backdoor.py @@ -18,8 +18,11 @@ from __future__ import print_function +import errno import gc +import os import pprint +import socket import sys import traceback @@ -28,14 +31,34 @@ import eventlet.backdoor import greenlet from oslo.config import cfg +from cinder.openstack.common.gettextutils import _ # noqa +from cinder.openstack.common import log as logging + +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ - cfg.IntOpt('backdoor_port', + cfg.StrOpt('backdoor_port', default=None, - help='port for eventlet backdoor to listen') + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] CONF = cfg.CONF CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range def _dont_use_this(): @@ -60,6 +83,33 @@ def _print_nativethreads(): print() +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or + try_port >= end_port): + raise + try_port += 1 + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process @@ -72,6 +122,8 @@ def initialize_if_enabled(): if CONF.backdoor_port is None: return None + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + # NOTE(johannes): The standard sys.displayhook will print the value of # the last expression and set it to __builtin__._, which overwrites # the __builtin__._ that gettext sets. Let's switch to using pprint @@ -82,8 +134,13 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - sock = eventlet.listen(('localhost', CONF.backdoor_port)) + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. port = sock.getsockname()[1] + LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()}) eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) return port diff --git a/cinder/openstack/common/gettextutils.py b/cinder/openstack/common/gettextutils.py index 161032252..caa6a0925 100644 --- a/cinder/openstack/common/gettextutils.py +++ b/cinder/openstack/common/gettextutils.py @@ -317,7 +317,7 @@ def get_available_languages(domain): # NOTE(luisg): Babel <1.0 used a function called list(), which was # renamed to locale_identifiers() in >=1.0, the requirements master list # requires >=0.9.6, uncapped, so defensively work with both. We can remove - # this check when the master list updates to >=1.0, and all projects udpate + # this check when the master list updates to >=1.0, and update all projects list_identifiers = (getattr(localedata, 'list', None) or getattr(localedata, 'locale_identifiers')) locale_identifiers = list_identifiers() @@ -329,13 +329,21 @@ def get_available_languages(domain): def get_localized_message(message, user_locale): - """Gets a localized version of the given message in the given locale.""" + """Gets a localized version of the given message in the given locale. + + If the message is not a Message object the message is returned as-is. + If the locale is None the message is translated to the default locale. + + :returns: the translated message in unicode, or the original message if + it could not be translated + """ + translated = message if isinstance(message, Message): - if user_locale: - message.locale = user_locale - return six.text_type(message) - else: - return message + original_locale = message.locale + message.locale = user_locale + translated = six.text_type(message) + message.locale = original_locale + return translated class LocaleHandler(logging.Handler): diff --git a/cinder/openstack/common/jsonutils.py b/cinder/openstack/common/jsonutils.py index b1fa1afa3..b06d855bf 100644 --- a/cinder/openstack/common/jsonutils.py +++ b/cinder/openstack/common/jsonutils.py @@ -38,13 +38,19 @@ import functools import inspect import itertools import json -import types -import xmlrpclib +try: + import xmlrpclib +except ImportError: + # NOTE(jd): xmlrpclib is not shipped with Python 3 + xmlrpclib = None import six +from cinder.openstack.common import gettextutils +from cinder.openstack.common import importutils from cinder.openstack.common import timeutils +netaddr = importutils.try_import("netaddr") _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.isfunction, inspect.isgeneratorfunction, @@ -52,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.iscode, inspect.isbuiltin, inspect.isroutine, inspect.isabstract] -_simple_types = (types.NoneType, int, basestring, bool, float, long) +_simple_types = (six.string_types + six.integer_types + + (type(None), bool, float)) def to_primitive(value, convert_instances=False, convert_datetime=True, @@ -124,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled - if isinstance(value, xmlrpclib.DateTime): + if xmlrpclib and isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) + elif isinstance(value, gettextutils.Message): + return value.data elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) elif hasattr(value, '__iter__'): @@ -137,6 +146,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Likely an instance of something. Watch for cycles. # Ignore class member vars. return recursive(value.__dict__, level=level + 1) + elif netaddr and isinstance(value, netaddr.IPAddress): + return six.text_type(value) else: if any(test(value) for test in _nasty_type_tests): return six.text_type(value) diff --git a/cinder/openstack/common/local.py b/cinder/openstack/common/local.py index f1bfc824b..e82f17d0f 100644 --- a/cinder/openstack/common/local.py +++ b/cinder/openstack/common/local.py @@ -15,16 +15,15 @@ # License for the specific language governing permissions and limitations # under the License. -"""Greenthread local storage of variables using weak references""" +"""Local storage of variables using weak references""" +import threading import weakref -from eventlet import corolocal - -class WeakLocal(corolocal.local): +class WeakLocal(threading.local): def __getattribute__(self, attr): - rval = corolocal.local.__getattribute__(self, attr) + rval = super(WeakLocal, self).__getattribute__(attr) if rval: # NOTE(mikal): this bit is confusing. What is stored is a weak # reference, not the value itself. We therefore need to lookup @@ -34,7 +33,7 @@ class WeakLocal(corolocal.local): def __setattr__(self, attr, value): value = weakref.ref(value) - return corolocal.local.__setattr__(self, attr, value) + return super(WeakLocal, self).__setattr__(attr, value) # NOTE(mikal): the name "store" should be deprecated in the future @@ -45,4 +44,4 @@ store = WeakLocal() # "strong" store will hold a reference to the object so that it never falls out # of scope. weak_store = WeakLocal() -strong_store = corolocal.local +strong_store = threading.local() diff --git a/cinder/openstack/common/log.py b/cinder/openstack/common/log.py index 510c405a9..d844880cd 100644 --- a/cinder/openstack/common/log.py +++ b/cinder/openstack/common/log.py @@ -35,6 +35,7 @@ import logging import logging.config import logging.handlers import os +import re import sys import traceback @@ -50,6 +51,24 @@ from cinder.openstack.common import local _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" +_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password'] + +# NOTE(ldbragst): Let's build a list of regex objects using the list of +# _SANITIZE_KEYS we already have. This way, we only have to add the new key +# to the list of _SANITIZE_KEYS and we can generate regular expressions +# for XML and JSON automatically. +_SANITIZE_PATTERNS = [] +_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', + r'(<%(key)s>).*?()', + r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', + r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] + +for key in _SANITIZE_KEYS: + for pattern in _FORMAT_PATTERNS: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS.append(reg_ex) + + common_cli_opts = [ cfg.BoolOpt('debug', short='d', @@ -136,6 +155,7 @@ log_opts = [ 'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO', + 'iso8601=WARN', ], help='list of logger=LEVEL pairs'), cfg.BoolOpt('publish_errors', @@ -214,6 +234,39 @@ def _get_log_file_path(binary=None): return None +def mask_password(message, secret="***"): + """Replace password with 'secret' in message. + + :param message: The string which includes security information. + :param secret: value with which to replace passwords, defaults to "***". + :returns: The unicode value of message with the password fields masked. + + For example: + >>> mask_password("'adminPass' : 'aaaaa'") + "'adminPass' : '***'" + >>> mask_password("'admin_pass' : 'aaaaa'") + "'admin_pass' : '***'" + >>> mask_password('"password" : "aaaaa"') + '"password" : "***"' + >>> mask_password("'original_password' : 'aaaaa'") + "'original_password' : '***'" + >>> mask_password("u'original_password' : u'aaaaa'") + "u'original_password' : u'***'" + """ + message = six.text_type(message) + + # NOTE(ldbragst): Check to see if anything in message contains any key + # specified in _SANITIZE_KEYS, if not then just return the message since + # we don't have to mask any passwords. + if not any(key in message for key in _SANITIZE_KEYS): + return message + + secret = r'\g<1>' + secret + r'\g<2>' + for pattern in _SANITIZE_PATTERNS: + message = re.sub(pattern, secret, message) + return message + + class BaseLoggerAdapter(logging.LoggerAdapter): def audit(self, msg, *args, **kwargs): @@ -336,10 +389,10 @@ class JSONFormatter(logging.Formatter): def _create_logging_excepthook(product_name): - def logging_excepthook(type, value, tb): + def logging_excepthook(exc_type, value, tb): extra = {} if CONF.verbose: - extra['exc_info'] = (type, value, tb) + extra['exc_info'] = (exc_type, value, tb) getLogger(product_name).critical(str(value), **extra) return logging_excepthook diff --git a/cinder/openstack/common/loopingcall.py b/cinder/openstack/common/loopingcall.py index f62792d5c..9d5a057ec 100644 --- a/cinder/openstack/common/loopingcall.py +++ b/cinder/openstack/common/loopingcall.py @@ -22,7 +22,7 @@ import sys from eventlet import event from eventlet import greenthread -from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common.gettextutils import _ # noqa from cinder.openstack.common import log as logging from cinder.openstack.common import timeutils diff --git a/cinder/openstack/common/service.py b/cinder/openstack/common/service.py index 7cbd3690a..8016ba270 100644 --- a/cinder/openstack/common/service.py +++ b/cinder/openstack/common/service.py @@ -20,6 +20,7 @@ """Generic Node base class for all workers that run on hosts.""" import errno +import logging as std_logging import os import random import signal @@ -27,11 +28,11 @@ import sys import time import eventlet -import logging as std_logging +from eventlet import event from oslo.config import cfg from cinder.openstack.common import eventlet_backdoor -from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common.gettextutils import _ # noqa from cinder.openstack.common import importutils from cinder.openstack.common import log as logging from cinder.openstack.common import threadgroup @@ -42,6 +43,29 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +def _sighup_supported(): + return hasattr(signal, 'SIGHUP') + + +def _is_sighup(signo): + return _sighup_supported() and signo == signal.SIGHUP + + +def _signo_to_signame(signo): + signals = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'} + if _sighup_supported(): + signals[signal.SIGHUP] = 'SIGHUP' + return signals[signo] + + +def _set_signals_handler(handler): + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) + if _sighup_supported(): + signal.signal(signal.SIGHUP, handler) + + class Launcher(object): """Launch one or more services and wait for them to complete.""" @@ -51,20 +75,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +86,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +94,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +102,16 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() class SignalExit(SystemExit): @@ -101,33 +123,48 @@ class SignalExit(SystemExit): class ServiceLauncher(Launcher): def _handle_signal(self, signo, frame): # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - + _set_signals_handler(signal.SIG_DFL) raise SignalExit(signo) - def wait(self): - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _wait_for_exit_or_signal(self, ready_callback=None): + status = None + signo = 0 LOG.debug(_('Full set of CONF:')) CONF.log_opt_values(LOG, std_logging.DEBUG) - status = None try: + if ready_callback: + ready_callback() super(ServiceLauncher, self).wait() except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] + signame = _signo_to_signame(exc.signo) LOG.info(_('Caught %s, exiting'), signame) status = exc.code + signo = exc.signo except SystemExit as exc: status = exc.code finally: - if rpc: - rpc.cleanup() self.stop() - return status + if rpc: + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_('Exception during rpc cleanup.')) + + return status, signo + + def wait(self, ready_callback=None): + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal(ready_callback) + if not _is_sighup(signo): + return status + self.restart() class ServiceWrapper(object): @@ -145,17 +182,17 @@ class ProcessLauncher(object): self.running = True rfd, self.writepipe = os.pipe() self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) + def handle_signal(self): + _set_signals_handler(self._handle_signal) def _handle_signal(self, signo, frame): self.sigcaught = signo self.running = False # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) + _set_signals_handler(signal.SIG_DFL) def _pipe_watcher(self): # This will block until the write end is closed when the parent @@ -166,16 +203,49 @@ class ProcessLauncher(object): sys.exit(1) - def _child_process(self, service): + def _child_process_handle_signal(self): # Setup child signal handlers differently def _sigterm(*args): signal.signal(signal.SIGTERM, signal.SIG_DFL) raise SignalExit(signal.SIGTERM) + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + signal.signal(signal.SIGTERM, _sigterm) + if _sighup_supported(): + signal.signal(signal.SIGHUP, _sighup) # Block SIGINT and let the parent send us a SIGTERM signal.signal(signal.SIGINT, signal.SIG_IGN) + def _child_wait_for_exit_or_signal(self, launcher): + status = 0 + signo = 0 + + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + try: + launcher.wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + # Reopen the eventlet hub to make sure we don't share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() @@ -189,7 +259,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + return launcher def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -207,24 +278,13 @@ class ProcessLauncher(object): pid = os.fork() if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.service) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.service.stop() + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if not _is_sighup(signo): + break + launcher.restart() os._exit(status) @@ -270,12 +330,7 @@ class ProcessLauncher(object): wrap.children.remove(pid) return wrap - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - LOG.debug(_('Full set of CONF:')) - CONF.log_opt_values(LOG, std_logging.DEBUG) - + def _respawn_children(self): while self.running: wrap = self._wait_child() if not wrap: @@ -284,14 +339,28 @@ class ProcessLauncher(object): # (see bug #1095346) eventlet.greenthread.sleep(.01) continue - while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap) - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + while True: + self.handle_signal() + self._respawn_children() + if self.sigcaught: + signame = _signo_to_signame(self.sigcaught) + LOG.info(_('Caught %s, stopping children'), signame) + if not _is_sighup(self.sigcaught): + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None for pid in self.children: try: @@ -313,15 +382,74 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/cinder/openstack/common/threadgroup.py b/cinder/openstack/common/threadgroup.py index 7e8041603..36e05e9cc 100644 --- a/cinder/openstack/common/threadgroup.py +++ b/cinder/openstack/common/threadgroup.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenlet +import eventlet from eventlet import greenpool from eventlet import greenthread @@ -48,6 +48,9 @@ class Thread(object): def wait(self): return self.thread.wait() + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + class ThreadGroup(object): """The point of the ThreadGroup classis to: @@ -79,6 +82,7 @@ class ThreadGroup(object): gt = self.pool.spawn(callback, *args, **kwargs) th = Thread(gt, self) self.threads.append(th) + return th def thread_done(self, thread): self.threads.remove(thread) @@ -105,7 +109,7 @@ class ThreadGroup(object): for x in self.timers: try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) @@ -115,7 +119,7 @@ class ThreadGroup(object): continue try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) diff --git a/cinder/openstack/common/timeutils.py b/cinder/openstack/common/timeutils.py index ac2441bcb..db471ea51 100644 --- a/cinder/openstack/common/timeutils.py +++ b/cinder/openstack/common/timeutils.py @@ -21,8 +21,10 @@ Time related utilities and helper functions. import calendar import datetime +import time import iso8601 +import six # ISO 8601 extended time format with microseconds @@ -48,9 +50,9 @@ def parse_isotime(timestr): try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: - raise ValueError(e.message) + raise ValueError(six.text_type(e)) except TypeError as e: - raise ValueError(e.message) + raise ValueError(six.text_type(e)) def strtime(at=None, fmt=PERFECT_TIME_FORMAT): @@ -75,20 +77,25 @@ def normalize_time(timestamp): def is_older_than(before, seconds): """Return True if before is older than seconds.""" - if isinstance(before, basestring): + if isinstance(before, six.string_types): before = parse_strtime(before).replace(tzinfo=None) return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" - if isinstance(after, basestring): + if isinstance(after, six.string_types): after = parse_strtime(after).replace(tzinfo=None) return after - utcnow() > datetime.timedelta(seconds=seconds) def utcnow_ts(): """Timestamp version of our utcnow function.""" + if utcnow.override_time is None: + # NOTE(kgriffs): This is several times faster + # than going through calendar.timegm(...) + return int(time.time()) + return calendar.timegm(utcnow().timetuple()) @@ -110,12 +117,15 @@ def iso8601_from_timestamp(timestamp): utcnow.override_time = None -def set_time_override(override_time=datetime.datetime.utcnow()): +def set_time_override(override_time=None): """Overrides utils.utcnow. Make it return a constant time or a list thereof, one at a time. + + :param override_time: datetime instance or list thereof. If not + given, defaults to the current UTC time. """ - utcnow.override_time = override_time + utcnow.override_time = override_time or datetime.datetime.utcnow() def advance_time_delta(timedelta): @@ -168,6 +178,15 @@ def delta_seconds(before, after): datetime objects (as a float, to microsecond resolution). """ delta = after - before + return total_seconds(delta) + + +def total_seconds(delta): + """Return the total seconds of datetime.timedelta object. + + Compute total seconds of datetime.timedelta, datetime.timedelta + doesn't have method total_seconds in Python2.6, calculate it manually. + """ try: return delta.total_seconds() except AttributeError: diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index 9d54af9e8..2b67d8456 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -573,7 +573,14 @@ # Options defined in cinder.openstack.common.eventlet_backdoor # -# port for eventlet backdoor to listen (integer value) +# Enable eventlet backdoor. Acceptable values are 0, , +# and :, where 0 results in listening on a random +# tcp port number; results in listening on the +# specified port number (and not enabling backdoor if that +# port is in use); and : results in listening on +# the smallest unused port number within the specified range +# of port numbers. The chosen port is displayed in the +# service's log file. (string value) #backdoor_port= @@ -621,7 +628,7 @@ #logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s # list of logger=LEVEL pairs (list value) -#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO +#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN # publish error events (boolean value) #publish_errors=false diff --git a/openstack-common.conf b/openstack-common.conf index 50294c594..da7ed46c6 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -25,6 +25,7 @@ module=rpc module=scheduler module=scheduler.filters module=scheduler.weights +module=service module=strutils module=timeutils module=uuidutils -- 2.45.2