]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Pull latest service module from Oslo
authorZhiteng Huang <zhithuang@ebaysf.com>
Thu, 21 Nov 2013 09:02:23 +0000 (17:02 +0800)
committerZhiteng Huang <zhithuang@ebaysf.com>
Fri, 22 Nov 2013 16:38:17 +0000 (16:38 +0000)
Get latest service module from Oslo to prepare for multi-process API service implementation.
Below are the commits included in this pull.

Changes being pulled into in service module are:
e7bc8c9 2013-11-20 | Merge "os._exit in _start_child may cause unexpected exception"
96a2d4e 2013-11-07 | os._exit in _start_child may cause unexpected exception
1771a77 2013-11-05 | Adjust import order according to PEP8 imports rule
3110c0f 2013-10-17 | Use multiprocessing.Event to ensure services have started
b5fba9e 2013-09-18 | Move comment in service.py to correct location
11cc74f 2013-08-26 | Fixes issue with SUGHUP in services on Windows
825ace5 2013-06-17 | Add service restart function in oslo-incubator
c935d1c 2013-07-16 | Merge "Allow launchers to be stopped multiple times"
dc8aa79 2013-07-08 | Allow launchers to be stopped multiple times
1a2df89 2013-06-25 | Enable H302 hacking check
52e857a 2013-06-19 | Ignore any exceptions from rpc.cleanup().
5518ad3 2013-05-16 | Add graceful service shutdown support to Launcher

And these dependent modules
 - cinder/openstack/common/eventlet_backdoor.py
    * 1dcc747 2013-07-15 | Fix stylistic problems with help text
    * 1a2df89 2013-06-25 | Enable H302 hacking check
    * c7c55b2 2013-06-20 | Improve usability when backdoor_port is nonzero
 - cinder/openstack/common/gettextutils.py
    * 3970d46 2013-11-02 | Fix typos in oslo
    * 88db9c8 2013-10-03 | When translating if no locale is given use default locale
 - cinder/openstack/common/jsonutils.py
    * 3d7504b 2013-09-23 | Ensure that Message objects will be sent via RPC in unicode format
    * 1807d32 2013-08-22 | jsonutils: make types py3 compatible
    * bdef862 2013-08-22 | jsonutils: do not require xmlrpclib
    * ded9bd6 2013-08-04 | Make dependency on netaddr optional
    * 7b7566b 2013-06-25 | Add netaddr.IPAddress support to to_primitive()
 - cinder/openstack/common/local.py
    * cb2a2b6 2013-06-28 | Modify local.py to not be dependent on Eventlet
    * 547ab34 2013-03-11 | Fix Copyright Headers - Rename LLC to Foundation
 - cinder/openstack/common/log.py
    * a82e889 2013-11-14 | Merge "Do not name variables as builtins"
    * 2251cb5 2013-11-13 | Do not name variables as builtins
    * 25c5854 2013-11-13 | Adds admin_password as key to be sanitized when logging
    * cbfded9 2013-11-11 | Default iso8601 logging to WARN
    * 76b0cd1 2013-11-04 | Add mask password impl from other projects
 - cinder/openstack/common/loopingcall.py
    * 1a2df89 2013-06-25 | Enable H302 hacking check
 - cinder/openstack/common/threadgroup.py
    * 9d3c34b 2013-10-25 | Add a link method to Thread
    * 1a2df89 2013-06-25 | Enable H302 hacking check
 - cinder/openstack/common/timeutils.py
    * f3b5f17 2013-11-12 | Add helper method total_seconds in timeutils.py
    * 53ebd30 2013-10-18 | python3: use six.text_types for unicode()
    * 3bc6f79 2013-09-19 | Fix timeutils.set_override_time not defaulting to current wall time
    * af76064 2013-08-29 | Optimize timeutils.utcnow_ts()
    * df3f2ba 2013-07-26 | BaseException.message is deprecated since Python 2.6
    * d28fa69 2013-06-27 | python3: Add python3 compatibility.

Partial bp: multi-process-api-service

