:returns: the translated object, or the object as-is if it
was not translated
"""
- localize = gettextutils.get_localized_message
+ localize = gettextutils.translate
if isinstance(translatable, exceptions.NeutronException):
translatable.msg = localize(translatable.msg, locale)
elif isinstance(translatable, webob.exc.HTTPError):
if req.path != '/':
language = req.best_match_language()
msg = _('Unknown API version specified')
- msg = gettextutils.get_localized_message(msg, language)
+ msg = gettextutils.translate(msg, language)
return webob.exc.HTTPNotFound(explanation=msg)
builder = versions_view.get_view_builder(req)
return self.app.debug_agent
def run(self, parsed_args):
- self.log.debug('run(%s)' % parsed_args)
- self.app.stdout.write(_('Unimplemented commands') + '\n')
+ self.log.debug('run(%s)', parsed_args)
+ self.log.info(_('Unimplemented commands'))
class CreateProbe(ProbeCommand):
debug_agent = self.get_debug_agent()
port = debug_agent.create_probe(parsed_args.id,
parsed_args.device_owner)
- self.app.stdout.write(_('Probe created : %s ') % port.id + '\n')
+ self.log.info(_('Probe created : %s '), port.id)
class DeleteProbe(ProbeCommand):
self.log.debug('run(%s)' % parsed_args)
debug_agent = self.get_debug_agent()
debug_agent.delete_probe(parsed_args.id)
- self.app.stdout.write(_('Probe %s deleted') % parsed_args.id + '\n')
+ self.log.info(_('Probe %s deleted'), parsed_args.id)
class ListProbe(NeutronCommand, lister.Lister):
self.log.debug('run(%s)' % parsed_args)
debug_agent = self.get_debug_agent()
debug_agent.clear_probe()
- self.app.stdout.write(_('All Probes deleted ') + '\n')
+ self.log.info(_('All Probes deleted '))
class ExecProbe(ProbeCommand):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
from __future__ import print_function
+import errno
import gc
+import os
import pprint
+import socket
import sys
import traceback
import greenlet
from oslo.config import cfg
+from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import log as logging
+
+help_for_backdoor_port = (
+ "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+ "in listening on a random tcp port number; <port> results in listening "
+ "on the specified port number (and not enabling backdoor if that port "
+ "is in use); and <start>:<end> results in listening on the smallest "
+ "unused port number within the specified range of port numbers. The "
+ "chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
- cfg.IntOpt('backdoor_port',
+ cfg.StrOpt('backdoor_port',
default=None,
- help='port for eventlet backdoor to listen')
+ help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+ def __init__(self, port_range, help_msg, ex):
+ msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+ '%(help)s' %
+ {'range': port_range, 'ex': ex, 'help': help_msg})
+ super(EventletBackdoorConfigValueError, self).__init__(msg)
+ self.port_range = port_range
def _dont_use_this():
def _find_objects(t):
- return filter(lambda o: isinstance(o, t), gc.get_objects())
+ return [o for o in gc.get_objects() if isinstance(o, t)]
def _print_greenthreads():
print()
+def _parse_port_range(port_range):
+ if ':' not in port_range:
+ start, end = port_range, port_range
+ else:
+ start, end = port_range.split(':', 1)
+ try:
+ start, end = int(start), int(end)
+ if end < start:
+ raise ValueError
+ return start, end
+ except ValueError as ex:
+ raise EventletBackdoorConfigValueError(port_range, ex,
+ help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+ try_port = start_port
+ while True:
+ try:
+ return listen_func((host, try_port))
+ except socket.error as exc:
+ if (exc.errno != errno.EADDRINUSE or
+ try_port >= end_port):
+ raise
+ try_port += 1
+
+
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
if CONF.backdoor_port is None:
return None
+ start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
pprint.pprint(val)
sys.displayhook = displayhook
- sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+ # In the case of backdoor port being zero, a port number is assigned by
+ # listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
+ LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+ {'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
Exception related utilities.
"""
-import contextlib
import logging
import sys
+import time
import traceback
+import six
+
from neutron.openstack.common.gettextutils import _
-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised.
+
+ In some cases the caller may not want to re-raise the exception, and
+ for those circumstances this context provides a reraise flag that
+ can be used to suppress the exception. For example::
+
+ except Exception:
+ with save_and_reraise_exception() as ctxt:
+ decide_if_need_reraise()
+ if not should_be_reraised:
+ ctxt.reraise = False
"""
- type_, value, tb = sys.exc_info()
- try:
- yield
- except Exception:
- logging.error(_('Original exception being dropped: %s'),
- traceback.format_exception(type_, value, tb))
- raise
- raise type_, value, tb
+ def __init__(self):
+ self.reraise = True
+
+ def __enter__(self):
+ self.type_, self.value, self.tb, = sys.exc_info()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ logging.error(_('Original exception being dropped: %s'),
+ traceback.format_exception(self.type_,
+ self.value,
+ self.tb))
+ return False
+ if self.reraise:
+ six.reraise(self.type_, self.value, self.tb)
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ this_exc_message = six.u(str(exc))
+ if this_exc_message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ this_exc_message != last_exc_message):
+ logging.exception(
+ _('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = this_exc_message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
import contextlib
import errno
import os
+import tempfile
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
return (reloaded, cache_info['data'])
-def delete_if_exists(path):
+def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
+ :param remove: Optional function to remove passed path
"""
try:
- os.unlink(path)
+ remove(path)
except OSError as e:
- if e.errno == errno.ENOENT:
- return
- else:
+ if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
-def remove_path_on_error(path):
+def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
+ :param remove: Optional function to remove passed path
"""
+
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
- delete_if_exists(path)
+ remove(path)
def file_open(*args, **kwargs):
state at all (for unit tests)
"""
return file(*args, **kwargs)
+
+
+def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
+ """Create temporary file or use existing file.
+
+ This util is needed for creating temporary file with
+ specified content, suffix and prefix. If path is not None,
+ it will be used for writing content. If the path doesn't
+ exist it'll be created.
+
+ :param content: content for temporary file.
+ :param path: same as parameter 'dir' for mkstemp
+ :param suffix: same as parameter 'suffix' for mkstemp
+ :param prefix: same as parameter 'prefix' for mkstemp
+
+ For example: it can be used in database tests for creating
+ configuration files.
+ """
+ if path:
+ ensure_tree(path)
+
+ (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
+ try:
+ os.write(fd, content)
+ finally:
+ os.close(fd)
+ return path
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Red Hat, Inc.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
import copy
import gettext
-import logging
+import locale
+from logging import handlers
import os
import re
-try:
- import UserString as _userString
-except ImportError:
- import collections as _userString
from babel import localedata
import six
def _(msg):
if USE_LAZY:
- return Message(msg, 'neutron')
+ return Message(msg, domain='neutron')
else:
if six.PY3:
return _t.gettext(msg)
# messages in OpenStack. We override the standard _() function
# and % (format string) operation to build Message objects that can
# later be translated when we have more information.
- #
- # Also included below is an example LocaleHandler that translates
- # Messages to an associated locale, effectively allowing many logs,
- # each with their own locale.
-
def _lazy_gettext(msg):
"""Create and return a Message object.
Message encapsulates a string so that we can translate
it later when needed.
"""
- return Message(msg, domain)
+ return Message(msg, domain=domain)
from six import moves
moves.builtins.__dict__['_'] = _lazy_gettext
unicode=True)
-class Message(_userString.UserString, object):
- """Class used to encapsulate translatable messages."""
- def __init__(self, msg, domain):
- # _msg is the gettext msgid and should never change
- self._msg = msg
- self._left_extra_msg = ''
- self._right_extra_msg = ''
- self._locale = None
- self.params = None
- self.domain = domain
-
- @property
- def data(self):
- # NOTE(mrodden): this should always resolve to a unicode string
- # that best represents the state of the message currently
-
- localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
- if self.locale:
- lang = gettext.translation(self.domain,
- localedir=localedir,
- languages=[self.locale],
- fallback=True)
- else:
- # use system locale for translations
- lang = gettext.translation(self.domain,
- localedir=localedir,
- fallback=True)
+class Message(six.text_type):
+ """A Message object is a unicode object that can be translated.
+
+ Translation of Message is done explicitly using the translate() method.
+ For all non-translation intents and purposes, a Message is simply unicode,
+ and can be treated as such.
+ """
+ def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+ """Create a new Message object.
+
+ In order for translation to work gettext requires a message ID, this
+ msgid will be used as the base unicode text. It is also possible
+ for the msgid and the base unicode text to be different by passing
+ the msgtext parameter.
+ """
+ # If the base msgtext is not given, we use the default translation
+ # of the msgid (which is in English) just in case the system locale is
+ # not English, so that the base text will be in that locale by default.
+ if not msgtext:
+ msgtext = Message._translate_msgid(msgid, domain)
+ # We want to initialize the parent unicode with the actual object that
+ # would have been plain unicode if 'Message' was not enabled.
+ msg = super(Message, cls).__new__(cls, msgtext)
+ msg.msgid = msgid
+ msg.domain = domain
+ msg.params = params
+ return msg
+
+ def translate(self, desired_locale=None):
+ """Translate this message to the desired locale.
+
+ :param desired_locale: The desired locale to translate the message to,
+ if no locale is provided the message will be
+ translated to the system's default locale.
+
+ :returns: the translated message in unicode
+ """
+
+ translated_message = Message._translate_msgid(self.msgid,
+ self.domain,
+ desired_locale)
+ if self.params is None:
+ # No need for more translation
+ return translated_message
+
+ # This Message object may have been formatted with one or more
+ # Message objects as substitution arguments, given either as a single
+ # argument, part of a tuple, or as one or more values in a dictionary.
+ # When translating this Message we need to translate those Messages too
+ translated_params = _translate_args(self.params, desired_locale)
+
+ translated_message = translated_message % translated_params
+
+ return translated_message
+
+ @staticmethod
+ def _translate_msgid(msgid, domain, desired_locale=None):
+ if not desired_locale:
+ system_locale = locale.getdefaultlocale()
+ # If the system locale is not available to the runtime use English
+ if not system_locale[0]:
+ desired_locale = 'en_US'
+ else:
+ desired_locale = system_locale[0]
+
+ locale_dir = os.environ.get(domain.upper() + '_LOCALEDIR')
+ lang = gettext.translation(domain,
+ localedir=locale_dir,
+ languages=[desired_locale],
+ fallback=True)
if six.PY3:
- ugettext = lang.gettext
- else:
- ugettext = lang.ugettext
-
- full_msg = (self._left_extra_msg +
- ugettext(self._msg) +
- self._right_extra_msg)
-
- if self.params is not None:
- full_msg = full_msg % self.params
-
- return six.text_type(full_msg)
-
- @property
- def locale(self):
- return self._locale
-
- @locale.setter
- def locale(self, value):
- self._locale = value
- if not self.params:
- return
-
- # This Message object may have been constructed with one or more
- # Message objects as substitution parameters, given as a single
- # Message, or a tuple or Map containing some, so when setting the
- # locale for this Message we need to set it for those Messages too.
- if isinstance(self.params, Message):
- self.params.locale = value
- return
- if isinstance(self.params, tuple):
- for param in self.params:
- if isinstance(param, Message):
- param.locale = value
- return
- if isinstance(self.params, dict):
- for param in self.params.values():
- if isinstance(param, Message):
- param.locale = value
-
- def _save_dictionary_parameter(self, dict_param):
- full_msg = self.data
- # look for %(blah) fields in string;
- # ignore %% and deal with the
- # case where % is first character on the line
- keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg)
-
- # if we don't find any %(blah) blocks but have a %s
- if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
- # apparently the full dictionary is the parameter
- params = copy.deepcopy(dict_param)
+ translator = lang.gettext
else:
- params = {}
- for key in keys:
- try:
- params[key] = copy.deepcopy(dict_param[key])
- except TypeError:
- # cast uncopyable thing to unicode string
- params[key] = six.text_type(dict_param[key])
+ translator = lang.ugettext
- return params
+ translated_message = translator(msgid)
+ return translated_message
- def _save_parameters(self, other):
- # we check for None later to see if
- # we actually have parameters to inject,
- # so encapsulate if our parameter is actually None
+ def __mod__(self, other):
+ # When we mod a Message we want the actual operation to be performed
+ # by the parent class (i.e. unicode()), the only thing we do here is
+ # save the original msgid and the parameters in case of a translation
+ unicode_mod = super(Message, self).__mod__(other)
+ modded = Message(self.msgid,
+ msgtext=unicode_mod,
+ params=self._sanitize_mod_params(other),
+ domain=self.domain)
+ return modded
+
+ def _sanitize_mod_params(self, other):
+ """Sanitize the object being modded with this Message.
+
+ - Add support for modding 'None' so translation supports it
+ - Trim the modded object, which can be a large dictionary, to only
+ those keys that would actually be used in a translation
+ - Snapshot the object being modded, in case the message is
+ translated, it will be used as it was when the Message was created
+ """
if other is None:
- self.params = (other, )
+ params = (other,)
elif isinstance(other, dict):
- self.params = self._save_dictionary_parameter(other)
+ params = self._trim_dictionary_parameters(other)
else:
- # fallback to casting to unicode,
- # this will handle the problematic python code-like
- # objects that cannot be deep-copied
- try:
- self.params = copy.deepcopy(other)
- except TypeError:
- self.params = six.text_type(other)
-
- return self
-
- # overrides to be more string-like
- def __unicode__(self):
- return self.data
-
- def __str__(self):
- if six.PY3:
- return self.__unicode__()
- return self.data.encode('utf-8')
+ params = self._copy_param(other)
+ return params
- def __getstate__(self):
- to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
- 'domain', 'params', '_locale']
- new_dict = self.__dict__.fromkeys(to_copy)
- for attr in to_copy:
- new_dict[attr] = copy.deepcopy(self.__dict__[attr])
+ def _trim_dictionary_parameters(self, dict_param):
+ """Return a dict that only has matching entries in the msgid."""
+ # NOTE(luisg): Here we trim down the dictionary passed as parameters
+ # to avoid carrying a lot of unnecessary weight around in the message
+ # object, for example if someone passes in Message() % locals() but
+ # only some params are used, and additionally we prevent errors for
+ # non-deepcopyable objects by unicoding() them.
+
+ # Look for %(param) keys in msgid;
+ # Skip %% and deal with the case where % is first character on the line
+ keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
+
+ # If we don't find any %(param) keys but have a %s
+ if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
+ # Apparently the full dictionary is the parameter
+ params = self._copy_param(dict_param)
+ else:
+ params = {}
+ for key in keys:
+ params[key] = self._copy_param(dict_param[key])
- return new_dict
+ return params
- def __setstate__(self, state):
- for (k, v) in state.items():
- setattr(self, k, v)
+ def _copy_param(self, param):
+ try:
+ return copy.deepcopy(param)
+ except TypeError:
+ # Fallback to casting to unicode this will handle the
+ # python code-like objects that can't be deep-copied
+ return six.text_type(param)
- # operator overloads
def __add__(self, other):
- copied = copy.deepcopy(self)
- copied._right_extra_msg += other.__str__()
- return copied
+ msg = _('Message objects do not support addition.')
+ raise TypeError(msg)
def __radd__(self, other):
- copied = copy.deepcopy(self)
- copied._left_extra_msg += other.__str__()
- return copied
+ return self.__add__(other)
- def __mod__(self, other):
- # do a format string to catch and raise
- # any possible KeyErrors from missing parameters
- self.data % other
- copied = copy.deepcopy(self)
- return copied._save_parameters(other)
-
- def __mul__(self, other):
- return self.data * other
-
- def __rmul__(self, other):
- return other * self.data
-
- def __getitem__(self, key):
- return self.data[key]
-
- def __getslice__(self, start, end):
- return self.data.__getslice__(start, end)
-
- def __getattribute__(self, name):
- # NOTE(mrodden): handle lossy operations that we can't deal with yet
- # These override the UserString implementation, since UserString
- # uses our __class__ attribute to try and build a new message
- # after running the inner data string through the operation.
- # At that point, we have lost the gettext message id and can just
- # safely resolve to a string instead.
- ops = ['capitalize', 'center', 'decode', 'encode',
- 'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
- 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
- if name in ops:
- return getattr(self.data, name)
- else:
- return _userString.UserString.__getattribute__(self, name)
+ def __str__(self):
+ # NOTE(luisg): Logging in python 2.6 tries to str() log records,
+ # and it expects specifically a UnicodeError in order to proceed.
+ msg = _('Message objects do not support str() because they may '
+ 'contain non-ascii characters. '
+ 'Please use unicode() or translate() instead.')
+ raise UnicodeError(msg)
def get_available_languages(domain):
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
- # this check when the master list updates to >=1.0, and all projects udpate
+ # this check when the master list updates to >=1.0, and update all projects
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
return copy.copy(language_list)
-def get_localized_message(message, user_locale):
- """Gets a localized version of the given message in the given locale."""
+def translate(obj, desired_locale=None):
+ """Gets the translated unicode representation of the given object.
+
+ If the object is not translatable it is returned as-is.
+ If the locale is None the object is translated to the system locale.
+
+ :param obj: the object to translate
+ :param desired_locale: the locale to translate the message to, if None the
+ default system locale will be used
+ :returns: the translated object in unicode, or the original object if
+ it could not be translated
+ """
+ message = obj
+ if not isinstance(message, Message):
+ # If the object to translate is not already translatable,
+ # let's first get its unicode representation
+ message = six.text_type(obj)
if isinstance(message, Message):
- if user_locale:
- message.locale = user_locale
- return six.text_type(message)
- else:
- return message
+ # Even after unicoding() we still need to check if we are
+ # running with translatable unicode before translating
+ return message.translate(desired_locale)
+ return obj
+
+def _translate_args(args, desired_locale=None):
+ """Translates all the translatable elements of the given arguments object.
-class LocaleHandler(logging.Handler):
- """Handler that can have a locale associated to translate Messages.
+ This method is used for translating the translatable values in method
+ arguments which include values of tuples or dictionaries.
+ If the object is not a tuple or a dictionary the object itself is
+ translated if it is translatable.
- A quick example of how to utilize the Message class above.
- LocaleHandler takes a locale and a target logging.Handler object
- to forward LogRecord objects to after translating the internal Message.
+ If the locale is None the object is translated to the system locale.
+
+ :param args: the args to translate
+ :param desired_locale: the locale to translate the args to, if None the
+ default system locale will be used
+ :returns: a new args object with the translated contents of the original
"""
+ if isinstance(args, tuple):
+ return tuple(translate(v, desired_locale) for v in args)
+ if isinstance(args, dict):
+ translated_dict = {}
+ for (k, v) in six.iteritems(args):
+ translated_v = translate(v, desired_locale)
+ translated_dict[k] = translated_v
+ return translated_dict
+ return translate(args, desired_locale)
+
+
+class TranslationHandler(handlers.MemoryHandler):
+ """Handler that translates records before logging them.
+
+ The TranslationHandler takes a locale and a target logging.Handler object
+ to forward LogRecord objects to after translating them. This handler
+ depends on Message objects being logged, instead of regular strings.
+
+ The handler can be configured declaratively in the logging.conf as follows:
+
+ [handlers]
+ keys = translatedlog, translator
- def __init__(self, locale, target):
- """Initialize a LocaleHandler
+ [handler_translatedlog]
+ class = handlers.WatchedFileHandler
+ args = ('/var/log/api-localized.log',)
+ formatter = context
+
+ [handler_translator]
+ class = openstack.common.log.TranslationHandler
+ target = translatedlog
+ args = ('zh_CN',)
+
+ If the specified locale is not available in the system, the handler will
+ log in the default locale.
+ """
+
+ def __init__(self, locale=None, target=None):
+ """Initialize a TranslationHandler
:param locale: locale to use for translating messages
:param target: logging.Handler object to forward
LogRecord objects to after translation
"""
- logging.Handler.__init__(self)
+ # NOTE(luisg): In order to allow this handler to be a wrapper for
+ # other handlers, such as a FileHandler, and still be able to
+ # configure it using logging.conf, this handler has to extend
+ # MemoryHandler because only the MemoryHandlers' logging.conf
+ # parsing is implemented such that it accepts a target handler.
+ handlers.MemoryHandler.__init__(self, capacity=0, target=target)
self.locale = locale
- self.target = target
+
+ def setFormatter(self, fmt):
+ self.target.setFormatter(fmt)
def emit(self, record):
- if isinstance(record.msg, Message):
- # set the locale and resolve to a string
- record.msg.locale = self.locale
+ # We save the message from the original record to restore it
+ # after translation, so other handlers are not affected by this
+ original_msg = record.msg
+ original_args = record.args
+
+ try:
+ self._translate_and_log_record(record)
+ finally:
+ record.msg = original_msg
+ record.args = original_args
+
+ def _translate_and_log_record(self, record):
+ record.msg = translate(record.msg, self.locale)
+
+ # In addition to translating the message, we also need to translate
+ # arguments that were passed to the log method that were not part
+ # of the main message e.g., log.info(_('Some message %s'), this_one))
+ record.args = _translate_args(record.args, self.locale)
self.target.emit(record)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
import inspect
import itertools
import json
-import types
-import xmlrpclib
+try:
+ import xmlrpclib
+except ImportError:
+ # NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3
+ # however the function and object call signatures
+ # remained the same. This whole try/except block should
+ # be removed and replaced with a call to six.moves once
+ # six 1.4.2 is released. See http://bit.ly/1bqrVzu
+ import xmlrpc.client as xmlrpclib
import six
+from neutron.openstack.common import gettextutils
+from neutron.openstack.common import importutils
from neutron.openstack.common import timeutils
+netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
-_simple_types = (types.NoneType, int, basestring, bool, float, long)
+_simple_types = (six.string_types + six.integer_types
+ + (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=level,
max_depth=max_depth)
if isinstance(value, dict):
- return dict((k, recursive(v)) for k, v in value.iteritems())
+ return dict((k, recursive(v)) for k, v in six.iteritems(value))
elif isinstance(value, (list, tuple)):
return [recursive(lv) for lv in value]
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
+ elif isinstance(value, gettextutils.Message):
+ return value.data
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
+ elif netaddr and isinstance(value, netaddr.IPAddress):
+ return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):
return six.text_type(value)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
import logging.config
import logging.handlers
import os
+import re
import sys
import traceback
import six
from six import moves
-from neutron.openstack.common.gettextutils import _ # noqa
+from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import local
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
+
+# NOTE(ldbragst): Let's build a list of regex objects using the list of
+# _SANITIZE_KEYS we already have. This way, we only have to add the new key
+# to the list of _SANITIZE_KEYS and we can generate regular expressions
+# for XML and JSON automatically.
+_SANITIZE_PATTERNS = []
+_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
+ r'(<%(key)s>).*?(</%(key)s>)',
+ r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
+ r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
+
+for key in _SANITIZE_KEYS:
+ for pattern in _FORMAT_PATTERNS:
+ reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+ _SANITIZE_PATTERNS.append(reg_ex)
+
+
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
- '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+ '%(name)s [%(request_id)s %(user_identity)s] '
'%(instance)s%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
'amqp=WARN',
'amqplib=WARN',
'boto=WARN',
- 'keystone=INFO',
'qpid=WARN',
'sqlalchemy=WARN',
'suds=INFO',
return None
+def mask_password(message, secret="***"):
+ """Replace password with 'secret' in message.
+
+ :param message: The string which includes security information.
+ :param secret: value with which to replace passwords.
+ :returns: The unicode value of message with the password fields masked.
+
+ For example:
+
+ >>> mask_password("'adminPass' : 'aaaaa'")
+ "'adminPass' : '***'"
+ >>> mask_password("'admin_pass' : 'aaaaa'")
+ "'admin_pass' : '***'"
+ >>> mask_password('"password" : "aaaaa"')
+ '"password" : "***"'
+ >>> mask_password("'original_password' : 'aaaaa'")
+ "'original_password' : '***'"
+ >>> mask_password("u'original_password' : u'aaaaa'")
+ "u'original_password' : u'***'"
+ """
+ message = six.text_type(message)
+
+ # NOTE(ldbragst): Check to see if anything in message contains any key
+ # specified in _SANITIZE_KEYS, if not then just return the message since
+ # we don't have to mask any passwords.
+ if not any(key in message for key in _SANITIZE_KEYS):
+ return message
+
+ secret = r'\g<1>' + secret + r'\g<2>'
+ for pattern in _SANITIZE_PATTERNS:
+ message = re.sub(pattern, secret, message)
+ return message
+
+
class BaseLoggerAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
elif instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
- extra.update({'instance': instance_extra})
+ extra['instance'] = instance_extra
+
+ extra.setdefault('user_identity', kwargs.pop('user_identity', None))
- extra.update({"project": self.project})
- extra.update({"version": self.version})
+ extra['project'] = self.project
+ extra['version'] = self.version
extra['extra'] = extra.copy()
return msg, kwargs
def formatException(self, ei, strip_newlines=True):
lines = traceback.format_exception(*ei)
if strip_newlines:
- lines = [itertools.ifilter(
+ lines = [moves.filter(
lambda x: x,
line.rstrip().splitlines()) for line in lines]
lines = list(itertools.chain(*lines))
def _create_logging_excepthook(product_name):
- def logging_excepthook(type, value, tb):
+ def logging_excepthook(exc_type, value, tb):
extra = {}
if CONF.verbose:
- extra['exc_info'] = (type, value, tb)
+ extra['exc_info'] = (exc_type, value, tb)
getLogger(product_name).critical(str(value), **extra)
return logging_excepthook
streamlog = ColorHandler()
log_root.addHandler(streamlog)
- elif not CONF.log_file:
+ elif not logpath:
# pass sys.stdout as a positional argument
# python2.6 calls the argument strm, in 2.7 it's stream
streamlog = logging.StreamHandler(sys.stdout)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# under the License.
import logging
-from neutron.openstack.common import notifier
-
from oslo.config import cfg
+from neutron.openstack.common import notifier
+
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
Network-related utilities and helper functions.
"""
-from neutron.openstack.common import log as logging
-
-
-LOG = logging.getLogger(__name__)
+from neutron.openstack.common.py3kcompat import urlutils
def parse_host_port(address, default_port=None):
port = default_port
return (host, None if port is None else int(port))
+
+
+def urlsplit(url, scheme='', allow_fragments=True):
+ """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
+ This function papers over Python issue9374 when needed.
+
+ The parameters are the same as urlparse.urlsplit.
+ """
+ scheme, netloc, path, query, fragment = urlutils.urlsplit(
+ url, scheme, allow_fragments)
+ if allow_fragments and '#' in path:
+ path, fragment = path.split('#', 1)
+ if '?' in path:
+ path, query = path.split('?', 1)
+ return urlutils.SplitResult(scheme, netloc, path, query, fragment)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
import time
from oslo.config import cfg
+import six
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
- # and without parens.
+ # and without parents.
#
- # In the 'with-parens' case (with kwargs present), this function needs to
+ # In the 'with-parents' case (with kwargs present), this function needs to
# return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
- # In the 'without-parens' case, the original function will be passed
+ # In the 'without-parents' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
cls._periodic_last_run[name] = task._periodic_last_run
+@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
- __metaclass__ = _PeriodicTasksMeta
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
if spacing is not None:
idle_for = min(idle_for, spacing)
- LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+ LOG.debug(_("Running periodic task %(full_task_name)s"),
+ {"full_task_name": full_task_name})
self._periodic_last_run[task_name] = timeutils.utcnow()
try:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
- locals())
+ {"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
System-level utilities and helper functions.
"""
+import logging as stdlib_logging
import os
import random
import shlex
:param cmd: Passed to subprocess.Popen.
:type cmd: string
:param process_input: Send to opened process.
- :type proces_input: string
+ :type process_input: string
:param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
+ :param loglevel: log level for execute commands.
+ :type loglevel: int. (Should be stdlib_logging.DEBUG or
+ stdlib_logging.INFO)
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
shell = kwargs.pop('shell', False)
+ loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
- if run_as_root and os.geteuid() != 0:
+ if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
while attempts > 0:
attempts -= 1
try:
- LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+ LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
- if _returncode:
- LOG.debug(_('Result was %s') % _returncode)
- if not ignore_exit_code and _returncode not in check_exit_code:
- (stdout, stderr) = result
- raise ProcessExecutionError(exit_code=_returncode,
- stdout=stdout,
- stderr=stderr,
- cmd=' '.join(cmd))
+ LOG.log(loglevel, _('Result was %s') % _returncode)
+ if not ignore_exit_code and _returncode not in check_exit_code:
+ (stdout, stderr) = result
+ raise ProcessExecutionError(exit_code=_returncode,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:
raise
else:
- LOG.debug(_('%r failed. Retrying.'), cmd)
+ LOG.log(loglevel, _('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
--- /dev/null
+#
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Python2/Python3 compatibility layer for OpenStack
+"""
+
+import six
+
+if six.PY3:
+ # python3
+ import urllib.error
+ import urllib.parse
+ import urllib.request
+
+ urlencode = urllib.parse.urlencode
+ urljoin = urllib.parse.urljoin
+ quote = urllib.parse.quote
+ parse_qsl = urllib.parse.parse_qsl
+ unquote = urllib.parse.unquote
+ unquote_plus = urllib.parse.unquote_plus
+ urlparse = urllib.parse.urlparse
+ urlsplit = urllib.parse.urlsplit
+ urlunsplit = urllib.parse.urlunsplit
+ SplitResult = urllib.parse.SplitResult
+
+ urlopen = urllib.request.urlopen
+ URLError = urllib.error.URLError
+ pathname2url = urllib.request.pathname2url
+else:
+ # python2
+ import urllib
+ import urllib2
+ import urlparse
+
+ urlencode = urllib.urlencode
+ quote = urllib.quote
+ unquote = urllib.unquote
+ unquote_plus = urllib.unquote_plus
+
+ parse = urlparse
+ parse_qsl = parse.parse_qsl
+ urljoin = parse.urljoin
+ urlparse = parse.urlparse
+ urlsplit = parse.urlsplit
+ urlunsplit = parse.urlunsplit
+ SplitResult = parse.SplitResult
+
+ urlopen = urllib2.urlopen
+ URLError = urllib2.URLError
+ pathname2url = urllib.pathname2url
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
- default=['neutron.openstack.common.exception',
- 'nova.exception',
+ default=['nova.exception',
'cinder.exception',
'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
- 'upon receiving exception data from an rpc call.'),
+ ' upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
def cleanup():
- """Clean up resoruces in use by implementation.
+ """Clean up resources in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
"""
Shared code between AMQP based openstack.common.rpc implementations.
-The code in this module is shared between the rpc implemenations based on AMQP.
-Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
-AMQP, but is deprecated and predates this code.
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
+uses AMQP, but is deprecated and predates this code.
"""
import collections
from eventlet import queue
from eventlet import semaphore
from oslo.config import cfg
+import six
+
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+ ack_on_error=True):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
- exchange_name)
+ exchange_name,
+ ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
- self._num_call_waiters_wrn_threshhold = 10
+ self._num_call_waiters_wrn_threshold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
- if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
LOG.warn(_('Number of call waiters is greater than warning '
- 'threshhold: %d. There could be a MulticallProxyWaiter '
- 'leak.') % self._num_call_waiters_wrn_threshhold)
- self._num_call_waiters_wrn_threshhold *= 2
+ 'threshold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshold)
+ self._num_call_waiters_wrn_threshold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
- try:
- msg = {'result': reply, 'failure': failure}
- except TypeError:
- msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
- # Otherwise use the msg_id for backward compatibilty.
+ # Otherwise use the msg_id for backward compatibility.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
for args at some point.
"""
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
+ if isinstance(context, dict):
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in six.iteritems(context)])
+ else:
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in
+ six.iteritems(context.to_dict())])
+
msg.update(context_d)
Allows it to be invoked in a green thread.
"""
- def __init__(self, conf, callback, connection_pool):
+ def __init__(self, conf, callback, connection_pool,
+ wait_for_consumers=False):
"""Initiates CallbackWrapper object.
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
+ :param wait_for_consumers: wait for all green threads to
+ complete and raise the last
+ caught exception, if any.
+
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
+ self.wait_for_consumers = wait_for_consumers
+ self.exc_info = None
+
+ def _wrap(self, message_data, **kwargs):
+ """Wrap the callback invocation to catch exceptions.
+ """
+ try:
+ self.callback(message_data, **kwargs)
+ except Exception:
+ self.exc_info = sys.exc_info()
def __call__(self, message_data):
- self.pool.spawn_n(self.callback, message_data)
+ self.exc_info = None
+ self.pool.spawn_n(self._wrap, message_data)
+
+ if self.wait_for_consumers:
+ self.pool.waitall()
+ if self.exc_info:
+ six.reraise(self.exc_info[1], None, self.exc_info[2])
class ProxyCallback(_ThreadPoolWithWait):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
from neutron.openstack.common import jsonutils
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
+from neutron.openstack.common import versionutils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+_RPC_ENVELOPE_VERSION = '2.0'
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.0) is very simple. It is::
{
'oslo.version': <RPC Envelope Version as a String>,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
class RPCException(Exception):
- message = _("An unknown RPC related exception occurred.")
+ msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
- message = self.message % kwargs
+ message = self.msg_fmt % kwargs
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
- for name, value in kwargs.iteritems():
+ for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
- message = self.message
+ message = self.msg_fmt
super(RPCException, self).__init__(message)
contains all of the relevant info.
"""
- message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+ msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _('Timeout while waiting on RPC response - '
+ msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
class DuplicateMessageError(RPCException):
- message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+ msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
- message = _("Invalid reuse of an RPC connection.")
+ msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException):
- message = _("Specified RPC version, %(version)s, not supported by "
+ msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
- message = _("Specified RPC envelope version, %(version)s, "
+ msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
- message = _("Specified RPC version cap, %(version_cap)s, is too low")
+ msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': [('args', 'new_pass')],
- 'run_instance': [('args', 'admin_password')],
- 'route_message': [('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'password'),
- ('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'admin_password')]}
-
- has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
- has_context_token = '_context_auth_token' in msg_data
- has_token = 'auth_token' in msg_data
-
- if not any([has_method, has_context_token, has_token]):
- return log_func(msg, msg_data)
-
- msg_data = copy.deepcopy(msg_data)
-
- if has_method:
- for arg in SANITIZE.get(msg_data['method'], []):
- try:
- d = msg_data
- for elem in arg[:-1]:
- d = d[elem]
- d[arg[-1]] = '<SANITIZED>'
- except KeyError as e:
- LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
- {'item': arg,
- 'err': e})
-
- if has_context_token:
- msg_data['_context_auth_token'] = '<SANITIZED>'
+ SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
- if has_token:
- msg_data['auth_token'] = '<SANITIZED>'
+ def _fix_passwords(d):
+ """Sanitizes the password fields in the dictionary."""
+ for k in six.iterkeys(d):
+ if k.lower().find('password') != -1:
+ d[k] = '<SANITIZED>'
+ elif k.lower() in SANITIZE:
+ d[k] = '<SANITIZED>'
+ elif isinstance(d[k], dict):
+ _fix_passwords(d[k])
+ return d
- return log_func(msg, msg_data)
+ return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
def serialize_remote_exception(failure_info, log_failure=True):
return outer
+# TODO(sirp): we should deprecate this in favor of
+# using `versionutils.is_compatible` directly
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
- version_parts = version.split('.')
- imp_version_parts = imp_version.split('.')
- if int(version_parts[0]) != int(imp_version_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
- return False
- return True
+ return versionutils.is_compatible(version, imp_version)
def serialize_msg(raw_msg):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
minimum version that supports the new parameter should be specified.
"""
+import six
+
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import serializer as rpc_serializer
:returns: A new set of deserialized args
"""
new_kwargs = dict()
- for argname, arg in kwargs.iteritems():
+ for argname, arg in six.iteritems(kwargs):
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import time
import eventlet
+import six
from neutron.openstack.common.rpc import common as rpc_common
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
- raise failure[0], failure[1], failure[2]
+ six.reraise(failure[0], failure[1], failure[2])
res.append(reply)
# if ending not 'sent'...we might have more data to
# return from the function itself
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
- return iter([None])
+ raise rpc_common.Timeout("No consumers available")
else:
return consumer.call(context, version, method, namespace, args,
timeout)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import itertools
import socket
import ssl
-import sys
import time
import uuid
import kombu.entity
import kombu.messaging
from oslo.config import cfg
+import six
+from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import network_utils
from neutron.openstack.common.rpc import amqp as rpc_amqp
from neutron.openstack.common.rpc import common as rpc_common
+from neutron.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
- help='SSL version to use (valid only if SSL enabled)'),
+ help='SSL version to use (valid only if SSL enabled). '
+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+ 'be available on some distributions'
+ ),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
+ self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
+ def _callback_handler(self, message, callback):
+ """Call callback with deserialized message.
+
+ Messages that are processed without exception are ack'ed.
+
+ If the message processing generates an exception, it will be
+ ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
+ """
+
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+ except Exception:
+ if self.ack_on_error:
+ LOG.exception(_("Failed to process message"
+ " ... skipping it."))
+ message.ack()
+ else:
+ LOG.exception(_("Failed to process message"
+ " ... will requeue."))
+ message.requeue()
+ else:
+ message.ack()
+
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
If kwargs['nowait'] is True, then this call will block until
a message is read.
- Messages will automatically be acked if the callback doesn't
- raise an exception
"""
options = {'consumer_tag': self.tag}
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
+ self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
'virtual_host': self.conf.rabbit_virtual_host,
}
- for sp_key, value in server_params.iteritems():
+ for sp_key, value in six.iteritems(server_params):
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
- ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+ ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+ self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
- if not ssl_params:
- # Just have the default behavior
- return True
- else:
- # Return the extended behavior
- return ssl_params
+ # Return the extended behavior or just have the default behavior
+ return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.error(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
- # NOTE(comstud): Copied from original code. There's
- # really no better recourse because if this was a queue we
- # need to consume on, we have no way to consume anymore.
- sys.exit(1)
+ msg = _('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info
+ LOG.error(msg)
+ raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
- self.consumer_num.next())
+ six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
def _consume():
if info['do_consume']:
- queues_head = self.consumers[:-1]
- queues_tail = self.consumers[-1]
+ queues_head = self.consumers[:-1] # not fanout.
+ queues_tail = self.consumers[-1] # fanout
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
+ ack_on_error=ack_on_error,
),
topic, callback)
it = self.iterconsume(limit=limit)
while True:
try:
- it.next()
+ six.next(it)
except StopIteration:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
+ wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
+ ack_on_error=ack_on_error,
)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
import eventlet
import greenlet
from oslo.config import cfg
+import six
+from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- self.reconnect(session)
+ self.connect(session)
+
+ def connect(self, session):
+ """Declare the receiver on connect."""
+ self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
+ self._declare_receiver(session)
+
+ def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
+ # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
+ def get_node_name(self):
+ return self.address.split(';')[0]
+
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
+ self.conf = conf
link_opts = {"exclusive": True}
class TopicPublisher(Publisher):
"""Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
- """init a 'topic' publisher.
+ """Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
- """init a 'fanout' publisher.
+ """Init a 'fanout' publisher.
"""
if conf.qpid_topology_version == 1:
class NotifyPublisher(Publisher):
"""Publisher class for notifications."""
def __init__(self, conf, session, topic):
- """init a 'topic' publisher.
+ """Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
node_opts = {"durable": True}
consumers = self.consumers
self.consumers = {}
- for consumer in consumers.itervalues():
+ for consumer in six.itervalues(consumers):
consumer.reconnect(self.session)
self._register_consumer(consumer)
it = self.iterconsume(limit=limit)
while True:
try:
- it.next()
+ six.next(it)
except StopIteration:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
+ wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import eventlet
import greenlet
from oslo.config import cfg
+import six
+from six import moves
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
- LOG.error("ZeroMQ socket could not be closed.")
+ LOG.error(_("ZeroMQ socket could not be closed."))
self.sock = None
def recv(self, **kwargs):
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
- zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
- self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
- def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
- zmq_type_out=None, in_bind=True, out_bind=True,
- subscribe=None):
+ def register(self, proxy, in_addr, zmq_type_in,
+ in_bind=True, subscribe=None):
LOG.info(_("Registering reactor"))
LOG.info(_("In reactor registered"))
- if not out_addr:
- return
-
- if zmq_type_out not in (zmq.PUSH, zmq.PUB):
- raise RPCException("Bad output socktype")
-
- # Items push out.
- outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
- self.mapping[inq] = outq
- self.mapping[outq] = inq
- self.sockets.append(outq)
-
- LOG.info(_("Out reactor registered"))
-
def consume_in_thread(self):
+ @excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
try:
self.register(consumption_proxy,
consume_in,
- zmq.PULL,
- out_bind=True)
+ zmq.PULL)
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
h = {}
try:
while True:
- k = i.next()
- h[k] = i.next()
+ k = six.next(i)
+ h[k] = six.next(i)
except StopIteration:
return h
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
- if sock in self.mapping:
- LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
- 'data': data})
- self.mapping[sock].send(data)
- return
proxy = self.proxies[sock]
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
"""Acknowledge that a key.host is alive.
Used internally for updating heartbeats, but may also be used
- publically to acknowledge a system is alive (i.e. rpc message
+ publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
pass
"""Acknowledge that a host.topic is alive.
Used internally for updating heartbeats, but may also be used
- publically to acknowledge a system is alive (i.e. rpc message
+ publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
that it maps directly to a host, thus direct.
"""
def test(self, key):
- if '.' in key:
- return True
- return False
+ return '.' in key
class TopicBinding(Binding):
matches that of a direct exchange.
"""
def test(self, key):
- if '.' not in key:
- return True
- return False
+ return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
- if key.startswith('fanout~'):
- return True
- return False
+ return key.startswith('fanout~')
class StubExchange(Exchange):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
if not redis:
raise ImportError("Failed to import module redis.")
- self.redis = redis.StrictRedis(
+ self.redis = redis.Redis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
+ return key in self.ring0
class RoundRobinRingExchange(RingExchange):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
rpc/dispatcher.py
"""
+import six
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import common as rpc_common
rpc API.
"""
- # The default namespace, which can be overriden in a subclass.
+ # The default namespace, which can be overridden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
- raise rpc_common.RpcVersionCapError(version=self.version_cap)
+ raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
- for argname, arg in kwargs.iteritems():
+ for argname, arg in six.iteritems(kwargs):
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
import abc
+import six
+
+@six.add_metaclass(abc.ABCMeta)
class Serializer(object):
"""Generic (de-)serialization definition base class."""
- __metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
A service enables rpc by listening to queues based on topic and host.
"""
- def __init__(self, host, topic, manager=None):
+ def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
+ self.serializer = serializer
if manager is None:
self.manager = self
else:
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+ self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
"""Generic Node base class for all workers that run on hosts."""
import errno
+import logging as std_logging
import os
import random
import signal
import sys
import time
+try:
+ # Importing just the symbol here because the io module does not
+ # exist in Python 2.6.
+ from io import UnsupportedOperation # noqa
+except ImportError:
+ # Python 2.6
+ UnsupportedOperation = None
+
import eventlet
-import logging as std_logging
+from eventlet import event
from oslo.config import cfg
from neutron.openstack.common import eventlet_backdoor
LOG = logging.getLogger(__name__)
+def _sighup_supported():
+ return hasattr(signal, 'SIGHUP')
+
+
+def _is_daemon():
+ # The process group for a foreground process will match the
+ # process group of the controlling terminal. If those values do
+ # not match, or ioctl() fails on the stdout file handle, we assume
+ # the process is running in the background as a daemon.
+ # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
+ try:
+ is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
+ except OSError as err:
+ if err.errno == errno.ENOTTY:
+ # Assume we are a daemon because there is no terminal.
+ is_daemon = True
+ else:
+ raise
+ except UnsupportedOperation:
+ # Could not get the fileno for stdout, so we must be a daemon.
+ is_daemon = True
+ return is_daemon
+
+
+def _is_sighup_and_daemon(signo):
+ if not (_sighup_supported() and signo == signal.SIGHUP):
+ # Avoid checking if we are a daemon, because the signal isn't
+ # SIGHUP.
+ return False
+ return _is_daemon()
+
+
+def _signo_to_signame(signo):
+ signals = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}
+ if _sighup_supported():
+ signals[signal.SIGHUP] = 'SIGHUP'
+ return signals[signo]
+
+
+def _set_signals_handler(handler):
+ signal.signal(signal.SIGTERM, handler)
+ signal.signal(signal.SIGINT, handler)
+ if _sighup_supported():
+ signal.signal(signal.SIGHUP, handler)
+
+
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
- self._services.wait()
+ self.services.wait()
+
+ def restart(self):
+ """Reload config files and restart service.
+
+ :returns: None
+
+ """
+ cfg.CONF.reload_config_files()
+ self.services.restart()
class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
+ _set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
- def wait(self):
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
+ def handle_signal(self):
+ _set_signals_handler(self._handle_signal)
+
+ def _wait_for_exit_or_signal(self, ready_callback=None):
+ status = None
+ signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
- status = None
try:
+ if ready_callback:
+ ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
+ signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
+ signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
- if rpc:
- rpc.cleanup()
self.stop()
- return status
+ if rpc:
+ try:
+ rpc.cleanup()
+ except Exception:
+ # We're shutting down, so it doesn't matter at this point.
+ LOG.exception(_('Exception during rpc cleanup.'))
+
+ return status, signo
+
+ def wait(self, ready_callback=None):
+ while True:
+ self.handle_signal()
+ status, signo = self._wait_for_exit_or_signal(ready_callback)
+ if not _is_sighup_and_daemon(signo):
+ return status
+ self.restart()
class ServiceWrapper(object):
class ProcessLauncher(object):
- def __init__(self):
+ def __init__(self, wait_interval=0.01):
+ """Constructor.
+
+ :param wait_interval: The interval to sleep for between checks
+ of child process exit.
+ """
self.children = {}
self.sigcaught = None
self.running = True
+ self.wait_interval = wait_interval
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+ self.handle_signal()
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
+ def handle_signal(self):
+ _set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ _set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
sys.exit(1)
- def _child_process(self, service):
+ def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
+ def _sighup(*args):
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ raise SignalExit(signal.SIGHUP)
+
signal.signal(signal.SIGTERM, _sigterm)
+ if _sighup_supported():
+ signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
+ def _child_wait_for_exit_or_signal(self, launcher):
+ status = 0
+ signo = 0
+
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ try:
+ launcher.wait()
+ except SignalExit as exc:
+ signame = _signo_to_signame(exc.signo)
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ signo = exc.signo
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ launcher.stop()
+
+ return status, signo
+
+ def _child_process(self, service):
+ self._child_process_handle_signal()
+
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
pid = os.fork()
if pid == 0:
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.service)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.service.stop()
+ launcher = self._child_process(wrap.service)
+ while True:
+ self._child_process_handle_signal()
+ status, signo = self._child_wait_for_exit_or_signal(launcher)
+ if not _is_sighup_and_daemon(signo):
+ break
+ launcher.restart()
os._exit(status)
wrap.children.remove(pid)
return wrap
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
-
- LOG.debug(_('Full set of CONF:'))
- CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+ def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
- eventlet.greenthread.sleep(.01)
+ eventlet.greenthread.sleep(self.wait_interval)
continue
-
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary."""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while True:
+ self.handle_signal()
+ self._respawn_children()
+ if self.sigcaught:
+ signame = _signo_to_signame(self.sigcaught)
+ LOG.info(_('Caught %s, stopping children'), signame)
+ if not _is_sighup_and_daemon(self.sigcaught):
+ break
+
+ for pid in self.children:
+ os.kill(pid, signal.SIGHUP)
+ self.running = True
+ self.sigcaught = None
for pid in self.children:
try:
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
+ def reset(self):
+ # NOTE(Fengqian): docs for Event.reset() recommend against using it
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ # Signal that service cleanup is done:
+ if not self._done.ready():
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # Each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ if not self.done.ready():
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ def restart(self):
+ self.stop()
+ self.done = event.Event()
+ for restart_service in self.services:
+ restart_service.reset()
+ self.tg.add_thread(self.run_service, restart_service, self.done)
+
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers:
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import ssl
+
+from oslo.config import cfg
+
+from neutron.openstack.common.gettextutils import _
+
+
+ssl_opts = [
+ cfg.StrOpt('ca_file',
+ default=None,
+ help="CA certificate file to use to verify "
+ "connecting clients"),
+ cfg.StrOpt('cert_file',
+ default=None,
+ help="Certificate file to use when starting "
+ "the server securely"),
+ cfg.StrOpt('key_file',
+ default=None,
+ help="Private key file to use when starting "
+ "the server securely"),
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(ssl_opts, "ssl")
+
+
+def is_enabled():
+ cert_file = CONF.ssl.cert_file
+ key_file = CONF.ssl.key_file
+ ca_file = CONF.ssl.ca_file
+ use_ssl = cert_file or key_file
+
+ if cert_file and not os.path.exists(cert_file):
+ raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
+
+ if ca_file and not os.path.exists(ca_file):
+ raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
+
+ if key_file and not os.path.exists(key_file):
+ raise RuntimeError(_("Unable to find key_file : %s") % key_file)
+
+ if use_ssl and (not cert_file or not key_file):
+ raise RuntimeError(_("When running server in SSL mode, you must "
+ "specify both a cert_file and key_file "
+ "option value in your configuration file"))
+
+ return use_ssl
+
+
+def wrap(sock):
+ ssl_kwargs = {
+ 'server_side': True,
+ 'certfile': CONF.ssl.cert_file,
+ 'keyfile': CONF.ssl.key_file,
+ 'cert_reqs': ssl.CERT_NONE,
+ }
+
+ if CONF.ssl.ca_file:
+ ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
+ ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+ return ssl.wrap_socket(sock, **ssl_kwargs)
+
+
+_SSL_PROTOCOLS = {
+ "tlsv1": ssl.PROTOCOL_TLSv1,
+ "sslv23": ssl.PROTOCOL_SSLv23,
+ "sslv3": ssl.PROTOCOL_SSLv3
+}
+
+try:
+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
+except AttributeError:
+ pass
+
+
+def validate_ssl_version(version):
+ key = version.lower()
+ try:
+ return _SSL_PROTOCOLS[key]
+ except KeyError:
+ raise RuntimeError(_("Invalid SSL version : %s") % version)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# License for the specific language governing permissions and limitations
# under the License.
-from eventlet import greenlet
+import eventlet
from eventlet import greenpool
from eventlet import greenthread
def wait(self):
return self.thread.wait()
+ def link(self, func, *args, **kwargs):
+ self.thread.link(func, *args, **kwargs)
+
class ThreadGroup(object):
"""The point of the ThreadGroup classis to:
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
+ return th
def thread_done(self, thread):
self.threads.remove(thread)
def stop(self):
current = greenthread.getcurrent()
- for x in self.threads:
+
+ # Iterate over a copy of self.threads so thread_done doesn't
+ # modify the list while we're iterating
+ for x in self.threads[:]:
if x is current:
# don't kill the current thread.
continue
for x in self.timers:
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
current = greenthread.getcurrent()
- for x in self.threads:
+
+ # Iterate over a copy of self.threads so thread_done doesn't
+ # modify the list while we're iterating
+ for x in self.threads[:]:
if x is current:
continue
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
import calendar
import datetime
+import time
import iso8601
+import six
# ISO 8601 extended time format with microseconds
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
- raise ValueError(e.message)
+ raise ValueError(six.text_type(e))
except TypeError as e:
- raise ValueError(e.message)
+ raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
- if isinstance(before, basestring):
+ if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
+ else:
+ before = before.replace(tzinfo=None)
+
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
- if isinstance(after, basestring):
+ if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
+ else:
+ after = after.replace(tzinfo=None)
+
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
+ if utcnow.override_time is None:
+ # NOTE(kgriffs): This is several times faster
+ # than going through calendar.timegm(...)
+ return int(time.time())
+
return calendar.timegm(utcnow().timetuple())
utcnow.override_time = None
-def set_time_override(override_time=datetime.datetime.utcnow()):
+def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
+
+ :param override_time: datetime instance or list thereof. If not
+ given, defaults to the current UTC time.
"""
- utcnow.override_time = override_time
+ utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
+ return total_seconds(delta)
+
+
+def total_seconds(delta):
+ """Return the total seconds of datetime.timedelta object.
+
+ Compute total seconds of datetime.timedelta, datetime.timedelta
+ doesn't have method total_seconds in Python2.6, calculate it manually.
+ """
try:
return delta.total_seconds()
except AttributeError:
def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds.
- :params dt: the time
- :params window: minimum seconds to remain to consider the time not soon
+ :param dt: the time
+ :param window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helpers for comparing version strings.
+"""
+
+import functools
+import pkg_resources
+
+from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class deprecated(object):
+ """A decorator to mark callables as deprecated.
+
+ This decorator logs a deprecation message when the callable it decorates is
+ used. The message will include the release where the callable was
+ deprecated, the release where it may be removed and possibly an optional
+ replacement.
+
+ Examples:
+
+ 1. Specifying the required deprecated release
+
+ >>> @deprecated(as_of=deprecated.ICEHOUSE)
+ ... def a(): pass
+
+ 2. Specifying a replacement:
+
+ >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()')
+ ... def b(): pass
+
+ 3. Specifying the release where the functionality may be removed:
+
+ >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1)
+ ... def c(): pass
+
+ """
+
+ FOLSOM = 'F'
+ GRIZZLY = 'G'
+ HAVANA = 'H'
+ ICEHOUSE = 'I'
+
+ _RELEASES = {
+ 'F': 'Folsom',
+ 'G': 'Grizzly',
+ 'H': 'Havana',
+ 'I': 'Icehouse',
+ }
+
+ _deprecated_msg_with_alternative = _(
+ '%(what)s is deprecated as of %(as_of)s in favor of '
+ '%(in_favor_of)s and may be removed in %(remove_in)s.')
+
+ _deprecated_msg_no_alternative = _(
+ '%(what)s is deprecated as of %(as_of)s and may be '
+ 'removed in %(remove_in)s. It will not be superseded.')
+
+ def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None):
+ """Initialize decorator
+
+ :param as_of: the release deprecating the callable. Constants
+ are define in this class for convenience.
+ :param in_favor_of: the replacement for the callable (optional)
+ :param remove_in: an integer specifying how many releases to wait
+ before removing (default: 2)
+ :param what: name of the thing being deprecated (default: the
+ callable's name)
+
+ """
+ self.as_of = as_of
+ self.in_favor_of = in_favor_of
+ self.remove_in = remove_in
+ self.what = what
+
+ def __call__(self, func):
+ if not self.what:
+ self.what = func.__name__ + '()'
+
+ @functools.wraps(func)
+ def wrapped(*args, **kwargs):
+ msg, details = self._build_message()
+ LOG.deprecated(msg, details)
+ return func(*args, **kwargs)
+ return wrapped
+
+ def _get_safe_to_remove_release(self, release):
+ # TODO(dstanek): this method will have to be reimplemented once
+ # when we get to the X release because once we get to the Y
+ # release, what is Y+2?
+ new_release = chr(ord(release) + self.remove_in)
+ if new_release in self._RELEASES:
+ return self._RELEASES[new_release]
+ else:
+ return new_release
+
+ def _build_message(self):
+ details = dict(what=self.what,
+ as_of=self._RELEASES[self.as_of],
+ remove_in=self._get_safe_to_remove_release(self.as_of))
+
+ if self.in_favor_of:
+ details['in_favor_of'] = self.in_favor_of
+ msg = self._deprecated_msg_with_alternative
+ else:
+ msg = self._deprecated_msg_no_alternative
+ return msg, details
+
+
+def is_compatible(requested_version, current_version, same_major=True):
+ """Determine whether `requested_version` is satisfied by
+ `current_version`; in other words, `current_version` is >=
+ `requested_version`.
+
+ :param requested_version: version to check for compatibility
+ :param current_version: version to check against
+ :param same_major: if True, the major version must be identical between
+ `requested_version` and `current_version`. This is used when a
+ major-version difference indicates incompatibility between the two
+ versions. Since this is the common-case in practice, the default is
+ True.
+ :returns: True if compatible, False if not
+ """
+ requested_parts = pkg_resources.parse_version(requested_version)
+ current_parts = pkg_resources.parse_version(current_version)
+
+ if same_major and (requested_parts[0] != current_parts[0]):
+ return False
+
+ return current_parts >= requested_parts
self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
expected_res)
- @mock.patch('neutron.openstack.common.gettextutils.get_localized_message')
+ @mock.patch('neutron.openstack.common.gettextutils.translate')
def test_unmapped_neutron_error_localized(self, mock_translation):
gettextutils.install('blaa', lazy=True)
msg_translation = 'Translated error'
self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
expected_res)
- @mock.patch('neutron.openstack.common.gettextutils.get_localized_message')
+ @mock.patch('neutron.openstack.common.gettextutils.translate')
def test_mapped_neutron_error_localized(self, mock_translation):
gettextutils.install('blaa', lazy=True)
msg_translation = 'Translated error'
if not match:
language = req.best_match_language()
msg = _('The resource could not be found.')
- msg = gettextutils.get_localized_message(msg, language)
+ msg = gettextutils.translate(msg, language)
return webob.exc.HTTPNotFound(explanation=msg)
app = match['controller']
return app
module=periodic_task
module=policy
module=processutils
+module=py3kcompat
module=rpc
module=service
+module=sslutils
module=rootwrap
module=threadgroup
module=timeutils
module=uuidutils
+module=versionutils
# The base module to hold the copy of openstack.common
base=neutron