Get latest service module from Oslo to prepare for multi-process API service implementation.
Below are the commits included in this pull.
Changes being pulled into in service module are:
*
e7bc8c9 2013-11-20 | Merge "os._exit in _start_child may cause unexpected exception"
*
96a2d4e 2013-11-07 | os._exit in _start_child may cause unexpected exception
*
1771a77 2013-11-05 | Adjust import order according to PEP8 imports rule
*
3110c0f 2013-10-17 | Use multiprocessing.Event to ensure services have started
*
b5fba9e 2013-09-18 | Move comment in service.py to correct location
*
11cc74f 2013-08-26 | Fixes issue with SUGHUP in services on Windows
*
825ace5 2013-06-17 | Add service restart function in oslo-incubator
*
c935d1c 2013-07-16 | Merge "Allow launchers to be stopped multiple times"
*
dc8aa79 2013-07-08 | Allow launchers to be stopped multiple times
*
1a2df89 2013-06-25 | Enable H302 hacking check
*
52e857a 2013-06-19 | Ignore any exceptions from rpc.cleanup().
*
5518ad3 2013-05-16 | Add graceful service shutdown support to Launcher
And these dependent modules
- cinder/openstack/common/eventlet_backdoor.py
*
1dcc747 2013-07-15 | Fix stylistic problems with help text
*
1a2df89 2013-06-25 | Enable H302 hacking check
*
c7c55b2 2013-06-20 | Improve usability when backdoor_port is nonzero
- cinder/openstack/common/gettextutils.py
*
3970d46 2013-11-02 | Fix typos in oslo
*
88db9c8 2013-10-03 | When translating if no locale is given use default locale
- cinder/openstack/common/jsonutils.py
*
3d7504b 2013-09-23 | Ensure that Message objects will be sent via RPC in unicode format
*
1807d32 2013-08-22 | jsonutils: make types py3 compatible
*
bdef862 2013-08-22 | jsonutils: do not require xmlrpclib
*
ded9bd6 2013-08-04 | Make dependency on netaddr optional
*
7b7566b 2013-06-25 | Add netaddr.IPAddress support to to_primitive()
- cinder/openstack/common/local.py
*
cb2a2b6 2013-06-28 | Modify local.py to not be dependent on Eventlet
*
547ab34 2013-03-11 | Fix Copyright Headers - Rename LLC to Foundation
- cinder/openstack/common/log.py
*
a82e889 2013-11-14 | Merge "Do not name variables as builtins"
*
2251cb5 2013-11-13 | Do not name variables as builtins
*
25c5854 2013-11-13 | Adds admin_password as key to be sanitized when logging
*
cbfded9 2013-11-11 | Default iso8601 logging to WARN
*
76b0cd1 2013-11-04 | Add mask password impl from other projects
- cinder/openstack/common/loopingcall.py
*
1a2df89 2013-06-25 | Enable H302 hacking check
- cinder/openstack/common/threadgroup.py
*
9d3c34b 2013-10-25 | Add a link method to Thread
*
1a2df89 2013-06-25 | Enable H302 hacking check
- cinder/openstack/common/timeutils.py
*
f3b5f17 2013-11-12 | Add helper method total_seconds in timeutils.py
*
53ebd30 2013-10-18 | python3: use six.text_types for unicode()
*
3bc6f79 2013-09-19 | Fix timeutils.set_override_time not defaulting to current wall time
*
af76064 2013-08-29 | Optimize timeutils.utcnow_ts()
*
df3f2ba 2013-07-26 | BaseException.message is deprecated since Python 2.6
*
d28fa69 2013-06-27 | python3: Add python3 compatibility.
Partial bp: multi-process-api-service
Change-Id: Ifd25eae9eb2d6ae53bcf1665c3d5b7db4144433c
from __future__ import print_function
+import errno
import gc
+import os
import pprint
+import socket
import sys
import traceback
import greenlet
from oslo.config import cfg
+from cinder.openstack.common.gettextutils import _ # noqa
+from cinder.openstack.common import log as logging
+
+help_for_backdoor_port = (
+ "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+ "in listening on a random tcp port number; <port> results in listening "
+ "on the specified port number (and not enabling backdoor if that port "
+ "is in use); and <start>:<end> results in listening on the smallest "
+ "unused port number within the specified range of port numbers. The "
+ "chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
- cfg.IntOpt('backdoor_port',
+ cfg.StrOpt('backdoor_port',
default=None,
- help='port for eventlet backdoor to listen')
+ help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+ def __init__(self, port_range, help_msg, ex):
+ msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+ '%(help)s' %
+ {'range': port_range, 'ex': ex, 'help': help_msg})
+ super(EventletBackdoorConfigValueError, self).__init__(msg)
+ self.port_range = port_range
def _dont_use_this():
print()
+def _parse_port_range(port_range):
+ if ':' not in port_range:
+ start, end = port_range, port_range
+ else:
+ start, end = port_range.split(':', 1)
+ try:
+ start, end = int(start), int(end)
+ if end < start:
+ raise ValueError
+ return start, end
+ except ValueError as ex:
+ raise EventletBackdoorConfigValueError(port_range, ex,
+ help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+ try_port = start_port
+ while True:
+ try:
+ return listen_func((host, try_port))
+ except socket.error as exc:
+ if (exc.errno != errno.EADDRINUSE or
+ try_port >= end_port):
+ raise
+ try_port += 1
+
+
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
if CONF.backdoor_port is None:
return None
+ start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
pprint.pprint(val)
sys.displayhook = displayhook
- sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+ # In the case of backdoor port being zero, a port number is assigned by
+ # listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
+ LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+ {'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
- # this check when the master list updates to >=1.0, and all projects udpate
+ # this check when the master list updates to >=1.0, and update all projects
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
def get_localized_message(message, user_locale):
- """Gets a localized version of the given message in the given locale."""
+ """Gets a localized version of the given message in the given locale.
+
+ If the message is not a Message object the message is returned as-is.
+ If the locale is None the message is translated to the default locale.
+
+ :returns: the translated message in unicode, or the original message if
+ it could not be translated
+ """
+ translated = message
if isinstance(message, Message):
- if user_locale:
- message.locale = user_locale
- return six.text_type(message)
- else:
- return message
+ original_locale = message.locale
+ message.locale = user_locale
+ translated = six.text_type(message)
+ message.locale = original_locale
+ return translated
class LocaleHandler(logging.Handler):
import inspect
import itertools
import json
-import types
-import xmlrpclib
+try:
+ import xmlrpclib
+except ImportError:
+ # NOTE(jd): xmlrpclib is not shipped with Python 3
+ xmlrpclib = None
import six
+from cinder.openstack.common import gettextutils
+from cinder.openstack.common import importutils
from cinder.openstack.common import timeutils
+netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
-_simple_types = (types.NoneType, int, basestring, bool, float, long)
+_simple_types = (six.string_types + six.integer_types
+ + (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
- if isinstance(value, xmlrpclib.DateTime):
+ if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
+ elif isinstance(value, gettextutils.Message):
+ return value.data
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
+ elif netaddr and isinstance(value, netaddr.IPAddress):
+ return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):
return six.text_type(value)
# License for the specific language governing permissions and limitations
# under the License.
-"""Greenthread local storage of variables using weak references"""
+"""Local storage of variables using weak references"""
+import threading
import weakref
-from eventlet import corolocal
-
-class WeakLocal(corolocal.local):
+class WeakLocal(threading.local):
def __getattribute__(self, attr):
- rval = corolocal.local.__getattribute__(self, attr)
+ rval = super(WeakLocal, self).__getattribute__(attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
def __setattr__(self, attr, value):
value = weakref.ref(value)
- return corolocal.local.__setattr__(self, attr, value)
+ return super(WeakLocal, self).__setattr__(attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
-strong_store = corolocal.local
+strong_store = threading.local()
import logging.config
import logging.handlers
import os
+import re
import sys
import traceback
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
+
+# NOTE(ldbragst): Let's build a list of regex objects using the list of
+# _SANITIZE_KEYS we already have. This way, we only have to add the new key
+# to the list of _SANITIZE_KEYS and we can generate regular expressions
+# for XML and JSON automatically.
+_SANITIZE_PATTERNS = []
+_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
+ r'(<%(key)s>).*?(</%(key)s>)',
+ r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
+ r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
+
+for key in _SANITIZE_KEYS:
+ for pattern in _FORMAT_PATTERNS:
+ reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+ _SANITIZE_PATTERNS.append(reg_ex)
+
+
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
'qpid=WARN',
'sqlalchemy=WARN',
'suds=INFO',
+ 'iso8601=WARN',
],
help='list of logger=LEVEL pairs'),
cfg.BoolOpt('publish_errors',
return None
+def mask_password(message, secret="***"):
+ """Replace password with 'secret' in message.
+
+ :param message: The string which includes security information.
+ :param secret: value with which to replace passwords, defaults to "***".
+ :returns: The unicode value of message with the password fields masked.
+
+ For example:
+ >>> mask_password("'adminPass' : 'aaaaa'")
+ "'adminPass' : '***'"
+ >>> mask_password("'admin_pass' : 'aaaaa'")
+ "'admin_pass' : '***'"
+ >>> mask_password('"password" : "aaaaa"')
+ '"password" : "***"'
+ >>> mask_password("'original_password' : 'aaaaa'")
+ "'original_password' : '***'"
+ >>> mask_password("u'original_password' : u'aaaaa'")
+ "u'original_password' : u'***'"
+ """
+ message = six.text_type(message)
+
+ # NOTE(ldbragst): Check to see if anything in message contains any key
+ # specified in _SANITIZE_KEYS, if not then just return the message since
+ # we don't have to mask any passwords.
+ if not any(key in message for key in _SANITIZE_KEYS):
+ return message
+
+ secret = r'\g<1>' + secret + r'\g<2>'
+ for pattern in _SANITIZE_PATTERNS:
+ message = re.sub(pattern, secret, message)
+ return message
+
+
class BaseLoggerAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
def _create_logging_excepthook(product_name):
- def logging_excepthook(type, value, tb):
+ def logging_excepthook(exc_type, value, tb):
extra = {}
if CONF.verbose:
- extra['exc_info'] = (type, value, tb)
+ extra['exc_info'] = (exc_type, value, tb)
getLogger(product_name).critical(str(value), **extra)
return logging_excepthook
from eventlet import event
from eventlet import greenthread
-from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common.gettextutils import _ # noqa
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
"""Generic Node base class for all workers that run on hosts."""
import errno
+import logging as std_logging
import os
import random
import signal
import time
import eventlet
-import logging as std_logging
+from eventlet import event
from oslo.config import cfg
from cinder.openstack.common import eventlet_backdoor
-from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common.gettextutils import _ # noqa
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import threadgroup
LOG = logging.getLogger(__name__)
+def _sighup_supported():
+ return hasattr(signal, 'SIGHUP')
+
+
+def _is_sighup(signo):
+ return _sighup_supported() and signo == signal.SIGHUP
+
+
+def _signo_to_signame(signo):
+ signals = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}
+ if _sighup_supported():
+ signals[signal.SIGHUP] = 'SIGHUP'
+ return signals[signo]
+
+
+def _set_signals_handler(handler):
+ signal.signal(signal.SIGTERM, handler)
+ signal.signal(signal.SIGINT, handler)
+ if _sighup_supported():
+ signal.signal(signal.SIGHUP, handler)
+
+
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
- self._services.wait()
+ self.services.wait()
+
+ def restart(self):
+ """Reload config files and restart service.
+
+ :returns: None
+
+ """
+ cfg.CONF.reload_config_files()
+ self.services.restart()
class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
+ _set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
- def wait(self):
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
+ def handle_signal(self):
+ _set_signals_handler(self._handle_signal)
+
+ def _wait_for_exit_or_signal(self, ready_callback=None):
+ status = None
+ signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
- status = None
try:
+ if ready_callback:
+ ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
+ signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
+ signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
- if rpc:
- rpc.cleanup()
self.stop()
- return status
+ if rpc:
+ try:
+ rpc.cleanup()
+ except Exception:
+ # We're shutting down, so it doesn't matter at this point.
+ LOG.exception(_('Exception during rpc cleanup.'))
+
+ return status, signo
+
+ def wait(self, ready_callback=None):
+ while True:
+ self.handle_signal()
+ status, signo = self._wait_for_exit_or_signal(ready_callback)
+ if not _is_sighup(signo):
+ return status
+ self.restart()
class ServiceWrapper(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+ self.handle_signal()
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
+ def handle_signal(self):
+ _set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ _set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
sys.exit(1)
- def _child_process(self, service):
+ def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
+ def _sighup(*args):
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ raise SignalExit(signal.SIGHUP)
+
signal.signal(signal.SIGTERM, _sigterm)
+ if _sighup_supported():
+ signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
+ def _child_wait_for_exit_or_signal(self, launcher):
+ status = 0
+ signo = 0
+
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ try:
+ launcher.wait()
+ except SignalExit as exc:
+ signame = _signo_to_signame(exc.signo)
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ signo = exc.signo
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ launcher.stop()
+
+ return status, signo
+
+ def _child_process(self, service):
+ self._child_process_handle_signal()
+
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
pid = os.fork()
if pid == 0:
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.service)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.service.stop()
+ launcher = self._child_process(wrap.service)
+ while True:
+ self._child_process_handle_signal()
+ status, signo = self._child_wait_for_exit_or_signal(launcher)
+ if not _is_sighup(signo):
+ break
+ launcher.restart()
os._exit(status)
wrap.children.remove(pid)
return wrap
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
-
- LOG.debug(_('Full set of CONF:'))
- CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+ def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
-
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary."""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while True:
+ self.handle_signal()
+ self._respawn_children()
+ if self.sigcaught:
+ signame = _signo_to_signame(self.sigcaught)
+ LOG.info(_('Caught %s, stopping children'), signame)
+ if not _is_sighup(self.sigcaught):
+ break
+
+ for pid in self.children:
+ os.kill(pid, signal.SIGHUP)
+ self.running = True
+ self.sigcaught = None
for pid in self.children:
try:
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
+ def reset(self):
+ # NOTE(Fengqian): docs for Event.reset() recommend against using it
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ # Signal that service cleanup is done:
+ if not self._done.ready():
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # Each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ if not self.done.ready():
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ def restart(self):
+ self.stop()
+ self.done = event.Event()
+ for restart_service in self.services:
+ restart_service.reset()
+ self.tg.add_thread(self.run_service, restart_service, self.done)
+
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers:
# License for the specific language governing permissions and limitations
# under the License.
-from eventlet import greenlet
+import eventlet
from eventlet import greenpool
from eventlet import greenthread
def wait(self):
return self.thread.wait()
+ def link(self, func, *args, **kwargs):
+ self.thread.link(func, *args, **kwargs)
+
class ThreadGroup(object):
"""The point of the ThreadGroup classis to:
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
+ return th
def thread_done(self, thread):
self.threads.remove(thread)
for x in self.timers:
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
continue
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
import calendar
import datetime
+import time
import iso8601
+import six
# ISO 8601 extended time format with microseconds
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
- raise ValueError(e.message)
+ raise ValueError(six.text_type(e))
except TypeError as e:
- raise ValueError(e.message)
+ raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
- if isinstance(before, basestring):
+ if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
- if isinstance(after, basestring):
+ if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
+ if utcnow.override_time is None:
+ # NOTE(kgriffs): This is several times faster
+ # than going through calendar.timegm(...)
+ return int(time.time())
+
return calendar.timegm(utcnow().timetuple())
utcnow.override_time = None
-def set_time_override(override_time=datetime.datetime.utcnow()):
+def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
+
+ :param override_time: datetime instance or list thereof. If not
+ given, defaults to the current UTC time.
"""
- utcnow.override_time = override_time
+ utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
+ return total_seconds(delta)
+
+
+def total_seconds(delta):
+ """Return the total seconds of datetime.timedelta object.
+
+ Compute total seconds of datetime.timedelta, datetime.timedelta
+ doesn't have method total_seconds in Python2.6, calculate it manually.
+ """
try:
return delta.total_seconds()
except AttributeError:
# Options defined in cinder.openstack.common.eventlet_backdoor
#
-# port for eventlet backdoor to listen (integer value)
+# Enable eventlet backdoor. Acceptable values are 0, <port>,
+# and <start>:<end>, where 0 results in listening on a random
+# tcp port number; <port> results in listening on the
+# specified port number (and not enabling backdoor if that
+# port is in use); and <start>:<end> results in listening on
+# the smallest unused port number within the specified range
+# of port numbers. The chosen port is displayed in the
+# service's log file. (string value)
#backdoor_port=<None>
#logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# list of logger=LEVEL pairs (list value)
-#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO
+#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN
# publish error events (boolean value)
#publish_errors=false
module=scheduler
module=scheduler.filters
module=scheduler.weights
+module=service
module=strutils
module=timeutils
module=uuidutils