Change-Id: Ifd25eae9eb2d6ae53bcf1665c3d5b7db4144433c

cinder/openstack/common/eventlet_backdoor.py
cinder/openstack/common/gettextutils.py
cinder/openstack/common/jsonutils.py
cinder/openstack/common/local.py
cinder/openstack/common/log.py
cinder/openstack/common/loopingcall.py
cinder/openstack/common/service.py
cinder/openstack/common/threadgroup.py
cinder/openstack/common/timeutils.py
etc/cinder/cinder.conf.sample
openstack-common.conf

index 57b89ae914ebdfcd4ad041135d749319b5f4de77..d530cda3734da1a78676842b6398838d835fdfc8 100644 (file)
 
 from __future__ import print_function
 
+import errno
 import gc
+import os
 import pprint
+import socket
 import sys
 import traceback
 
@@ -28,14 +31,34 @@ import eventlet.backdoor
 import greenlet
 from oslo.config import cfg
 
+from cinder.openstack.common.gettextutils import _  # noqa
+from cinder.openstack.common import log as logging
+
+help_for_backdoor_port = (
+    "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+    "in listening on a random tcp port number; <port> results in listening "
+    "on the specified port number (and not enabling backdoor if that port "
+    "is in use); and <start>:<end> results in listening on the smallest "
+    "unused port number within the specified range of port numbers.  The "
+    "chosen port is displayed in the service's log file.")
 eventlet_backdoor_opts = [
-    cfg.IntOpt('backdoor_port',
+    cfg.StrOpt('backdoor_port',
                default=None,
-               help='port for eventlet backdoor to listen')
+               help="Enable eventlet backdoor.  %s" % help_for_backdoor_port)
 ]
 
 CONF = cfg.CONF
 CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+    def __init__(self, port_range, help_msg, ex):
+        msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+               '%(help)s' %
+               {'range': port_range, 'ex': ex, 'help': help_msg})
+        super(EventletBackdoorConfigValueError, self).__init__(msg)
+        self.port_range = port_range
 
 
 def _dont_use_this():
@@ -60,6 +83,33 @@ def _print_nativethreads():
         print()
 
 
+def _parse_port_range(port_range):
+    if ':' not in port_range:
+        start, end = port_range, port_range
+    else:
+        start, end = port_range.split(':', 1)
+    try:
+        start, end = int(start), int(end)
+        if end < start:
+            raise ValueError
+        return start, end
+    except ValueError as ex:
+        raise EventletBackdoorConfigValueError(port_range, ex,
+                                               help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+    try_port = start_port
+    while True:
+        try:
+            return listen_func((host, try_port))
+        except socket.error as exc:
+            if (exc.errno != errno.EADDRINUSE or
+               try_port >= end_port):
+                raise
+            try_port += 1
+
+
 def initialize_if_enabled():
     backdoor_locals = {
         'exit': _dont_use_this,      # So we don't exit the entire process
@@ -72,6 +122,8 @@ def initialize_if_enabled():
     if CONF.backdoor_port is None:
         return None
 
+    start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
     # NOTE(johannes): The standard sys.displayhook will print the value of
     # the last expression and set it to __builtin__._, which overwrites
     # the __builtin__._ that gettext sets. Let's switch to using pprint
@@ -82,8 +134,13 @@ def initialize_if_enabled():
             pprint.pprint(val)
     sys.displayhook = displayhook
 
-    sock = eventlet.listen(('localhost', CONF.backdoor_port))
+    sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+    # In the case of backdoor port being zero, a port number is assigned by
+    # listen().  In any case, pull the port number out here.
     port = sock.getsockname()[1]
+    LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+             {'port': port, 'pid': os.getpid()})
     eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
                      locals=backdoor_locals)
     return port
index 1610322524cf1b011a0bd461e4e1fd9d6f61dfd7..caa6a0925d5ccd3efbc1feef21f7ac08fd3ff4ae 100644 (file)
@@ -317,7 +317,7 @@ def get_available_languages(domain):
     # NOTE(luisg): Babel <1.0 used a function called list(), which was
     # renamed to locale_identifiers() in >=1.0, the requirements master list
     # requires >=0.9.6, uncapped, so defensively work with both. We can remove
-    # this check when the master list updates to >=1.0, and all projects udpate
+    # this check when the master list updates to >=1.0, and update all projects
     list_identifiers = (getattr(localedata, 'list', None) or
                         getattr(localedata, 'locale_identifiers'))
     locale_identifiers = list_identifiers()
@@ -329,13 +329,21 @@ def get_available_languages(domain):
 
 
 def get_localized_message(message, user_locale):
-    """Gets a localized version of the given message in the given locale."""
+    """Gets a localized version of the given message in the given locale.
+
+    If the message is not a Message object the message is returned as-is.
+    If the locale is None the message is translated to the default locale.
+
+    :returns: the translated message in unicode, or the original message if
+              it could not be translated
+    """
+    translated = message
     if isinstance(message, Message):
-        if user_locale:
-            message.locale = user_locale
-        return six.text_type(message)
-    else:
-        return message
+        original_locale = message.locale
+        message.locale = user_locale
+        translated = six.text_type(message)
+        message.locale = original_locale
+    return translated
 
 
 class LocaleHandler(logging.Handler):
index b1fa1afa3b8115a277438c5d58e1e63f4afc430b..b06d855bfaaff920ebbd2d2064e7f601396909f8 100644 (file)
@@ -38,13 +38,19 @@ import functools
 import inspect
 import itertools
 import json
-import types
-import xmlrpclib
+try:
+    import xmlrpclib
+except ImportError:
+    # NOTE(jd): xmlrpclib is not shipped with Python 3
+    xmlrpclib = None
 
 import six
 
+from cinder.openstack.common import gettextutils
+from cinder.openstack.common import importutils
 from cinder.openstack.common import timeutils
 
+netaddr = importutils.try_import("netaddr")
 
 _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.isfunction, inspect.isgeneratorfunction,
@@ -52,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.iscode, inspect.isbuiltin, inspect.isroutine,
                      inspect.isabstract]
 
-_simple_types = (types.NoneType, int, basestring, bool, float, long)
+_simple_types = (six.string_types + six.integer_types
+                 + (type(None), bool, float))
 
 
 def to_primitive(value, convert_instances=False, convert_datetime=True,
@@ -124,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
         # It's not clear why xmlrpclib created their own DateTime type, but
         # for our purposes, make it a datetime type which is explicitly
         # handled
-        if isinstance(value, xmlrpclib.DateTime):
+        if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
             value = datetime.datetime(*tuple(value.timetuple())[:6])
 
         if convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
+        elif isinstance(value, gettextutils.Message):
+            return value.data
         elif hasattr(value, 'iteritems'):
             return recursive(dict(value.iteritems()), level=level + 1)
         elif hasattr(value, '__iter__'):
@@ -137,6 +146,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             # Likely an instance of something. Watch for cycles.
             # Ignore class member vars.
             return recursive(value.__dict__, level=level + 1)
+        elif netaddr and isinstance(value, netaddr.IPAddress):
+            return six.text_type(value)
         else:
             if any(test(value) for test in _nasty_type_tests):
                 return six.text_type(value)
index f1bfc824bf6103c18f357c0dbcf57da9a76e4551..e82f17d0f3fd307724a7f1b3ffb0454cd449219e 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-"""Greenthread local storage of variables using weak references"""
+"""Local storage of variables using weak references"""
 
+import threading
 import weakref
 
-from eventlet import corolocal
 
-
-class WeakLocal(corolocal.local):
+class WeakLocal(threading.local):
     def __getattribute__(self, attr):
-        rval = corolocal.local.__getattribute__(self, attr)
+        rval = super(WeakLocal, self).__getattribute__(attr)
         if rval:
             # NOTE(mikal): this bit is confusing. What is stored is a weak
             # reference, not the value itself. We therefore need to lookup
@@ -34,7 +33,7 @@ class WeakLocal(corolocal.local):
 
     def __setattr__(self, attr, value):
         value = weakref.ref(value)
-        return corolocal.local.__setattr__(self, attr, value)
+        return super(WeakLocal, self).__setattr__(attr, value)
 
 
 # NOTE(mikal): the name "store" should be deprecated in the future
@@ -45,4 +44,4 @@ store = WeakLocal()
 # "strong" store will hold a reference to the object so that it never falls out
 # of scope.
 weak_store = WeakLocal()
-strong_store = corolocal.local
+strong_store = threading.local()
index 510c405a9196a89c5192cd310f04669dea658db6..d844880cd2225550a4352f258c17e43a29fd23f2 100644 (file)
@@ -35,6 +35,7 @@ import logging
 import logging.config
 import logging.handlers
 import os
+import re
 import sys
 import traceback
 
@@ -50,6 +51,24 @@ from cinder.openstack.common import local
 
 _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
 
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
+
+# NOTE(ldbragst): Let's build a list of regex objects using the list of
+# _SANITIZE_KEYS we already have. This way, we only have to add the new key
+# to the list of _SANITIZE_KEYS and we can generate regular expressions
+# for XML and JSON automatically.
+_SANITIZE_PATTERNS = []
+_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
+                    r'(<%(key)s>).*?(</%(key)s>)',
+                    r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
+                    r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
+
+for key in _SANITIZE_KEYS:
+    for pattern in _FORMAT_PATTERNS:
+        reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+        _SANITIZE_PATTERNS.append(reg_ex)
+
+
 common_cli_opts = [
     cfg.BoolOpt('debug',
                 short='d',
@@ -136,6 +155,7 @@ log_opts = [
                     'qpid=WARN',
                     'sqlalchemy=WARN',
                     'suds=INFO',
+                    'iso8601=WARN',
                 ],
                 help='list of logger=LEVEL pairs'),
     cfg.BoolOpt('publish_errors',
@@ -214,6 +234,39 @@ def _get_log_file_path(binary=None):
     return None
 
 
+def mask_password(message, secret="***"):
+    """Replace password with 'secret' in message.
+
+    :param message: The string which includes security information.
+    :param secret: value with which to replace passwords, defaults to "***".
+    :returns: The unicode value of message with the password fields masked.
+
+    For example:
+    >>> mask_password("'adminPass' : 'aaaaa'")
+    "'adminPass' : '***'"
+    >>> mask_password("'admin_pass' : 'aaaaa'")
+    "'admin_pass' : '***'"
+    >>> mask_password('"password" : "aaaaa"')
+    '"password" : "***"'
+    >>> mask_password("'original_password' : 'aaaaa'")
+    "'original_password' : '***'"
+    >>> mask_password("u'original_password' :   u'aaaaa'")
+    "u'original_password' :   u'***'"
+    """
+    message = six.text_type(message)
+
+    # NOTE(ldbragst): Check to see if anything in message contains any key
+    # specified in _SANITIZE_KEYS, if not then just return the message since
+    # we don't have to mask any passwords.
+    if not any(key in message for key in _SANITIZE_KEYS):
+        return message
+
+    secret = r'\g<1>' + secret + r'\g<2>'
+    for pattern in _SANITIZE_PATTERNS:
+        message = re.sub(pattern, secret, message)
+    return message
+
+
 class BaseLoggerAdapter(logging.LoggerAdapter):
 
     def audit(self, msg, *args, **kwargs):
@@ -336,10 +389,10 @@ class JSONFormatter(logging.Formatter):
 
 
 def _create_logging_excepthook(product_name):
-    def logging_excepthook(type, value, tb):
+    def logging_excepthook(exc_type, value, tb):
         extra = {}
         if CONF.verbose:
-            extra['exc_info'] = (type, value, tb)
+            extra['exc_info'] = (exc_type, value, tb)
         getLogger(product_name).critical(str(value), **extra)
     return logging_excepthook
 
index f62792d5c75329155d774a014b74eeae6f5013cf..9d5a057ec3e72cf1a522ff2e05678abd0cbb75b7 100644 (file)
@@ -22,7 +22,7 @@ import sys
 from eventlet import event
 from eventlet import greenthread
 
-from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common.gettextutils import _  # noqa
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import timeutils
 
index 7cbd3690a64002b523c8f6f43b3acfa454d96aef..8016ba270e9c12641db61e937c70d849cb3a1639 100644 (file)
@@ -20,6 +20,7 @@
 """Generic Node base class for all workers that run on hosts."""
 
 import errno
+import logging as std_logging
 import os
 import random
 import signal
@@ -27,11 +28,11 @@ import sys
 import time
 
 import eventlet
-import logging as std_logging
+from eventlet import event
 from oslo.config import cfg
 
 from cinder.openstack.common import eventlet_backdoor
-from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common.gettextutils import _  # noqa
 from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import threadgroup
@@ -42,6 +43,29 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+def _sighup_supported():
+    return hasattr(signal, 'SIGHUP')
+
+
+def _is_sighup(signo):
+    return _sighup_supported() and signo == signal.SIGHUP
+
+
+def _signo_to_signame(signo):
+    signals = {signal.SIGTERM: 'SIGTERM',
+               signal.SIGINT: 'SIGINT'}
+    if _sighup_supported():
+        signals[signal.SIGHUP] = 'SIGHUP'
+    return signals[signo]
+
+
+def _set_signals_handler(handler):
+    signal.signal(signal.SIGTERM, handler)
+    signal.signal(signal.SIGINT, handler)
+    if _sighup_supported():
+        signal.signal(signal.SIGHUP, handler)
+
+
 class Launcher(object):
     """Launch one or more services and wait for them to complete."""
 
@@ -51,20 +75,9 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = threadgroup.ThreadGroup()
+        self.services = Services()
         self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
 
-    @staticmethod
-    def run_service(service):
-        """Start and wait for a service to finish.
-
-        :param service: service to run and wait for.
-        :returns: None
-
-        """
-        service.start()
-        service.wait()
-
     def launch_service(self, service):
         """Load and start the given service.
 
@@ -73,7 +86,7 @@ class Launcher(object):
 
         """
         service.backdoor_port = self.backdoor_port
-        self._services.add_thread(self.run_service, service)
+        self.services.add(service)
 
     def stop(self):
         """Stop all services which are currently running.
@@ -81,7 +94,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.stop()
+        self.services.stop()
 
     def wait(self):
         """Waits until all services have been stopped, and then returns.
@@ -89,7 +102,16 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.wait()
+        self.services.wait()
+
+    def restart(self):
+        """Reload config files and restart service.
+
+        :returns: None
+
+        """
+        cfg.CONF.reload_config_files()
+        self.services.restart()
 
 
 class SignalExit(SystemExit):
@@ -101,33 +123,48 @@ class SignalExit(SystemExit):
 class ServiceLauncher(Launcher):
     def _handle_signal(self, signo, frame):
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-
+        _set_signals_handler(signal.SIG_DFL)
         raise SignalExit(signo)
 
-    def wait(self):
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
+    def handle_signal(self):
+        _set_signals_handler(self._handle_signal)
+
+    def _wait_for_exit_or_signal(self, ready_callback=None):
+        status = None
+        signo = 0
 
         LOG.debug(_('Full set of CONF:'))
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
-        status = None
         try:
+            if ready_callback:
+                ready_callback()
             super(ServiceLauncher, self).wait()
         except SignalExit as exc:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[exc.signo]
+            signame = _signo_to_signame(exc.signo)
             LOG.info(_('Caught %s, exiting'), signame)
             status = exc.code
+            signo = exc.signo
         except SystemExit as exc:
             status = exc.code
         finally:
-            if rpc:
-                rpc.cleanup()
             self.stop()
-        return status
+            if rpc:
+                try:
+                    rpc.cleanup()
+                except Exception:
+                    # We're shutting down, so it doesn't matter at this point.
+                    LOG.exception(_('Exception during rpc cleanup.'))
+
+        return status, signo
+
+    def wait(self, ready_callback=None):
+        while True:
+            self.handle_signal()
+            status, signo = self._wait_for_exit_or_signal(ready_callback)
+            if not _is_sighup(signo):
+                return status
+            self.restart()
 
 
 class ServiceWrapper(object):
@@ -145,17 +182,17 @@ class ProcessLauncher(object):
         self.running = True
         rfd, self.writepipe = os.pipe()
         self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+        self.handle_signal()
 
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
+    def handle_signal(self):
+        _set_signals_handler(self._handle_signal)
 
     def _handle_signal(self, signo, frame):
         self.sigcaught = signo
         self.running = False
 
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
+        _set_signals_handler(signal.SIG_DFL)
 
     def _pipe_watcher(self):
         # This will block until the write end is closed when the parent
@@ -166,16 +203,49 @@ class ProcessLauncher(object):
 
         sys.exit(1)
 
-    def _child_process(self, service):
+    def _child_process_handle_signal(self):
         # Setup child signal handlers differently
         def _sigterm(*args):
             signal.signal(signal.SIGTERM, signal.SIG_DFL)
             raise SignalExit(signal.SIGTERM)
 
+        def _sighup(*args):
+            signal.signal(signal.SIGHUP, signal.SIG_DFL)
+            raise SignalExit(signal.SIGHUP)
+
         signal.signal(signal.SIGTERM, _sigterm)
+        if _sighup_supported():
+            signal.signal(signal.SIGHUP, _sighup)
         # Block SIGINT and let the parent send us a SIGTERM
         signal.signal(signal.SIGINT, signal.SIG_IGN)
 
+    def _child_wait_for_exit_or_signal(self, launcher):
+        status = 0
+        signo = 0
+
+        # NOTE(johannes): All exceptions are caught to ensure this
+        # doesn't fallback into the loop spawning children. It would
+        # be bad for a child to spawn more children.
+        try:
+            launcher.wait()
+        except SignalExit as exc:
+            signame = _signo_to_signame(exc.signo)
+            LOG.info(_('Caught %s, exiting'), signame)
+            status = exc.code
+            signo = exc.signo
+        except SystemExit as exc:
+            status = exc.code
+        except BaseException:
+            LOG.exception(_('Unhandled exception'))
+            status = 2
+        finally:
+            launcher.stop()
+
+        return status, signo
+
+    def _child_process(self, service):
+        self._child_process_handle_signal()
+
         # Reopen the eventlet hub to make sure we don't share an epoll
         # fd with parent and/or siblings, which would be bad
         eventlet.hubs.use_hub()
@@ -189,7 +259,8 @@ class ProcessLauncher(object):
         random.seed()
 
         launcher = Launcher()
-        launcher.run_service(service)
+        launcher.launch_service(service)
+        return launcher
 
     def _start_child(self, wrap):
         if len(wrap.forktimes) > wrap.workers:
@@ -207,24 +278,13 @@ class ProcessLauncher(object):
 
         pid = os.fork()
         if pid == 0:
-            # NOTE(johannes): All exceptions are caught to ensure this
-            # doesn't fallback into the loop spawning children. It would
-            # be bad for a child to spawn more children.
-            status = 0
-            try:
-                self._child_process(wrap.service)
-            except SignalExit as exc:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT'}[exc.signo]
-                LOG.info(_('Caught %s, exiting'), signame)
-                status = exc.code
-            except SystemExit as exc:
-                status = exc.code
-            except BaseException:
-                LOG.exception(_('Unhandled exception'))
-                status = 2
-            finally:
-                wrap.service.stop()
+            launcher = self._child_process(wrap.service)
+            while True:
+                self._child_process_handle_signal()
+                status, signo = self._child_wait_for_exit_or_signal(launcher)
+                if not _is_sighup(signo):
+                    break
+                launcher.restart()
 
             os._exit(status)
 
@@ -270,12 +330,7 @@ class ProcessLauncher(object):
         wrap.children.remove(pid)
         return wrap
 
-    def wait(self):
-        """Loop waiting on children to die and respawning as necessary."""
-
-        LOG.debug(_('Full set of CONF:'))
-        CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+    def _respawn_children(self):
         while self.running:
             wrap = self._wait_child()
             if not wrap:
@@ -284,14 +339,28 @@ class ProcessLauncher(object):
                 # (see bug #1095346)
                 eventlet.greenthread.sleep(.01)
                 continue
-
             while self.running and len(wrap.children) < wrap.workers:
                 self._start_child(wrap)
 
-        if self.sigcaught:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
-            LOG.info(_('Caught %s, stopping children'), signame)
+    def wait(self):
+        """Loop waiting on children to die and respawning as necessary."""
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        while True:
+            self.handle_signal()
+            self._respawn_children()
+            if self.sigcaught:
+                signame = _signo_to_signame(self.sigcaught)
+                LOG.info(_('Caught %s, stopping children'), signame)
+            if not _is_sighup(self.sigcaught):
+                break
+
+            for pid in self.children:
+                os.kill(pid, signal.SIGHUP)
+            self.running = True
+            self.sigcaught = None
 
         for pid in self.children:
             try:
@@ -313,15 +382,74 @@ class Service(object):
     def __init__(self, threads=1000):
         self.tg = threadgroup.ThreadGroup(threads)
 
+        # signal that the service is done shutting itself down:
+        self._done = event.Event()
+
+    def reset(self):
+        # NOTE(Fengqian): docs for Event.reset() recommend against using it
+        self._done = event.Event()
+
     def start(self):
         pass
 
     def stop(self):
         self.tg.stop()
+        self.tg.wait()
+        # Signal that service cleanup is done:
+        if not self._done.ready():
+            self._done.send()
+
+    def wait(self):
+        self._done.wait()
+
+
+class Services(object):
+
+    def __init__(self):
+        self.services = []
+        self.tg = threadgroup.ThreadGroup()
+        self.done = event.Event()
+
+    def add(self, service):
+        self.services.append(service)
+        self.tg.add_thread(self.run_service, service, self.done)
+
+    def stop(self):
+        # wait for graceful shutdown of services:
+        for service in self.services:
+            service.stop()
+            service.wait()
+
+        # Each service has performed cleanup, now signal that the run_service
+        # wrapper threads can now die:
+        if not self.done.ready():
+            self.done.send()
+
+        # reap threads:
+        self.tg.stop()
 
     def wait(self):
         self.tg.wait()
 
+    def restart(self):
+        self.stop()
+        self.done = event.Event()
+        for restart_service in self.services:
+            restart_service.reset()
+            self.tg.add_thread(self.run_service, restart_service, self.done)
+
+    @staticmethod
+    def run_service(service, done):
+        """Service start wrapper.
+
+        :param service: service to run
+        :param done: event to wait on until a shutdown is triggered
+        :returns: None
+
+        """
+        service.start()
+        done.wait()
+
 
 def launch(service, workers=None):
     if workers:
index 7e80416038910a891686788587a38c7e3989944e..36e05e9ccd4cbdfeda0f8d3d625c4fd696578d2a 100644 (file)
@@ -14,7 +14,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from eventlet import greenlet
+import eventlet
 from eventlet import greenpool
 from eventlet import greenthread
 
@@ -48,6 +48,9 @@ class Thread(object):
     def wait(self):
         return self.thread.wait()
 
+    def link(self, func, *args, **kwargs):
+        self.thread.link(func, *args, **kwargs)
+
 
 class ThreadGroup(object):
     """The point of the ThreadGroup classis to:
@@ -79,6 +82,7 @@ class ThreadGroup(object):
         gt = self.pool.spawn(callback, *args, **kwargs)
         th = Thread(gt, self)
         self.threads.append(th)
+        return th
 
     def thread_done(self, thread):
         self.threads.remove(thread)
@@ -105,7 +109,7 @@ class ThreadGroup(object):
         for x in self.timers:
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
@@ -115,7 +119,7 @@ class ThreadGroup(object):
                 continue
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
index ac2441bcb41cea1faeb2f93133f68fd07b848e59..db471ea51eee69acf6456b59ecaae6dfd341e5a6 100644 (file)
@@ -21,8 +21,10 @@ Time related utilities and helper functions.
 
 import calendar
 import datetime
+import time
 
 import iso8601
+import six
 
 
 # ISO 8601 extended time format with microseconds
@@ -48,9 +50,9 @@ def parse_isotime(timestr):
     try:
         return iso8601.parse_date(timestr)
     except iso8601.ParseError as e:
-        raise ValueError(e.message)
+        raise ValueError(six.text_type(e))
     except TypeError as e:
-        raise ValueError(e.message)
+        raise ValueError(six.text_type(e))
 
 
 def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@@ -75,20 +77,25 @@ def normalize_time(timestamp):
 
 def is_older_than(before, seconds):
     """Return True if before is older than seconds."""
-    if isinstance(before, basestring):
+    if isinstance(before, six.string_types):
         before = parse_strtime(before).replace(tzinfo=None)
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
 def is_newer_than(after, seconds):
     """Return True if after is newer than seconds."""
-    if isinstance(after, basestring):
+    if isinstance(after, six.string_types):
         after = parse_strtime(after).replace(tzinfo=None)
     return after - utcnow() > datetime.timedelta(seconds=seconds)
 
 
 def utcnow_ts():
     """Timestamp version of our utcnow function."""
+    if utcnow.override_time is None:
+        # NOTE(kgriffs): This is several times faster
+        # than going through calendar.timegm(...)
+        return int(time.time())
+
     return calendar.timegm(utcnow().timetuple())
 
 
@@ -110,12 +117,15 @@ def iso8601_from_timestamp(timestamp):
 utcnow.override_time = None
 
 
-def set_time_override(override_time=datetime.datetime.utcnow()):
+def set_time_override(override_time=None):
     """Overrides utils.utcnow.
 
     Make it return a constant time or a list thereof, one at a time.
+
+    :param override_time: datetime instance or list thereof. If not
+                          given, defaults to the current UTC time.
     """
-    utcnow.override_time = override_time
+    utcnow.override_time = override_time or datetime.datetime.utcnow()
 
 
 def advance_time_delta(timedelta):
@@ -168,6 +178,15 @@ def delta_seconds(before, after):
     datetime objects (as a float, to microsecond resolution).
     """
     delta = after - before
+    return total_seconds(delta)
+
+
+def total_seconds(delta):
+    """Return the total seconds of datetime.timedelta object.
+
+    Compute total seconds of datetime.timedelta, datetime.timedelta
+    doesn't have method total_seconds in Python2.6, calculate it manually.
+    """
     try:
         return delta.total_seconds()
     except AttributeError:
index 9d54af9e808e61d088180b78c2c3d5798fdd9434..2b67d8456335e7af36a920d8828073928b35725f 100644 (file)
 # Options defined in cinder.openstack.common.eventlet_backdoor
 #
 
-# port for eventlet backdoor to listen (integer value)
+# Enable eventlet backdoor.  Acceptable values are 0, <port>,
+# and <start>:<end>, where 0 results in listening on a random
+# tcp port number; <port> results in listening on the
+# specified port number (and not enabling backdoor if that
+# port is in use); and <start>:<end> results in listening on
+# the smallest unused port number within the specified range
+# of port numbers.  The chosen port is displayed in the
+# service's log file. (string value)
 #backdoor_port=<None>
 
 
 #logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
 
 # list of logger=LEVEL pairs (list value)
-#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO
+#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN
 
 # publish error events (boolean value)
 #publish_errors=false
index 50294c5945ff8623c0ba0c8db8769acbc93d74b6..da7ed46c6844d8dfc75cb11aa2a8be070c22f67e 100644 (file)
@@ -25,6 +25,7 @@ module=rpc
 module=scheduler
 module=scheduler.filters
 module=scheduler.weights
+module=service
 module=strutils
 module=timeutils
 module=uuidutils