]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update RPC code from oslo
authorGary Kotton <gkotton@vmware.com>
Mon, 30 Dec 2013 08:48:15 +0000 (00:48 -0800)
committerGary Kotton <gkotton@vmware.com>
Wed, 15 Jan 2014 12:26:57 +0000 (04:26 -0800)
The common RPC code has been updated to include the following:
    8575d87af49ea276341908f83c8c51db13afca44
    8b2b0b743e84ceed7841cf470afed6a5da8e1d07
    23f602940c64ba408d77ceb8f5ba0f67ee4a18ef
    6d0a6c3083218cdac52758a8b6aac6b03402c658
    7cac1ac1bd9df36d4e5183afac3b643df10b1d4d
    8159efddabb09dd9b7c99963ff7c9de0a6c62b62

Updated to include the following in modules in openstack-common.conf:
py3kcompat, sslutils, and versionutils.

The update also includes imports from the RPC code

Change-Id: I84c5b8e2b17da0018dd69ecb354d123a609afe98

42 files changed:
neutron/api/v2/resource.py
neutron/api/versions.py
neutron/debug/commands.py
neutron/openstack/common/eventlet_backdoor.py
neutron/openstack/common/excutils.py
neutron/openstack/common/fileutils.py
neutron/openstack/common/gettextutils.py
neutron/openstack/common/importutils.py
neutron/openstack/common/jsonutils.py
neutron/openstack/common/local.py
neutron/openstack/common/log.py
neutron/openstack/common/log_handler.py
neutron/openstack/common/loopingcall.py
neutron/openstack/common/network_utils.py
neutron/openstack/common/periodic_task.py
neutron/openstack/common/processutils.py
neutron/openstack/common/py3kcompat/__init__.py [new file with mode: 0644]
neutron/openstack/common/py3kcompat/urlutils.py [new file with mode: 0644]
neutron/openstack/common/rpc/__init__.py
neutron/openstack/common/rpc/amqp.py
neutron/openstack/common/rpc/common.py
neutron/openstack/common/rpc/dispatcher.py
neutron/openstack/common/rpc/impl_fake.py
neutron/openstack/common/rpc/impl_kombu.py
neutron/openstack/common/rpc/impl_qpid.py
neutron/openstack/common/rpc/impl_zmq.py
neutron/openstack/common/rpc/matchmaker.py
neutron/openstack/common/rpc/matchmaker_redis.py
neutron/openstack/common/rpc/matchmaker_ring.py
neutron/openstack/common/rpc/proxy.py
neutron/openstack/common/rpc/serializer.py
neutron/openstack/common/rpc/service.py
neutron/openstack/common/rpc/zmq_receiver.py [changed mode: 0755->0644]
neutron/openstack/common/service.py
neutron/openstack/common/sslutils.py [new file with mode: 0644]
neutron/openstack/common/threadgroup.py
neutron/openstack/common/timeutils.py
neutron/openstack/common/uuidutils.py
neutron/openstack/common/versionutils.py [new file with mode: 0644]
neutron/tests/unit/test_api_v2_resource.py
neutron/wsgi.py
openstack-common.conf

index 459390724bf53293ee1ac21ff2bb703731c95342..a6e9765e387382f1441581b3e5ba199ba721f23b 100644 (file)
@@ -149,7 +149,7 @@ def translate(translatable, locale):
     :returns: the translated object, or the object as-is if it
               was not translated
     """
-    localize = gettextutils.get_localized_message
+    localize = gettextutils.translate
     if isinstance(translatable, exceptions.NeutronException):
         translatable.msg = localize(translatable.msg, locale)
     elif isinstance(translatable, webob.exc.HTTPError):
index 5707b34873ffab134a1e09a5c6cc9b6879dae550..671a56ec3059cfa99222f41da1e54b63d7227de1 100644 (file)
@@ -45,7 +45,7 @@ class Versions(object):
         if req.path != '/':
             language = req.best_match_language()
             msg = _('Unknown API version specified')
-            msg = gettextutils.get_localized_message(msg, language)
+            msg = gettextutils.translate(msg, language)
             return webob.exc.HTTPNotFound(explanation=msg)
 
         builder = versions_view.get_view_builder(req)
index 829ed22cc209003f542c9f4bfcf95129472450c1..59f820391e962fb057bafb999fc7185566590fb3 100644 (file)
@@ -30,8 +30,8 @@ class ProbeCommand(NeutronCommand):
         return self.app.debug_agent
 
     def run(self, parsed_args):
-        self.log.debug('run(%s)' % parsed_args)
-        self.app.stdout.write(_('Unimplemented commands') + '\n')
+        self.log.debug('run(%s)', parsed_args)
+        self.log.info(_('Unimplemented commands'))
 
 
 class CreateProbe(ProbeCommand):
@@ -55,7 +55,7 @@ class CreateProbe(ProbeCommand):
         debug_agent = self.get_debug_agent()
         port = debug_agent.create_probe(parsed_args.id,
                                         parsed_args.device_owner)
-        self.app.stdout.write(_('Probe created : %s ') % port.id + '\n')
+        self.log.info(_('Probe created : %s '), port.id)
 
 
 class DeleteProbe(ProbeCommand):
@@ -74,7 +74,7 @@ class DeleteProbe(ProbeCommand):
         self.log.debug('run(%s)' % parsed_args)
         debug_agent = self.get_debug_agent()
         debug_agent.delete_probe(parsed_args.id)
-        self.app.stdout.write(_('Probe %s deleted') % parsed_args.id + '\n')
+        self.log.info(_('Probe %s deleted'), parsed_args.id)
 
 
 class ListProbe(NeutronCommand, lister.Lister):
@@ -105,7 +105,7 @@ class ClearProbe(ProbeCommand):
         self.log.debug('run(%s)' % parsed_args)
         debug_agent = self.get_debug_agent()
         debug_agent.clear_probe()
-        self.app.stdout.write(_('All Probes deleted ') + '\n')
+        self.log.info(_('All Probes deleted '))
 
 
 class ExecProbe(ProbeCommand):
index 57b89ae914ebdfcd4ad041135d749319b5f4de77..b55b0ceb3b6daa028447e7543e95775533932952 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2012 OpenStack Foundation.
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
 
 from __future__ import print_function
 
+import errno
 import gc
+import os
 import pprint
+import socket
 import sys
 import traceback
 
@@ -28,14 +29,34 @@ import eventlet.backdoor
 import greenlet
 from oslo.config import cfg
 
+from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import log as logging
+
+help_for_backdoor_port = (
+    "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+    "in listening on a random tcp port number; <port> results in listening "
+    "on the specified port number (and not enabling backdoor if that port "
+    "is in use); and <start>:<end> results in listening on the smallest "
+    "unused port number within the specified range of port numbers.  The "
+    "chosen port is displayed in the service's log file.")
 eventlet_backdoor_opts = [
-    cfg.IntOpt('backdoor_port',
+    cfg.StrOpt('backdoor_port',
                default=None,
-               help='port for eventlet backdoor to listen')
+               help="Enable eventlet backdoor.  %s" % help_for_backdoor_port)
 ]
 
 CONF = cfg.CONF
 CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+    def __init__(self, port_range, help_msg, ex):
+        msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+               '%(help)s' %
+               {'range': port_range, 'ex': ex, 'help': help_msg})
+        super(EventletBackdoorConfigValueError, self).__init__(msg)
+        self.port_range = port_range
 
 
 def _dont_use_this():
@@ -43,7 +64,7 @@ def _dont_use_this():
 
 
 def _find_objects(t):
-    return filter(lambda o: isinstance(o, t), gc.get_objects())
+    return [o for o in gc.get_objects() if isinstance(o, t)]
 
 
 def _print_greenthreads():
@@ -60,6 +81,33 @@ def _print_nativethreads():
         print()
 
 
+def _parse_port_range(port_range):
+    if ':' not in port_range:
+        start, end = port_range, port_range
+    else:
+        start, end = port_range.split(':', 1)
+    try:
+        start, end = int(start), int(end)
+        if end < start:
+            raise ValueError
+        return start, end
+    except ValueError as ex:
+        raise EventletBackdoorConfigValueError(port_range, ex,
+                                               help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+    try_port = start_port
+    while True:
+        try:
+            return listen_func((host, try_port))
+        except socket.error as exc:
+            if (exc.errno != errno.EADDRINUSE or
+               try_port >= end_port):
+                raise
+            try_port += 1
+
+
 def initialize_if_enabled():
     backdoor_locals = {
         'exit': _dont_use_this,      # So we don't exit the entire process
@@ -72,6 +120,8 @@ def initialize_if_enabled():
     if CONF.backdoor_port is None:
         return None
 
+    start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
     # NOTE(johannes): The standard sys.displayhook will print the value of
     # the last expression and set it to __builtin__._, which overwrites
     # the __builtin__._ that gettext sets. Let's switch to using pprint
@@ -82,8 +132,13 @@ def initialize_if_enabled():
             pprint.pprint(val)
     sys.displayhook = displayhook
 
-    sock = eventlet.listen(('localhost', CONF.backdoor_port))
+    sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+    # In the case of backdoor port being zero, a port number is assigned by
+    # listen().  In any case, pull the port number out here.
     port = sock.getsockname()[1]
+    LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+             {'port': port, 'pid': os.getpid()})
     eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
                      locals=backdoor_locals)
     return port
index 676baaeae47b4190f06c94252fbb39643ad25a6e..b7c762890e012b3cdf02a005c6be73acc8a056d9 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # Copyright 2012, Red Hat, Inc.
 #
 Exception related utilities.
 """
 
-import contextlib
 import logging
 import sys
+import time
 import traceback
 
+import six
+
 from neutron.openstack.common.gettextutils import _
 
 
-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
     """Save current exception, run some code and then re-raise.
 
     In some cases the exception context can be cleared, resulting in None
@@ -40,12 +39,61 @@ def save_and_reraise_exception():
     To work around this, we save the exception state, run handler code, and
     then re-raise the original exception. If another exception occurs, the
     saved exception is logged and the new exception is re-raised.
+
+    In some cases the caller may not want to re-raise the exception, and
+    for those circumstances this context provides a reraise flag that
+    can be used to suppress the exception.  For example::
+
+      except Exception:
+          with save_and_reraise_exception() as ctxt:
+              decide_if_need_reraise()
+              if not should_be_reraised:
+                  ctxt.reraise = False
     """
-    type_, value, tb = sys.exc_info()
-    try:
-        yield
-    except Exception:
-        logging.error(_('Original exception being dropped: %s'),
-                      traceback.format_exception(type_, value, tb))
-        raise
-    raise type_, value, tb
+    def __init__(self):
+        self.reraise = True
+
+    def __enter__(self):
+        self.type_, self.value, self.tb, = sys.exc_info()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is not None:
+            logging.error(_('Original exception being dropped: %s'),
+                          traceback.format_exception(self.type_,
+                                                     self.value,
+                                                     self.tb))
+            return False
+        if self.reraise:
+            six.reraise(self.type_, self.value, self.tb)
+
+
+def forever_retry_uncaught_exceptions(infunc):
+    def inner_func(*args, **kwargs):
+        last_log_time = 0
+        last_exc_message = None
+        exc_count = 0
+        while True:
+            try:
+                return infunc(*args, **kwargs)
+            except Exception as exc:
+                this_exc_message = six.u(str(exc))
+                if this_exc_message == last_exc_message:
+                    exc_count += 1
+                else:
+                    exc_count = 1
+                # Do not log any more frequently than once a minute unless
+                # the exception message changes
+                cur_time = int(time.time())
+                if (cur_time - last_log_time > 60 or
+                        this_exc_message != last_exc_message):
+                    logging.exception(
+                        _('Unexpected exception occurred %d time(s)... '
+                          'retrying.') % exc_count)
+                    last_log_time = cur_time
+                    last_exc_message = this_exc_message
+                    exc_count = 0
+                # This should be a very rare event. In case it isn't, do
+                # a sleep.
+                time.sleep(1)
+    return inner_func
index 48aaee895a09d95c4a9e5a3b639fbec0ace0f58e..704af09623e218960bbdb0fe0877410db77da01a 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -19,6 +17,7 @@
 import contextlib
 import errno
 import os
+import tempfile
 
 from neutron.openstack.common import excutils
 from neutron.openstack.common.gettextutils import _
@@ -69,33 +68,34 @@ def read_cached_file(filename, force_reload=False):
     return (reloaded, cache_info['data'])
 
 
-def delete_if_exists(path):
+def delete_if_exists(path, remove=os.unlink):
     """Delete a file, but ignore file not found error.
 
     :param path: File to delete
+    :param remove: Optional function to remove passed path
     """
 
     try:
-        os.unlink(path)
+        remove(path)
     except OSError as e:
-        if e.errno == errno.ENOENT:
-            return
-        else:
+        if e.errno != errno.ENOENT:
             raise
 
 
 @contextlib.contextmanager
-def remove_path_on_error(path):
+def remove_path_on_error(path, remove=delete_if_exists):
     """Protect code that wants to operate on PATH atomically.
     Any exception will cause PATH to be removed.
 
     :param path: File to work with
+    :param remove: Optional function to remove passed path
     """
+
     try:
         yield
     except Exception:
         with excutils.save_and_reraise_exception():
-            delete_if_exists(path)
+            remove(path)
 
 
 def file_open(*args, **kwargs):
@@ -108,3 +108,30 @@ def file_open(*args, **kwargs):
     state at all (for unit tests)
     """
     return file(*args, **kwargs)
+
+
+def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
+    """Create temporary file or use existing file.
+
+    This util is needed for creating temporary file with
+    specified content, suffix and prefix. If path is not None,
+    it will be used for writing content. If the path doesn't
+    exist it'll be created.
+
+    :param content: content for temporary file.
+    :param path: same as parameter 'dir' for mkstemp
+    :param suffix: same as parameter 'suffix' for mkstemp
+    :param prefix: same as parameter 'prefix' for mkstemp
+
+    For example: it can be used in database tests for creating
+    configuration files.
+    """
+    if path:
+        ensure_tree(path)
+
+    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
+    try:
+        os.write(fd, content)
+    finally:
+        os.close(fd)
+    return path
index bc1afd17ac21e159ba10a0b68ebf4594f113f76a..b060c417d604925bd99d8091218e9b5a052f3833 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 # Copyright 2013 IBM Corp.
 # All Rights Reserved.
@@ -26,13 +24,10 @@ Usual usage in an openstack.common module:
 
 import copy
 import gettext
-import logging
+import locale
+from logging import handlers
 import os
 import re
-try:
-    import UserString as _userString
-except ImportError:
-    import collections as _userString
 
 from babel import localedata
 import six
@@ -58,7 +53,7 @@ def enable_lazy():
 
 def _(msg):
     if USE_LAZY:
-        return Message(msg, 'neutron')
+        return Message(msg, domain='neutron')
     else:
         if six.PY3:
             return _t.gettext(msg)
@@ -90,11 +85,6 @@ def install(domain, lazy=False):
         # messages in OpenStack. We override the standard _() function
         # and % (format string) operation to build Message objects that can
         # later be translated when we have more information.
-        #
-        # Also included below is an example LocaleHandler that translates
-        # Messages to an associated locale, effectively allowing many logs,
-        # each with their own locale.
-
         def _lazy_gettext(msg):
             """Create and return a Message object.
 
@@ -105,7 +95,7 @@ def install(domain, lazy=False):
             Message encapsulates a string so that we can translate
             it later when needed.
             """
-            return Message(msg, domain)
+            return Message(msg, domain=domain)
 
         from six import moves
         moves.builtins.__dict__['_'] = _lazy_gettext
@@ -120,182 +110,158 @@ def install(domain, lazy=False):
                             unicode=True)
 
 
-class Message(_userString.UserString, object):
-    """Class used to encapsulate translatable messages."""
-    def __init__(self, msg, domain):
-        # _msg is the gettext msgid and should never change
-        self._msg = msg
-        self._left_extra_msg = ''
-        self._right_extra_msg = ''
-        self._locale = None
-        self.params = None
-        self.domain = domain
-
-    @property
-    def data(self):
-        # NOTE(mrodden): this should always resolve to a unicode string
-        # that best represents the state of the message currently
-
-        localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
-        if self.locale:
-            lang = gettext.translation(self.domain,
-                                       localedir=localedir,
-                                       languages=[self.locale],
-                                       fallback=True)
-        else:
-            # use system locale for translations
-            lang = gettext.translation(self.domain,
-                                       localedir=localedir,
-                                       fallback=True)
+class Message(six.text_type):
+    """A Message object is a unicode object that can be translated.
+
+    Translation of Message is done explicitly using the translate() method.
+    For all non-translation intents and purposes, a Message is simply unicode,
+    and can be treated as such.
+    """
 
+    def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+        """Create a new Message object.
+
+        In order for translation to work gettext requires a message ID, this
+        msgid will be used as the base unicode text. It is also possible
+        for the msgid and the base unicode text to be different by passing
+        the msgtext parameter.
+        """
+        # If the base msgtext is not given, we use the default translation
+        # of the msgid (which is in English) just in case the system locale is
+        # not English, so that the base text will be in that locale by default.
+        if not msgtext:
+            msgtext = Message._translate_msgid(msgid, domain)
+        # We want to initialize the parent unicode with the actual object that
+        # would have been plain unicode if 'Message' was not enabled.
+        msg = super(Message, cls).__new__(cls, msgtext)
+        msg.msgid = msgid
+        msg.domain = domain
+        msg.params = params
+        return msg
+
+    def translate(self, desired_locale=None):
+        """Translate this message to the desired locale.
+
+        :param desired_locale: The desired locale to translate the message to,
+                               if no locale is provided the message will be
+                               translated to the system's default locale.
+
+        :returns: the translated message in unicode
+        """
+
+        translated_message = Message._translate_msgid(self.msgid,
+                                                      self.domain,
+                                                      desired_locale)
+        if self.params is None:
+            # No need for more translation
+            return translated_message
+
+        # This Message object may have been formatted with one or more
+        # Message objects as substitution arguments, given either as a single
+        # argument, part of a tuple, or as one or more values in a dictionary.
+        # When translating this Message we need to translate those Messages too
+        translated_params = _translate_args(self.params, desired_locale)
+
+        translated_message = translated_message % translated_params
+
+        return translated_message
+
+    @staticmethod
+    def _translate_msgid(msgid, domain, desired_locale=None):
+        if not desired_locale:
+            system_locale = locale.getdefaultlocale()
+            # If the system locale is not available to the runtime use English
+            if not system_locale[0]:
+                desired_locale = 'en_US'
+            else:
+                desired_locale = system_locale[0]
+
+        locale_dir = os.environ.get(domain.upper() + '_LOCALEDIR')
+        lang = gettext.translation(domain,
+                                   localedir=locale_dir,
+                                   languages=[desired_locale],
+                                   fallback=True)
         if six.PY3:
-            ugettext = lang.gettext
-        else:
-            ugettext = lang.ugettext
-
-        full_msg = (self._left_extra_msg +
-                    ugettext(self._msg) +
-                    self._right_extra_msg)
-
-        if self.params is not None:
-            full_msg = full_msg % self.params
-
-        return six.text_type(full_msg)
-
-    @property
-    def locale(self):
-        return self._locale
-
-    @locale.setter
-    def locale(self, value):
-        self._locale = value
-        if not self.params:
-            return
-
-        # This Message object may have been constructed with one or more
-        # Message objects as substitution parameters, given as a single
-        # Message, or a tuple or Map containing some, so when setting the
-        # locale for this Message we need to set it for those Messages too.
-        if isinstance(self.params, Message):
-            self.params.locale = value
-            return
-        if isinstance(self.params, tuple):
-            for param in self.params:
-                if isinstance(param, Message):
-                    param.locale = value
-            return
-        if isinstance(self.params, dict):
-            for param in self.params.values():
-                if isinstance(param, Message):
-                    param.locale = value
-
-    def _save_dictionary_parameter(self, dict_param):
-        full_msg = self.data
-        # look for %(blah) fields in string;
-        # ignore %% and deal with the
-        # case where % is first character on the line
-        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg)
-
-        # if we don't find any %(blah) blocks but have a %s
-        if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
-            # apparently the full dictionary is the parameter
-            params = copy.deepcopy(dict_param)
+            translator = lang.gettext
         else:
-            params = {}
-            for key in keys:
-                try:
-                    params[key] = copy.deepcopy(dict_param[key])
-                except TypeError:
-                    # cast uncopyable thing to unicode string
-                    params[key] = six.text_type(dict_param[key])
+            translator = lang.ugettext
 
-        return params
+        translated_message = translator(msgid)
+        return translated_message
 
-    def _save_parameters(self, other):
-        # we check for None later to see if
-        # we actually have parameters to inject,
-        # so encapsulate if our parameter is actually None
+    def __mod__(self, other):
+        # When we mod a Message we want the actual operation to be performed
+        # by the parent class (i.e. unicode()), the only thing  we do here is
+        # save the original msgid and the parameters in case of a translation
+        unicode_mod = super(Message, self).__mod__(other)
+        modded = Message(self.msgid,
+                         msgtext=unicode_mod,
+                         params=self._sanitize_mod_params(other),
+                         domain=self.domain)
+        return modded
+
+    def _sanitize_mod_params(self, other):
+        """Sanitize the object being modded with this Message.
+
+        - Add support for modding 'None' so translation supports it
+        - Trim the modded object, which can be a large dictionary, to only
+        those keys that would actually be used in a translation
+        - Snapshot the object being modded, in case the message is
+        translated, it will be used as it was when the Message was created
+        """
         if other is None:
-            self.params = (other, )
+            params = (other,)
         elif isinstance(other, dict):
-            self.params = self._save_dictionary_parameter(other)
+            params = self._trim_dictionary_parameters(other)
         else:
-            # fallback to casting to unicode,
-            # this will handle the problematic python code-like
-            # objects that cannot be deep-copied
-            try:
-                self.params = copy.deepcopy(other)
-            except TypeError:
-                self.params = six.text_type(other)
-
-        return self
-
-    # overrides to be more string-like
-    def __unicode__(self):
-        return self.data
-
-    def __str__(self):
-        if six.PY3:
-            return self.__unicode__()
-        return self.data.encode('utf-8')
+            params = self._copy_param(other)
+        return params
 
-    def __getstate__(self):
-        to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
-                   'domain', 'params', '_locale']
-        new_dict = self.__dict__.fromkeys(to_copy)
-        for attr in to_copy:
-            new_dict[attr] = copy.deepcopy(self.__dict__[attr])
+    def _trim_dictionary_parameters(self, dict_param):
+        """Return a dict that only has matching entries in the msgid."""
+        # NOTE(luisg): Here we trim down the dictionary passed as parameters
+        # to avoid carrying a lot of unnecessary weight around in the message
+        # object, for example if someone passes in Message() % locals() but
+        # only some params are used, and additionally we prevent errors for
+        # non-deepcopyable objects by unicoding() them.
+
+        # Look for %(param) keys in msgid;
+        # Skip %% and deal with the case where % is first character on the line
+        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
+
+        # If we don't find any %(param) keys but have a %s
+        if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
+            # Apparently the full dictionary is the parameter
+            params = self._copy_param(dict_param)
+        else:
+            params = {}
+            for key in keys:
+                params[key] = self._copy_param(dict_param[key])
 
-        return new_dict
+        return params
 
-    def __setstate__(self, state):
-        for (k, v) in state.items():
-            setattr(self, k, v)
+    def _copy_param(self, param):
+        try:
+            return copy.deepcopy(param)
+        except TypeError:
+            # Fallback to casting to unicode this will handle the
+            # python code-like objects that can't be deep-copied
+            return six.text_type(param)
 
-    # operator overloads
     def __add__(self, other):
-        copied = copy.deepcopy(self)
-        copied._right_extra_msg += other.__str__()
-        return copied
+        msg = _('Message objects do not support addition.')
+        raise TypeError(msg)
 
     def __radd__(self, other):
-        copied = copy.deepcopy(self)
-        copied._left_extra_msg += other.__str__()
-        return copied
+        return self.__add__(other)
 
-    def __mod__(self, other):
-        # do a format string to catch and raise
-        # any possible KeyErrors from missing parameters
-        self.data % other
-        copied = copy.deepcopy(self)
-        return copied._save_parameters(other)
-
-    def __mul__(self, other):
-        return self.data * other
-
-    def __rmul__(self, other):
-        return other * self.data
-
-    def __getitem__(self, key):
-        return self.data[key]
-
-    def __getslice__(self, start, end):
-        return self.data.__getslice__(start, end)
-
-    def __getattribute__(self, name):
-        # NOTE(mrodden): handle lossy operations that we can't deal with yet
-        # These override the UserString implementation, since UserString
-        # uses our __class__ attribute to try and build a new message
-        # after running the inner data string through the operation.
-        # At that point, we have lost the gettext message id and can just
-        # safely resolve to a string instead.
-        ops = ['capitalize', 'center', 'decode', 'encode',
-               'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
-               'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
-        if name in ops:
-            return getattr(self.data, name)
-        else:
-            return _userString.UserString.__getattribute__(self, name)
+    def __str__(self):
+        # NOTE(luisg): Logging in python 2.6 tries to str() log records,
+        # and it expects specifically a UnicodeError in order to proceed.
+        msg = _('Message objects do not support str() because they may '
+                'contain non-ascii characters. '
+                'Please use unicode() or translate() instead.')
+        raise UnicodeError(msg)
 
 
 def get_available_languages(domain):
@@ -317,7 +283,7 @@ def get_available_languages(domain):
     # NOTE(luisg): Babel <1.0 used a function called list(), which was
     # renamed to locale_identifiers() in >=1.0, the requirements master list
     # requires >=0.9.6, uncapped, so defensively work with both. We can remove
-    # this check when the master list updates to >=1.0, and all projects udpate
+    # this check when the master list updates to >=1.0, and update all projects
     list_identifiers = (getattr(localedata, 'list', None) or
                         getattr(localedata, 'locale_identifiers'))
     locale_identifiers = list_identifiers()
@@ -328,38 +294,118 @@ def get_available_languages(domain):
     return copy.copy(language_list)
 
 
-def get_localized_message(message, user_locale):
-    """Gets a localized version of the given message in the given locale."""
+def translate(obj, desired_locale=None):
+    """Gets the translated unicode representation of the given object.
+
+    If the object is not translatable it is returned as-is.
+    If the locale is None the object is translated to the system locale.
+
+    :param obj: the object to translate
+    :param desired_locale: the locale to translate the message to, if None the
+                           default system locale will be used
+    :returns: the translated object in unicode, or the original object if
+              it could not be translated
+    """
+    message = obj
+    if not isinstance(message, Message):
+        # If the object to translate is not already translatable,
+        # let's first get its unicode representation
+        message = six.text_type(obj)
     if isinstance(message, Message):
-        if user_locale:
-            message.locale = user_locale
-        return six.text_type(message)
-    else:
-        return message
+        # Even after unicoding() we still need to check if we are
+        # running with translatable unicode before translating
+        return message.translate(desired_locale)
+    return obj
+
 
+def _translate_args(args, desired_locale=None):
+    """Translates all the translatable elements of the given arguments object.
 
-class LocaleHandler(logging.Handler):
-    """Handler that can have a locale associated to translate Messages.
+    This method is used for translating the translatable values in method
+    arguments which include values of tuples or dictionaries.
+    If the object is not a tuple or a dictionary the object itself is
+    translated if it is translatable.
 
-    A quick example of how to utilize the Message class above.
-    LocaleHandler takes a locale and a target logging.Handler object
-    to forward LogRecord objects to after translating the internal Message.
+    If the locale is None the object is translated to the system locale.
+
+    :param args: the args to translate
+    :param desired_locale: the locale to translate the args to, if None the
+                           default system locale will be used
+    :returns: a new args object with the translated contents of the original
     """
+    if isinstance(args, tuple):
+        return tuple(translate(v, desired_locale) for v in args)
+    if isinstance(args, dict):
+        translated_dict = {}
+        for (k, v) in six.iteritems(args):
+            translated_v = translate(v, desired_locale)
+            translated_dict[k] = translated_v
+        return translated_dict
+    return translate(args, desired_locale)
+
+
+class TranslationHandler(handlers.MemoryHandler):
+    """Handler that translates records before logging them.
+
+    The TranslationHandler takes a locale and a target logging.Handler object
+    to forward LogRecord objects to after translating them. This handler
+    depends on Message objects being logged, instead of regular strings.
+
+    The handler can be configured declaratively in the logging.conf as follows:
+
+        [handlers]
+        keys = translatedlog, translator
 
-    def __init__(self, locale, target):
-        """Initialize a LocaleHandler
+        [handler_translatedlog]
+        class = handlers.WatchedFileHandler
+        args = ('/var/log/api-localized.log',)
+        formatter = context
+
+        [handler_translator]
+        class = openstack.common.log.TranslationHandler
+        target = translatedlog
+        args = ('zh_CN',)
+
+    If the specified locale is not available in the system, the handler will
+    log in the default locale.
+    """
+
+    def __init__(self, locale=None, target=None):
+        """Initialize a TranslationHandler
 
         :param locale: locale to use for translating messages
         :param target: logging.Handler object to forward
                        LogRecord objects to after translation
         """
-        logging.Handler.__init__(self)
+        # NOTE(luisg): In order to allow this handler to be a wrapper for
+        # other handlers, such as a FileHandler, and still be able to
+        # configure it using logging.conf, this handler has to extend
+        # MemoryHandler because only the MemoryHandlers' logging.conf
+        # parsing is implemented such that it accepts a target handler.
+        handlers.MemoryHandler.__init__(self, capacity=0, target=target)
         self.locale = locale
-        self.target = target
+
+    def setFormatter(self, fmt):
+        self.target.setFormatter(fmt)
 
     def emit(self, record):
-        if isinstance(record.msg, Message):
-            # set the locale and resolve to a string
-            record.msg.locale = self.locale
+        # We save the message from the original record to restore it
+        # after translation, so other handlers are not affected by this
+        original_msg = record.msg
+        original_args = record.args
+
+        try:
+            self._translate_and_log_record(record)
+        finally:
+            record.msg = original_msg
+            record.args = original_args
+
+    def _translate_and_log_record(self, record):
+        record.msg = translate(record.msg, self.locale)
+
+        # In addition to translating the message, we also need to translate
+        # arguments that were passed to the log method that were not part
+        # of the main message e.g., log.info(_('Some message %s'), this_one))
+        record.args = _translate_args(record.args, self.locale)
 
         self.target.emit(record)
index 7a303f93f21d42c831883bbe4cf7c38aea842bf7..4fd9ae2bc26b280706fd0a499ed4b99cf1afca24 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
index 2833876792c97d90399e123007d3aadd12c55e44..d4a764b02dd26780c17721d7227ac437a263cb23 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
@@ -38,13 +36,23 @@ import functools
 import inspect
 import itertools
 import json
-import types
-import xmlrpclib
+try:
+    import xmlrpclib
+except ImportError:
+    # NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3
+    #                 however the function and object call signatures
+    #                 remained the same. This whole try/except block should
+    #                 be removed and replaced with a call to six.moves once
+    #                 six 1.4.2 is released. See http://bit.ly/1bqrVzu
+    import xmlrpc.client as xmlrpclib
 
 import six
 
+from neutron.openstack.common import gettextutils
+from neutron.openstack.common import importutils
 from neutron.openstack.common import timeutils
 
+netaddr = importutils.try_import("netaddr")
 
 _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.isfunction, inspect.isgeneratorfunction,
@@ -52,7 +60,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.iscode, inspect.isbuiltin, inspect.isroutine,
                      inspect.isabstract]
 
-_simple_types = (types.NoneType, int, basestring, bool, float, long)
+_simple_types = (six.string_types + six.integer_types
+                 + (type(None), bool, float))
 
 
 def to_primitive(value, convert_instances=False, convert_datetime=True,
@@ -117,7 +126,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
                                       level=level,
                                       max_depth=max_depth)
         if isinstance(value, dict):
-            return dict((k, recursive(v)) for k, v in value.iteritems())
+            return dict((k, recursive(v)) for k, v in six.iteritems(value))
         elif isinstance(value, (list, tuple)):
             return [recursive(lv) for lv in value]
 
@@ -129,6 +138,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
 
         if convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
+        elif isinstance(value, gettextutils.Message):
+            return value.data
         elif hasattr(value, 'iteritems'):
             return recursive(dict(value.iteritems()), level=level + 1)
         elif hasattr(value, '__iter__'):
@@ -137,6 +148,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             # Likely an instance of something. Watch for cycles.
             # Ignore class member vars.
             return recursive(value.__dict__, level=level + 1)
+        elif netaddr and isinstance(value, netaddr.IPAddress):
+            return six.text_type(value)
         else:
             if any(test(value) for test in _nasty_type_tests):
                 return six.text_type(value)
index e82f17d0f3fd307724a7f1b3ffb0454cd449219e..0819d5b97cbb0ce9e998a71ccc5d8cbb9de320ee 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
index 854212bff10a382b96b0319eadb3167c7701590e..427921c4ab3118fe9401f41ce1650c90638c7df9 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
@@ -35,6 +33,7 @@ import logging
 import logging.config
 import logging.handlers
 import os
+import re
 import sys
 import traceback
 
@@ -42,7 +41,7 @@ from oslo.config import cfg
 import six
 from six import moves
 
-from neutron.openstack.common.gettextutils import _  # noqa
+from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import local
@@ -50,6 +49,24 @@ from neutron.openstack.common import local
 
 _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
 
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
+
+# NOTE(ldbragst): Let's build a list of regex objects using the list of
+# _SANITIZE_KEYS we already have. This way, we only have to add the new key
+# to the list of _SANITIZE_KEYS and we can generate regular expressions
+# for XML and JSON automatically.
+_SANITIZE_PATTERNS = []
+_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
+                    r'(<%(key)s>).*?(</%(key)s>)',
+                    r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
+                    r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
+
+for key in _SANITIZE_KEYS:
+    for pattern in _FORMAT_PATTERNS:
+        reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+        _SANITIZE_PATTERNS.append(reg_ex)
+
+
 common_cli_opts = [
     cfg.BoolOpt('debug',
                 short='d',
@@ -113,7 +130,7 @@ generic_log_opts = [
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
-                       '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+                       '%(name)s [%(request_id)s %(user_identity)s] '
                        '%(instance)s%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
@@ -132,7 +149,6 @@ log_opts = [
                     'amqp=WARN',
                     'amqplib=WARN',
                     'boto=WARN',
-                    'keystone=INFO',
                     'qpid=WARN',
                     'sqlalchemy=WARN',
                     'suds=INFO',
@@ -215,6 +231,40 @@ def _get_log_file_path(binary=None):
     return None
 
 
+def mask_password(message, secret="***"):
+    """Replace password with 'secret' in message.
+
+    :param message: The string which includes security information.
+    :param secret: value with which to replace passwords.
+    :returns: The unicode value of message with the password fields masked.
+
+    For example:
+
+    >>> mask_password("'adminPass' : 'aaaaa'")
+    "'adminPass' : '***'"
+    >>> mask_password("'admin_pass' : 'aaaaa'")
+    "'admin_pass' : '***'"
+    >>> mask_password('"password" : "aaaaa"')
+    '"password" : "***"'
+    >>> mask_password("'original_password' : 'aaaaa'")
+    "'original_password' : '***'"
+    >>> mask_password("u'original_password' :   u'aaaaa'")
+    "u'original_password' :   u'***'"
+    """
+    message = six.text_type(message)
+
+    # NOTE(ldbragst): Check to see if anything in message contains any key
+    # specified in _SANITIZE_KEYS, if not then just return the message since
+    # we don't have to mask any passwords.
+    if not any(key in message for key in _SANITIZE_KEYS):
+        return message
+
+    secret = r'\g<1>' + secret + r'\g<2>'
+    for pattern in _SANITIZE_PATTERNS:
+        message = re.sub(pattern, secret, message)
+    return message
+
+
 class BaseLoggerAdapter(logging.LoggerAdapter):
 
     def audit(self, msg, *args, **kwargs):
@@ -282,10 +332,12 @@ class ContextAdapter(BaseLoggerAdapter):
         elif instance_uuid:
             instance_extra = (CONF.instance_uuid_format
                               % {'uuid': instance_uuid})
-        extra.update({'instance': instance_extra})
+        extra['instance'] = instance_extra
+
+        extra.setdefault('user_identity', kwargs.pop('user_identity', None))
 
-        extra.update({"project": self.project})
-        extra.update({"version": self.version})
+        extra['project'] = self.project
+        extra['version'] = self.version
         extra['extra'] = extra.copy()
         return msg, kwargs
 
@@ -299,7 +351,7 @@ class JSONFormatter(logging.Formatter):
     def formatException(self, ei, strip_newlines=True):
         lines = traceback.format_exception(*ei)
         if strip_newlines:
-            lines = [itertools.ifilter(
+            lines = [moves.filter(
                 lambda x: x,
                 line.rstrip().splitlines()) for line in lines]
             lines = list(itertools.chain(*lines))
@@ -337,10 +389,10 @@ class JSONFormatter(logging.Formatter):
 
 
 def _create_logging_excepthook(product_name):
-    def logging_excepthook(type, value, tb):
+    def logging_excepthook(exc_type, value, tb):
         extra = {}
         if CONF.verbose:
-            extra['exc_info'] = (type, value, tb)
+            extra['exc_info'] = (exc_type, value, tb)
         getLogger(product_name).critical(str(value), **extra)
     return logging_excepthook
 
@@ -425,7 +477,7 @@ def _setup_logging_from_conf():
         streamlog = ColorHandler()
         log_root.addHandler(streamlog)
 
-    elif not CONF.log_file:
+    elif not logpath:
         # pass sys.stdout as a positional argument
         # python2.6 calls the argument strm, in 2.7 it's stream
         streamlog = logging.StreamHandler(sys.stdout)
index a8586debcd66c0e0bead0b9b5c3e051571f2d71a..8156f37e196ceec34fff1f429aeee0a803f2f141 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2013 IBM Corp.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    under the License.
 import logging
 
-from neutron.openstack.common import notifier
-
 from oslo.config import cfg
 
+from neutron.openstack.common import notifier
+
 
 class PublishErrorsHandler(logging.Handler):
     def emit(self, record):
index f82dc7f3ae273a088ccecc8631d777155d0a35e6..e588c8309b839093dd306403e39598b86ad4e766 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
index 6ddb441d746715ce8f449ede5def7481b31ce975..29061519ab4e0d8f613fdafbb8b860a001edc5ee 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 OpenStack Foundation.
 # All Rights Reserved.
 #
 Network-related utilities and helper functions.
 """
 
-from neutron.openstack.common import log as logging
-
-
-LOG = logging.getLogger(__name__)
+from neutron.openstack.common.py3kcompat import urlutils
 
 
 def parse_host_port(address, default_port=None):
@@ -67,3 +62,18 @@ def parse_host_port(address, default_port=None):
             port = default_port
 
     return (host, None if port is None else int(port))
+
+
+def urlsplit(url, scheme='', allow_fragments=True):
+    """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
+    This function papers over Python issue9374 when needed.
+
+    The parameters are the same as urlparse.urlsplit.
+    """
+    scheme, netloc, path, query, fragment = urlutils.urlsplit(
+        url, scheme, allow_fragments)
+    if allow_fragments and '#' in path:
+        path, fragment = path.split('#', 1)
+    if '?' in path:
+        path, query = path.split('?', 1)
+    return urlutils.SplitResult(scheme, netloc, path, query, fragment)
index 02d0048c6ce965056db98d34d3d7e0dc4826192b..cf2985e13aa5b94954729535d6e8ae3863abb052 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -17,6 +15,7 @@ import datetime
 import time
 
 from oslo.config import cfg
+import six
 
 from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import log as logging
@@ -83,14 +82,14 @@ def periodic_task(*args, **kwargs):
         return f
 
     # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
-    # and without parens.
+    # and without parents.
     #
-    # In the 'with-parens' case (with kwargs present), this function needs to
+    # In the 'with-parents' case (with kwargs present), this function needs to
     # return a decorator function since the interpreter will invoke it like:
     #
     #   periodic_task(*args, **kwargs)(f)
     #
-    # In the 'without-parens' case, the original function will be passed
+    # In the 'without-parents' case, the original function will be passed
     # in as the first argument, like:
     #
     #   periodic_task(f)
@@ -150,8 +149,8 @@ class _PeriodicTasksMeta(type):
                 cls._periodic_last_run[name] = task._periodic_last_run
 
 
+@six.add_metaclass(_PeriodicTasksMeta)
 class PeriodicTasks(object):
-    __metaclass__ = _PeriodicTasksMeta
 
     def run_periodic_tasks(self, context, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
@@ -173,7 +172,8 @@ class PeriodicTasks(object):
             if spacing is not None:
                 idle_for = min(idle_for, spacing)
 
-            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+            LOG.debug(_("Running periodic task %(full_task_name)s"),
+                      {"full_task_name": full_task_name})
             self._periodic_last_run[task_name] = timeutils.utcnow()
 
             try:
@@ -182,7 +182,7 @@ class PeriodicTasks(object):
                 if raise_on_error:
                     raise
                 LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
-                              locals())
+                              {"full_task_name": full_task_name, "e": e})
             time.sleep(0)
 
         return idle_for
index 73c214e8b2441c3330b2cd990e30cd8dfd820aaa..039b9ad467a2e790b0b329407e5145faac7decff 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -19,6 +17,7 @@
 System-level utilities and helper functions.
 """
 
+import logging as stdlib_logging
 import os
 import random
 import shlex
@@ -81,7 +80,7 @@ def execute(*cmd, **kwargs):
     :param cmd:             Passed to subprocess.Popen.
     :type cmd:              string
     :param process_input:   Send to opened process.
-    :type proces_input:     string
+    :type process_input:    string
     :param check_exit_code: Single bool, int, or list of allowed exit
                             codes.  Defaults to [0].  Raise
                             :class:`ProcessExecutionError` unless
@@ -102,6 +101,9 @@ def execute(*cmd, **kwargs):
     :param shell:           whether or not there should be a shell used to
                             execute this command. Defaults to false.
     :type shell:            boolean
+    :param loglevel:        log level for execute commands.
+    :type loglevel:         int.  (Should be stdlib_logging.DEBUG or
+                            stdlib_logging.INFO)
     :returns:               (stdout, stderr) from process execution
     :raises:                :class:`UnknownArgumentError` on
                             receiving unknown arguments
@@ -116,6 +118,7 @@ def execute(*cmd, **kwargs):
     run_as_root = kwargs.pop('run_as_root', False)
     root_helper = kwargs.pop('root_helper', '')
     shell = kwargs.pop('shell', False)
+    loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG)
 
     if isinstance(check_exit_code, bool):
         ignore_exit_code = not check_exit_code
@@ -127,7 +130,7 @@ def execute(*cmd, **kwargs):
         raise UnknownArgumentError(_('Got unknown keyword args '
                                      'to utils.execute: %r') % kwargs)
 
-    if run_as_root and os.geteuid() != 0:
+    if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
         if not root_helper:
             raise NoRootWrapSpecified(
                 message=('Command requested root, but did not specify a root '
@@ -139,7 +142,7 @@ def execute(*cmd, **kwargs):
     while attempts > 0:
         attempts -= 1
         try:
-            LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+            LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd))
             _PIPE = subprocess.PIPE  # pylint: disable=E1101
 
             if os.name == 'nt':
@@ -163,20 +166,19 @@ def execute(*cmd, **kwargs):
                 result = obj.communicate()
             obj.stdin.close()  # pylint: disable=E1101
             _returncode = obj.returncode  # pylint: disable=E1101
-            if _returncode:
-                LOG.debug(_('Result was %s') % _returncode)
-                if not ignore_exit_code and _returncode not in check_exit_code:
-                    (stdout, stderr) = result
-                    raise ProcessExecutionError(exit_code=_returncode,
-                                                stdout=stdout,
-                                                stderr=stderr,
-                                                cmd=' '.join(cmd))
+            LOG.log(loglevel, _('Result was %s') % _returncode)
+            if not ignore_exit_code and _returncode not in check_exit_code:
+                (stdout, stderr) = result
+                raise ProcessExecutionError(exit_code=_returncode,
+                                            stdout=stdout,
+                                            stderr=stderr,
+                                            cmd=' '.join(cmd))
             return result
         except ProcessExecutionError:
             if not attempts:
                 raise
             else:
-                LOG.debug(_('%r failed. Retrying.'), cmd)
+                LOG.log(loglevel, _('%r failed. Retrying.'), cmd)
                 if delay_on_retry:
                     greenthread.sleep(random.randint(20, 200) / 100.0)
         finally:
diff --git a/neutron/openstack/common/py3kcompat/__init__.py b/neutron/openstack/common/py3kcompat/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/openstack/common/py3kcompat/urlutils.py b/neutron/openstack/common/py3kcompat/urlutils.py
new file mode 100644 (file)
index 0000000..6200271
--- /dev/null
@@ -0,0 +1,65 @@
+#
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""
+Python2/Python3 compatibility layer for OpenStack
+"""
+
+import six
+
+if six.PY3:
+    # python3
+    import urllib.error
+    import urllib.parse
+    import urllib.request
+
+    urlencode = urllib.parse.urlencode
+    urljoin = urllib.parse.urljoin
+    quote = urllib.parse.quote
+    parse_qsl = urllib.parse.parse_qsl
+    unquote = urllib.parse.unquote
+    unquote_plus = urllib.parse.unquote_plus
+    urlparse = urllib.parse.urlparse
+    urlsplit = urllib.parse.urlsplit
+    urlunsplit = urllib.parse.urlunsplit
+    SplitResult = urllib.parse.SplitResult
+
+    urlopen = urllib.request.urlopen
+    URLError = urllib.error.URLError
+    pathname2url = urllib.request.pathname2url
+else:
+    # python2
+    import urllib
+    import urllib2
+    import urlparse
+
+    urlencode = urllib.urlencode
+    quote = urllib.quote
+    unquote = urllib.unquote
+    unquote_plus = urllib.unquote_plus
+
+    parse = urlparse
+    parse_qsl = parse.parse_qsl
+    urljoin = parse.urljoin
+    urlparse = parse.urlparse
+    urlsplit = parse.urlsplit
+    urlunsplit = parse.urlunsplit
+    SplitResult = parse.SplitResult
+
+    urlopen = urllib2.urlopen
+    URLError = urllib2.URLError
+    pathname2url = urllib.pathname2url
index e20d0b288983f4ef0dbc5fd20965e2805c07c18c..046b35272b5afa5c346ab7d835604f362891fbbc 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -56,13 +54,12 @@ rpc_opts = [
                help='Seconds to wait before a cast expires (TTL). '
                     'Only supported by impl_zmq.'),
     cfg.ListOpt('allowed_rpc_exception_modules',
-                default=['neutron.openstack.common.exception',
-                         'nova.exception',
+                default=['nova.exception',
                          'cinder.exception',
                          'exceptions',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
-                     'upon receiving exception data from an rpc call.'),
+                     ' upon receiving exception data from an rpc call.'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
@@ -228,7 +225,7 @@ def notify(context, topic, msg, envelope=False):
 
 
 def cleanup():
-    """Clean up resoruces in use by implementation.
+    """Clean up resources in use by implementation.
 
     Clean up any resources that have been allocated by the RPC implementation.
     This is typically open connections to a messaging service.  This function
index f055696c5c575d84d3f74a7ff291be5a5e629135..b74fad059850558d1ea95e7eaf6561473f34ebb2 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -20,9 +18,9 @@
 """
 Shared code between AMQP based openstack.common.rpc implementations.
 
-The code in this module is shared between the rpc implemenations based on AMQP.
-Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
-AMQP, but is deprecated and predates this code.
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
+uses AMQP, but is deprecated and predates this code.
 """
 
 import collections
@@ -35,6 +33,8 @@ from eventlet import pools
 from eventlet import queue
 from eventlet import semaphore
 from oslo.config import cfg
+import six
+
 
 from neutron.openstack.common import excutils
 from neutron.openstack.common.gettextutils import _
@@ -165,11 +165,13 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
-    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+                           ack_on_error=True):
         self.connection.join_consumer_pool(callback,
                                            pool_name,
                                            topic,
-                                           exchange_name)
+                                           exchange_name,
+                                           ack_on_error)
 
     def consume_in_thread(self):
         self.connection.consume_in_thread()
@@ -187,7 +189,7 @@ class ReplyProxy(ConnectionContext):
     def __init__(self, conf, connection_pool):
         self._call_waiters = {}
         self._num_call_waiters = 0
-        self._num_call_waiters_wrn_threshhold = 10
+        self._num_call_waiters_wrn_threshold = 10
         self._reply_q = 'reply_' + uuid.uuid4().hex
         super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
         self.declare_direct_consumer(self._reply_q, self._process_data)
@@ -206,11 +208,11 @@ class ReplyProxy(ConnectionContext):
 
     def add_call_waiter(self, waiter, msg_id):
         self._num_call_waiters += 1
-        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
             LOG.warn(_('Number of call waiters is greater than warning '
-                       'threshhold: %d. There could be a MulticallProxyWaiter '
-                       'leak.') % self._num_call_waiters_wrn_threshhold)
-            self._num_call_waiters_wrn_threshhold *= 2
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshold)
+            self._num_call_waiters_wrn_threshold *= 2
         self._call_waiters[msg_id] = waiter
 
     def del_call_waiter(self, msg_id):
@@ -233,18 +235,13 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
             failure = rpc_common.serialize_remote_exception(failure,
                                                             log_failure)
 
-        try:
-            msg = {'result': reply, 'failure': failure}
-        except TypeError:
-            msg = {'result': dict((k, repr(v))
-                   for k, v in reply.__dict__.iteritems()),
-                   'failure': failure}
+        msg = {'result': reply, 'failure': failure}
         if ending:
             msg['ending'] = True
         _add_unique_id(msg)
         # If a reply_q exists, add the msg_id to the reply and pass the
         # reply_q to direct_send() to use it as the response queue.
-        # Otherwise use the msg_id for backward compatibilty.
+        # Otherwise use the msg_id for backward compatibility.
         if reply_q:
             msg['_msg_id'] = msg_id
             conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
@@ -303,8 +300,14 @@ def pack_context(msg, context):
     for args at some point.
 
     """
-    context_d = dict([('_context_%s' % key, value)
-                      for (key, value) in context.to_dict().iteritems()])
+    if isinstance(context, dict):
+        context_d = dict([('_context_%s' % key, value)
+                          for (key, value) in six.iteritems(context)])
+    else:
+        context_d = dict([('_context_%s' % key, value)
+                          for (key, value) in
+                          six.iteritems(context.to_dict())])
+
     msg.update(context_d)
 
 
@@ -362,22 +365,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
     Allows it to be invoked in a green thread.
     """
 
-    def __init__(self, conf, callback, connection_pool):
+    def __init__(self, conf, callback, connection_pool,
+                 wait_for_consumers=False):
         """Initiates CallbackWrapper object.
 
         :param conf: cfg.CONF instance
         :param callback: a callable (probably a function)
         :param connection_pool: connection pool as returned by
                                 get_connection_pool()
+        :param wait_for_consumers: wait for all green threads to
+                                   complete and raise the last
+                                   caught exception, if any.
+
         """
         super(CallbackWrapper, self).__init__(
             conf=conf,
             connection_pool=connection_pool,
         )
         self.callback = callback
+        self.wait_for_consumers = wait_for_consumers
+        self.exc_info = None
+
+    def _wrap(self, message_data, **kwargs):
+        """Wrap the callback invocation to catch exceptions.
+        """
+        try:
+            self.callback(message_data, **kwargs)
+        except Exception:
+            self.exc_info = sys.exc_info()
 
     def __call__(self, message_data):
-        self.pool.spawn_n(self.callback, message_data)
+        self.exc_info = None
+        self.pool.spawn_n(self._wrap, message_data)
+
+        if self.wait_for_consumers:
+            self.pool.waitall()
+            if self.exc_info:
+                six.reraise(self.exc_info[1], None, self.exc_info[2])
 
 
 class ProxyCallback(_ThreadPoolWithWait):
index d240597afbdcf2e2c3c5350eb5698f761b70c930..aa772fb95bd23cebedac41137b31aa1f6c6a668a 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -29,12 +27,14 @@ from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
+from neutron.openstack.common import versionutils
 
 
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+_RPC_ENVELOPE_VERSION = '2.0'
 '''RPC Envelope Version.
 
 This version number applies to the top level structure of messages sent out.
@@ -47,7 +47,7 @@ This version number applies to the message envelope that is used in the
 serialization done inside the rpc layer.  See serialize_msg() and
 deserialize_msg().
 
-The current message format (version 2.0) is very simple.  It is:
+The current message format (version 2.0) is very simple.  It is::
 
     {
         'oslo.version': <RPC Envelope Version as a String>,
@@ -65,7 +65,6 @@ We will JSON encode the application message payload.  The message envelope,
 which includes the JSON encoded application message body, will be passed down
 to the messaging libraries as a dict.
 '''
-_RPC_ENVELOPE_VERSION = '2.0'
 
 _VERSION_KEY = 'oslo.version'
 _MESSAGE_KEY = 'oslo.message'
@@ -74,23 +73,23 @@ _REMOTE_POSTFIX = '_Remote'
 
 
 class RPCException(Exception):
-    message = _("An unknown RPC related exception occurred.")
+    msg_fmt = _("An unknown RPC related exception occurred.")
 
     def __init__(self, message=None, **kwargs):
         self.kwargs = kwargs
 
         if not message:
             try:
-                message = self.message % kwargs
+                message = self.msg_fmt % kwargs
 
             except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
                 LOG.exception(_('Exception in string format operation'))
-                for name, value in kwargs.iteritems():
+                for name, value in six.iteritems(kwargs):
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
-                message = self.message
+                message = self.msg_fmt
 
         super(RPCException, self).__init__(message)
 
@@ -104,7 +103,7 @@ class RemoteError(RPCException):
     contains all of the relevant info.
 
     """
-    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+    msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
 
     def __init__(self, exc_type=None, value=None, traceback=None):
         self.exc_type = exc_type
@@ -121,7 +120,7 @@ class Timeout(RPCException):
     This exception is raised if the rpc_response_timeout is reached while
     waiting for a response from the remote side.
     """
-    message = _('Timeout while waiting on RPC response - '
+    msg_fmt = _('Timeout while waiting on RPC response - '
                 'topic: "%(topic)s", RPC method: "%(method)s" '
                 'info: "%(info)s"')
 
@@ -144,25 +143,25 @@ class Timeout(RPCException):
 
 
 class DuplicateMessageError(RPCException):
-    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
 
 
 class InvalidRPCConnectionReuse(RPCException):
-    message = _("Invalid reuse of an RPC connection.")
+    msg_fmt = _("Invalid reuse of an RPC connection.")
 
 
 class UnsupportedRpcVersion(RPCException):
-    message = _("Specified RPC version, %(version)s, not supported by "
+    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
                 "this endpoint.")
 
 
 class UnsupportedRpcEnvelopeVersion(RPCException):
-    message = _("Specified RPC envelope version, %(version)s, "
+    msg_fmt = _("Specified RPC envelope version, %(version)s, "
                 "not supported by this endpoint.")
 
 
 class RpcVersionCapError(RPCException):
-    message = _("Specified RPC version cap, %(version_cap)s, is too low")
+    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
 
 
 class Connection(object):
@@ -261,41 +260,20 @@ class Connection(object):
 
 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
-                'run_instance': [('args', 'admin_password')],
-                'route_message': [('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'password'),
-                                  ('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'admin_password')]}
-
-    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
-    has_context_token = '_context_auth_token' in msg_data
-    has_token = 'auth_token' in msg_data
-
-    if not any([has_method, has_context_token, has_token]):
-        return log_func(msg, msg_data)
-
-    msg_data = copy.deepcopy(msg_data)
-
-    if has_method:
-        for arg in SANITIZE.get(msg_data['method'], []):
-            try:
-                d = msg_data
-                for elem in arg[:-1]:
-                    d = d[elem]
-                d[arg[-1]] = '<SANITIZED>'
-            except KeyError as e:
-                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
-                         {'item': arg,
-                          'err': e})
-
-    if has_context_token:
-        msg_data['_context_auth_token'] = '<SANITIZED>'
+    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
 
-    if has_token:
-        msg_data['auth_token'] = '<SANITIZED>'
+    def _fix_passwords(d):
+        """Sanitizes the password fields in the dictionary."""
+        for k in six.iterkeys(d):
+            if k.lower().find('password') != -1:
+                d[k] = '<SANITIZED>'
+            elif k.lower() in SANITIZE:
+                d[k] = '<SANITIZED>'
+            elif isinstance(d[k], dict):
+                _fix_passwords(d[k])
+        return d
 
-    return log_func(msg, msg_data)
+    return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
 
 
 def serialize_remote_exception(failure_info, log_failure=True):
@@ -462,19 +440,15 @@ def client_exceptions(*exceptions):
     return outer
 
 
+# TODO(sirp): we should deprecate this in favor of
+# using `versionutils.is_compatible` directly
 def version_is_compatible(imp_version, version):
     """Determine whether versions are compatible.
 
     :param imp_version: The version implemented
     :param version: The version requested by an incoming message.
     """
-    version_parts = version.split('.')
-    imp_version_parts = imp_version.split('.')
-    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
-        return False
-    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
-        return False
-    return True
+    return versionutils.is_compatible(version, imp_version)
 
 
 def serialize_msg(raw_msg):
index d734e32a6b39f3c4833b495bd4429433aa8e820a..f266469d771e6ea0b640847719982972cde54c9b 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -83,6 +81,8 @@ On the client side, the same changes should be made as in example 1.  The
 minimum version that supports the new parameter should be specified.
 """
 
+import six
+
 from neutron.openstack.common.rpc import common as rpc_common
 from neutron.openstack.common.rpc import serializer as rpc_serializer
 
@@ -121,7 +121,7 @@ class RpcDispatcher(object):
         :returns: A new set of deserialized args
         """
         new_kwargs = dict()
-        for argname, arg in kwargs.iteritems():
+        for argname, arg in six.iteritems(kwargs):
             new_kwargs[argname] = self.serializer.deserialize_entity(context,
                                                                      arg)
         return new_kwargs
index e24db8a6c7ae67f01e3c7880cdc8fc277c18549d..f6eea936d2489cd887e7b96bae0f020b20052c65 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -26,6 +24,7 @@ import json
 import time
 
 import eventlet
+import six
 
 from neutron.openstack.common.rpc import common as rpc_common
 
@@ -69,7 +68,7 @@ class Consumer(object):
                 # Caller might have called ctxt.reply() manually
                 for (reply, failure) in ctxt._response:
                     if failure:
-                        raise failure[0], failure[1], failure[2]
+                        six.reraise(failure[0], failure[1], failure[2])
                     res.append(reply)
                 # if ending not 'sent'...we might have more data to
                 # return from the function itself
@@ -146,7 +145,7 @@ def multicall(conf, context, topic, msg, timeout=None):
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
-        return iter([None])
+        raise rpc_common.Timeout("No consumers available")
     else:
         return consumer.call(context, version, method, namespace, args,
                              timeout)
index fd73564d558fd73fd186a23b4d99996ca67840ad..b0cb70f9eae1568254d0542be506c23d710ad463 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -18,7 +16,6 @@ import functools
 import itertools
 import socket
 import ssl
-import sys
 import time
 import uuid
 
@@ -29,16 +26,22 @@ import kombu.connection
 import kombu.entity
 import kombu.messaging
 from oslo.config import cfg
+import six
 
+from neutron.openstack.common import excutils
 from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import network_utils
 from neutron.openstack.common.rpc import amqp as rpc_amqp
 from neutron.openstack.common.rpc import common as rpc_common
+from neutron.openstack.common import sslutils
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
-               help='SSL version to use (valid only if SSL enabled)'),
+               help='SSL version to use (valid only if SSL enabled). '
+                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+                    'be available on some distributions'
+               ),
     cfg.StrOpt('kombu_ssl_keyfile',
                default='',
                help='SSL key file (valid only if SSL enabled)'),
@@ -126,6 +129,7 @@ class ConsumerBase(object):
         self.tag = str(tag)
         self.kwargs = kwargs
         self.queue = None
+        self.ack_on_error = kwargs.get('ack_on_error', True)
         self.reconnect(channel)
 
     def reconnect(self, channel):
@@ -135,6 +139,30 @@ class ConsumerBase(object):
         self.queue = kombu.entity.Queue(**self.kwargs)
         self.queue.declare()
 
+    def _callback_handler(self, message, callback):
+        """Call callback with deserialized message.
+
+        Messages that are processed without exception are ack'ed.
+
+        If the message processing generates an exception, it will be
+        ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
+        """
+
+        try:
+            msg = rpc_common.deserialize_msg(message.payload)
+            callback(msg)
+        except Exception:
+            if self.ack_on_error:
+                LOG.exception(_("Failed to process message"
+                                " ... skipping it."))
+                message.ack()
+            else:
+                LOG.exception(_("Failed to process message"
+                                " ... will requeue."))
+                message.requeue()
+        else:
+            message.ack()
+
     def consume(self, *args, **kwargs):
         """Actually declare the consumer on the amqp channel.  This will
         start the flow of messages from the queue.  Using the
@@ -147,8 +175,6 @@ class ConsumerBase(object):
         If kwargs['nowait'] is True, then this call will block until
         a message is read.
 
-        Messages will automatically be acked if the callback doesn't
-        raise an exception
         """
 
         options = {'consumer_tag': self.tag}
@@ -159,13 +185,7 @@ class ConsumerBase(object):
 
         def _callback(raw_message):
             message = self.channel.message_to_python(raw_message)
-            try:
-                msg = rpc_common.deserialize_msg(message.payload)
-                callback(msg)
-            except Exception:
-                LOG.exception(_("Failed to process message... skipping it."))
-            finally:
-                message.ack()
+            self._callback_handler(message, callback)
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -425,7 +445,7 @@ class Connection(object):
                 'virtual_host': self.conf.rabbit_virtual_host,
             }
 
-            for sp_key, value in server_params.iteritems():
+            for sp_key, value in six.iteritems(server_params):
                 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
                 params[p_key] = value
 
@@ -451,7 +471,8 @@ class Connection(object):
 
         # http://docs.python.org/library/ssl.html - ssl.wrap_socket
         if self.conf.kombu_ssl_version:
-            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+            ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+                self.conf.kombu_ssl_version)
         if self.conf.kombu_ssl_keyfile:
             ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
         if self.conf.kombu_ssl_certfile:
@@ -462,12 +483,8 @@ class Connection(object):
             # future with this?
             ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
 
-        if not ssl_params:
-            # Just have the default behavior
-            return True
-        else:
-            # Return the extended behavior
-            return ssl_params
+        # Return the extended behavior or just have the default behavior
+        return ssl_params or True
 
     def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
@@ -534,13 +551,11 @@ class Connection(object):
             log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.error(_('Unable to connect to AMQP server on '
-                            '%(hostname)s:%(port)d after %(max_retries)d '
-                            'tries: %(err_str)s') % log_info)
-                # NOTE(comstud): Copied from original code.  There's
-                # really no better recourse because if this was a queue we
-                # need to consume on, we have no way to consume anymore.
-                sys.exit(1)
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)
 
             if attempt == 1:
                 sleep_time = self.interval_start or 1
@@ -609,7 +624,7 @@ class Connection(object):
 
         def _declare_consumer():
             consumer = consumer_cls(self.conf, self.channel, topic, callback,
-                                    self.consumer_num.next())
+                                    six.next(self.consumer_num))
             self.consumers.append(consumer)
             return consumer
 
@@ -632,8 +647,8 @@ class Connection(object):
 
         def _consume():
             if info['do_consume']:
-                queues_head = self.consumers[:-1]
-                queues_tail = self.consumers[-1]
+                queues_head = self.consumers[:-1]  # not fanout.
+                queues_tail = self.consumers[-1]  # fanout
                 for queue in queues_head:
                     queue.consume(nowait=True)
                 queues_tail.consume(nowait=False)
@@ -682,11 +697,12 @@ class Connection(object):
         self.declare_consumer(DirectConsumer, topic, callback)
 
     def declare_topic_consumer(self, topic, callback=None, queue_name=None,
-                               exchange_name=None):
+                               exchange_name=None, ack_on_error=True):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
                                                 exchange_name=exchange_name,
+                                                ack_on_error=ack_on_error,
                                                 ),
                               topic, callback)
 
@@ -715,12 +731,13 @@ class Connection(object):
         it = self.iterconsume(limit=limit)
         while True:
             try:
-                it.next()
+                six.next(it)
             except StopIteration:
                 return
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -751,7 +768,7 @@ class Connection(object):
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.
 
@@ -765,6 +782,7 @@ class Connection(object):
             callback=callback,
             connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                          Connection),
+            wait_for_consumers=not ack_on_error
         )
         self.proxy_callbacks.append(callback_wrapper)
         self.declare_topic_consumer(
@@ -772,6 +790,7 @@ class Connection(object):
             topic=topic,
             exchange_name=exchange_name,
             callback=callback_wrapper,
+            ack_on_error=ack_on_error,
         )
 
 
index 67e0f9c628086109cf035a0f7739acfc9397dbdf..03b12e5d4d8aafca3806a35dcfd4e03d74bb6b1d 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #    Copyright 2011 - 2012, Red Hat, Inc.
 #
@@ -22,7 +20,9 @@ import time
 import eventlet
 import greenlet
 from oslo.config import cfg
+import six
 
+from neutron.openstack.common import excutils
 from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
@@ -149,10 +149,17 @@ class ConsumerBase(object):
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
-        self.reconnect(session)
+        self.connect(session)
+
+    def connect(self, session):
+        """Declare the receiver on connect."""
+        self._declare_receiver(session)
 
     def reconnect(self, session):
         """Re-declare the receiver after a qpid reconnect."""
+        self._declare_receiver(session)
+
+    def _declare_receiver(self, session):
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
@@ -183,11 +190,15 @@ class ConsumerBase(object):
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
+            # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
 
+    def get_node_name(self):
+        return self.address.split(';')[0]
+
 
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'."""
@@ -263,6 +274,7 @@ class FanoutConsumer(ConsumerBase):
         'topic' is the topic to listen on
         'callback' is the callback to call when messages are received
         """
+        self.conf = conf
 
         link_opts = {"exclusive": True}
 
@@ -371,7 +383,7 @@ class DirectPublisher(Publisher):
 class TopicPublisher(Publisher):
     """Publisher class for 'topic'."""
     def __init__(self, conf, session, topic):
-        """init a 'topic' publisher.
+        """Init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
 
@@ -388,7 +400,7 @@ class TopicPublisher(Publisher):
 class FanoutPublisher(Publisher):
     """Publisher class for 'fanout'."""
     def __init__(self, conf, session, topic):
-        """init a 'fanout' publisher.
+        """Init a 'fanout' publisher.
         """
 
         if conf.qpid_topology_version == 1:
@@ -407,7 +419,7 @@ class FanoutPublisher(Publisher):
 class NotifyPublisher(Publisher):
     """Publisher class for notifications."""
     def __init__(self, conf, session, topic):
-        """init a 'topic' publisher.
+        """Init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
         node_opts = {"durable": True}
@@ -515,7 +527,7 @@ class Connection(object):
             consumers = self.consumers
             self.consumers = {}
 
-            for consumer in consumers.itervalues():
+            for consumer in six.itervalues(consumers):
                 consumer.reconnect(self.session)
                 self._register_consumer(consumer)
 
@@ -673,12 +685,13 @@ class Connection(object):
         it = self.iterconsume(limit=limit)
         while True:
             try:
-                it.next()
+                six.next(it)
             except StopIteration:
                 return
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -719,7 +732,7 @@ class Connection(object):
         return consumer
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.
 
@@ -733,6 +746,7 @@ class Connection(object):
             callback=callback,
             connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                          Connection),
+            wait_for_consumers=not ack_on_error
         )
         self.proxy_callbacks.append(callback_wrapper)
 
index 2d9adf56477d01a7fce7f3918aa8621b8a1570df..33fa95dd0252cf327e17137df7b4abe72d55f24c 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -25,6 +23,8 @@ import uuid
 import eventlet
 import greenlet
 from oslo.config import cfg
+import six
+from six import moves
 
 from neutron.openstack.common import excutils
 from neutron.openstack.common.gettextutils import _
@@ -192,7 +192,7 @@ class ZmqSocket(object):
             # it would be much worse if some of the code calling this
             # were to fail. For now, lets log, and later evaluate
             # if we can safely raise here.
-            LOG.error("ZeroMQ socket could not be closed.")
+            LOG.error(_("ZeroMQ socket could not be closed."))
         self.sock = None
 
     def recv(self, **kwargs):
@@ -221,7 +221,7 @@ class ZmqClient(object):
             return
 
         rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
-        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
         self.outq.send(map(bytes,
                        (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
 
@@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase):
     def __init__(self, conf):
         super(ZmqBaseReactor, self).__init__()
 
-        self.mapping = {}
         self.proxies = {}
         self.threads = []
         self.sockets = []
@@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
 
         self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
 
-    def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
-                 zmq_type_out=None, in_bind=True, out_bind=True,
-                 subscribe=None):
+    def register(self, proxy, in_addr, zmq_type_in,
+                 in_bind=True, subscribe=None):
 
         LOG.info(_("Registering reactor"))
 
@@ -384,22 +382,8 @@ class ZmqBaseReactor(ConsumerBase):
 
         LOG.info(_("In reactor registered"))
 
-        if not out_addr:
-            return
-
-        if zmq_type_out not in (zmq.PUSH, zmq.PUB):
-            raise RPCException("Bad output socktype")
-
-        # Items push out.
-        outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
-        self.mapping[inq] = outq
-        self.mapping[outq] = inq
-        self.sockets.append(outq)
-
-        LOG.info(_("Out reactor registered"))
-
     def consume_in_thread(self):
+        @excutils.forever_retry_uncaught_exceptions
         def _consume(sock):
             LOG.info(_("Consuming socket"))
             while True:
@@ -516,8 +500,7 @@ class ZmqProxy(ZmqBaseReactor):
         try:
             self.register(consumption_proxy,
                           consume_in,
-                          zmq.PULL,
-                          out_bind=True)
+                          zmq.PULL)
         except zmq.ZMQError:
             if os.access(ipc_dir, os.X_OK):
                 with excutils.save_and_reraise_exception():
@@ -540,8 +523,8 @@ def unflatten_envelope(packenv):
     h = {}
     try:
         while True:
-            k = i.next()
-            h[k] = i.next()
+            k = six.next(i)
+            h[k] = six.next(i)
     except StopIteration:
         return h
 
@@ -559,11 +542,6 @@ class ZmqReactor(ZmqBaseReactor):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
         LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
-        if sock in self.mapping:
-            LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
-                'data': data})
-            self.mapping[sock].send(data)
-            return
 
         proxy = self.proxies[sock]
 
index 71507f5700802e089d2cd159321cdb5d94b007fa..26b6b9f76fca78ff1d10b288ca731206c1b3ac73 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -92,7 +90,7 @@ class MatchMakerBase(object):
         """Acknowledge that a key.host is alive.
 
         Used internally for updating heartbeats, but may also be used
-        publically to acknowledge a system is alive (i.e. rpc message
+        publicly to acknowledge a system is alive (i.e. rpc message
         successfully sent to host)
         """
         pass
@@ -174,7 +172,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         """Acknowledge that a host.topic is alive.
 
         Used internally for updating heartbeats, but may also be used
-        publically to acknowledge a system is alive (i.e. rpc message
+        publicly to acknowledge a system is alive (i.e. rpc message
         successfully sent to host)
         """
         raise NotImplementedError("Must implement ack_alive")
@@ -248,9 +246,7 @@ class DirectBinding(Binding):
     that it maps directly to a host, thus direct.
     """
     def test(self, key):
-        if '.' in key:
-            return True
-        return False
+        return '.' in key
 
 
 class TopicBinding(Binding):
@@ -262,17 +258,13 @@ class TopicBinding(Binding):
     matches that of a direct exchange.
     """
     def test(self, key):
-        if '.' not in key:
-            return True
-        return False
+        return '.' not in key
 
 
 class FanoutBinding(Binding):
     """Match on fanout keys, where key starts with 'fanout.' string."""
     def test(self, key):
-        if key.startswith('fanout~'):
-            return True
-        return False
+        return key.startswith('fanout~')
 
 
 class StubExchange(Exchange):
index 5b53e5d9a34eb4c70e241383542050e65c8da563..0458060bf3de540962725eaac1629a191df6fe90 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2013 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -95,7 +93,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
         if not redis:
             raise ImportError("Failed to import module redis.")
 
-        self.redis = redis.StrictRedis(
+        self.redis = redis.Redis(
             host=CONF.matchmaker_redis.host,
             port=CONF.matchmaker_redis.port,
             password=CONF.matchmaker_redis.password)
index 15a4971d85d60214d481a1e460912f3db18cf559..831741922ea03a7397c0ad1374ad5a14ebefe94e 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011-2013 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -63,9 +61,7 @@ class RingExchange(mm.Exchange):
             self.ring0[k] = itertools.cycle(self.ring[k])
 
     def _ring_has(self, key):
-        if key in self.ring0:
-            return True
-        return False
+        return key in self.ring0
 
 
 class RoundRobinRingExchange(RingExchange):
index e180cdec7922662cc0f87addf8bbc83dbbd0fabe..5cb435fbf24e7901a1ba0cab876dd6ba0e706e37 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012-2013 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -21,6 +19,7 @@ For more information about rpc API version numbers, see:
     rpc/dispatcher.py
 """
 
+import six
 
 from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import common as rpc_common
@@ -36,7 +35,7 @@ class RpcProxy(object):
     rpc API.
     """
 
-    # The default namespace, which can be overriden in a subclass.
+    # The default namespace, which can be overridden in a subclass.
     RPC_API_NAMESPACE = None
 
     def __init__(self, topic, default_version, version_cap=None,
@@ -69,7 +68,7 @@ class RpcProxy(object):
         v = vers if vers else self.default_version
         if (self.version_cap and not
                 rpc_common.version_is_compatible(self.version_cap, v)):
-            raise rpc_common.RpcVersionCapError(version=self.version_cap)
+            raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
         msg['version'] = v
 
     def _get_topic(self, topic):
@@ -100,7 +99,7 @@ class RpcProxy(object):
         :returns: A new set of serialized arguments
         """
         new_kwargs = dict()
-        for argname, arg in kwargs.iteritems():
+        for argname, arg in six.iteritems(kwargs):
             new_kwargs[argname] = self.serializer.serialize_entity(context,
                                                                    arg)
         return new_kwargs
index 76c68310331b59c757a135fb974c129e13226f90..9bc6e2a3a03f339f8cef67aec14ed5b1a392a16f 100644 (file)
 
 import abc
 
+import six
 
+
+@six.add_metaclass(abc.ABCMeta)
 class Serializer(object):
     """Generic (de-)serialization definition base class."""
-    __metaclass__ = abc.ABCMeta
 
     @abc.abstractmethod
     def serialize_entity(self, context, entity):
index 39a98a2edcc4b2368afc56d146b4a8600301e229..4479dcc3ac9e1994bf5fac6cd2b1d7aec65a426d 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -32,10 +30,11 @@ class Service(service.Service):
 
     A service enables rpc by listening to queues based on topic and host.
     """
-    def __init__(self, host, topic, manager=None):
+    def __init__(self, host, topic, manager=None, serializer=None):
         super(Service, self).__init__()
         self.host = host
         self.topic = topic
+        self.serializer = serializer
         if manager is None:
             self.manager = self
         else:
@@ -48,7 +47,8 @@ class Service(service.Service):
         LOG.debug(_("Creating Consumer connection for Service %s") %
                   self.topic)
 
-        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+                                                  self.serializer)
 
         # Share this same connection for these Consumers
         self.conn.create_consumer(self.topic, dispatcher, fanout=False)
old mode 100755 (executable)
new mode 100644 (file)
index ca0e28e..bd32f0e
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
index b76d355ec0f1f71fb3d281132ab724d85e64c5b9..b8144bb3a73495966df82869c688e3b3f9a9410c 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
 """Generic Node base class for all workers that run on hosts."""
 
 import errno
+import logging as std_logging
 import os
 import random
 import signal
 import sys
 import time
 
+try:
+    # Importing just the symbol here because the io module does not
+    # exist in Python 2.6.
+    from io import UnsupportedOperation  # noqa
+except ImportError:
+    # Python 2.6
+    UnsupportedOperation = None
+
 import eventlet
-import logging as std_logging
+from eventlet import event
 from oslo.config import cfg
 
 from neutron.openstack.common import eventlet_backdoor
@@ -42,6 +49,53 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+def _sighup_supported():
+    return hasattr(signal, 'SIGHUP')
+
+
+def _is_daemon():
+    # The process group for a foreground process will match the
+    # process group of the controlling terminal. If those values do
+    # not match, or ioctl() fails on the stdout file handle, we assume
+    # the process is running in the background as a daemon.
+    # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
+    try:
+        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
+    except OSError as err:
+        if err.errno == errno.ENOTTY:
+            # Assume we are a daemon because there is no terminal.
+            is_daemon = True
+        else:
+            raise
+    except UnsupportedOperation:
+        # Could not get the fileno for stdout, so we must be a daemon.
+        is_daemon = True
+    return is_daemon
+
+
+def _is_sighup_and_daemon(signo):
+    if not (_sighup_supported() and signo == signal.SIGHUP):
+        # Avoid checking if we are a daemon, because the signal isn't
+        # SIGHUP.
+        return False
+    return _is_daemon()
+
+
+def _signo_to_signame(signo):
+    signals = {signal.SIGTERM: 'SIGTERM',
+               signal.SIGINT: 'SIGINT'}
+    if _sighup_supported():
+        signals[signal.SIGHUP] = 'SIGHUP'
+    return signals[signo]
+
+
+def _set_signals_handler(handler):
+    signal.signal(signal.SIGTERM, handler)
+    signal.signal(signal.SIGINT, handler)
+    if _sighup_supported():
+        signal.signal(signal.SIGHUP, handler)
+
+
 class Launcher(object):
     """Launch one or more services and wait for them to complete."""
 
@@ -51,20 +105,9 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = threadgroup.ThreadGroup()
+        self.services = Services()
         self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
 
-    @staticmethod
-    def run_service(service):
-        """Start and wait for a service to finish.
-
-        :param service: service to run and wait for.
-        :returns: None
-
-        """
-        service.start()
-        service.wait()
-
     def launch_service(self, service):
         """Load and start the given service.
 
@@ -73,7 +116,7 @@ class Launcher(object):
 
         """
         service.backdoor_port = self.backdoor_port
-        self._services.add_thread(self.run_service, service)
+        self.services.add(service)
 
     def stop(self):
         """Stop all services which are currently running.
@@ -81,7 +124,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.stop()
+        self.services.stop()
 
     def wait(self):
         """Waits until all services have been stopped, and then returns.
@@ -89,7 +132,16 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.wait()
+        self.services.wait()
+
+    def restart(self):
+        """Reload config files and restart service.
+
+        :returns: None
+
+        """
+        cfg.CONF.reload_config_files()
+        self.services.restart()
 
 
 class SignalExit(SystemExit):
@@ -101,33 +153,48 @@ class SignalExit(SystemExit):
 class ServiceLauncher(Launcher):
     def _handle_signal(self, signo, frame):
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-
+        _set_signals_handler(signal.SIG_DFL)
         raise SignalExit(signo)
 
-    def wait(self):
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
+    def handle_signal(self):
+        _set_signals_handler(self._handle_signal)
+
+    def _wait_for_exit_or_signal(self, ready_callback=None):
+        status = None
+        signo = 0
 
         LOG.debug(_('Full set of CONF:'))
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
-        status = None
         try:
+            if ready_callback:
+                ready_callback()
             super(ServiceLauncher, self).wait()
         except SignalExit as exc:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[exc.signo]
+            signame = _signo_to_signame(exc.signo)
             LOG.info(_('Caught %s, exiting'), signame)
             status = exc.code
+            signo = exc.signo
         except SystemExit as exc:
             status = exc.code
         finally:
-            if rpc:
-                rpc.cleanup()
             self.stop()
-        return status
+            if rpc:
+                try:
+                    rpc.cleanup()
+                except Exception:
+                    # We're shutting down, so it doesn't matter at this point.
+                    LOG.exception(_('Exception during rpc cleanup.'))
+
+        return status, signo
+
+    def wait(self, ready_callback=None):
+        while True:
+            self.handle_signal()
+            status, signo = self._wait_for_exit_or_signal(ready_callback)
+            if not _is_sighup_and_daemon(signo):
+                return status
+            self.restart()
 
 
 class ServiceWrapper(object):
@@ -139,23 +206,29 @@ class ServiceWrapper(object):
 
 
 class ProcessLauncher(object):
-    def __init__(self):
+    def __init__(self, wait_interval=0.01):
+        """Constructor.
+
+        :param wait_interval: The interval to sleep for between checks
+                              of child process exit.
+        """
         self.children = {}
         self.sigcaught = None
         self.running = True
+        self.wait_interval = wait_interval
         rfd, self.writepipe = os.pipe()
         self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+        self.handle_signal()
 
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
+    def handle_signal(self):
+        _set_signals_handler(self._handle_signal)
 
     def _handle_signal(self, signo, frame):
         self.sigcaught = signo
         self.running = False
 
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
+        _set_signals_handler(signal.SIG_DFL)
 
     def _pipe_watcher(self):
         # This will block until the write end is closed when the parent
@@ -166,16 +239,49 @@ class ProcessLauncher(object):
 
         sys.exit(1)
 
-    def _child_process(self, service):
+    def _child_process_handle_signal(self):
         # Setup child signal handlers differently
         def _sigterm(*args):
             signal.signal(signal.SIGTERM, signal.SIG_DFL)
             raise SignalExit(signal.SIGTERM)
 
+        def _sighup(*args):
+            signal.signal(signal.SIGHUP, signal.SIG_DFL)
+            raise SignalExit(signal.SIGHUP)
+
         signal.signal(signal.SIGTERM, _sigterm)
+        if _sighup_supported():
+            signal.signal(signal.SIGHUP, _sighup)
         # Block SIGINT and let the parent send us a SIGTERM
         signal.signal(signal.SIGINT, signal.SIG_IGN)
 
+    def _child_wait_for_exit_or_signal(self, launcher):
+        status = 0
+        signo = 0
+
+        # NOTE(johannes): All exceptions are caught to ensure this
+        # doesn't fallback into the loop spawning children. It would
+        # be bad for a child to spawn more children.
+        try:
+            launcher.wait()
+        except SignalExit as exc:
+            signame = _signo_to_signame(exc.signo)
+            LOG.info(_('Caught %s, exiting'), signame)
+            status = exc.code
+            signo = exc.signo
+        except SystemExit as exc:
+            status = exc.code
+        except BaseException:
+            LOG.exception(_('Unhandled exception'))
+            status = 2
+        finally:
+            launcher.stop()
+
+        return status, signo
+
+    def _child_process(self, service):
+        self._child_process_handle_signal()
+
         # Reopen the eventlet hub to make sure we don't share an epoll
         # fd with parent and/or siblings, which would be bad
         eventlet.hubs.use_hub()
@@ -189,7 +295,8 @@ class ProcessLauncher(object):
         random.seed()
 
         launcher = Launcher()
-        launcher.run_service(service)
+        launcher.launch_service(service)
+        return launcher
 
     def _start_child(self, wrap):
         if len(wrap.forktimes) > wrap.workers:
@@ -207,24 +314,13 @@ class ProcessLauncher(object):
 
         pid = os.fork()
         if pid == 0:
-            # NOTE(johannes): All exceptions are caught to ensure this
-            # doesn't fallback into the loop spawning children. It would
-            # be bad for a child to spawn more children.
-            status = 0
-            try:
-                self._child_process(wrap.service)
-            except SignalExit as exc:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT'}[exc.signo]
-                LOG.info(_('Caught %s, exiting'), signame)
-                status = exc.code
-            except SystemExit as exc:
-                status = exc.code
-            except BaseException:
-                LOG.exception(_('Unhandled exception'))
-                status = 2
-            finally:
-                wrap.service.stop()
+            launcher = self._child_process(wrap.service)
+            while True:
+                self._child_process_handle_signal()
+                status, signo = self._child_wait_for_exit_or_signal(launcher)
+                if not _is_sighup_and_daemon(signo):
+                    break
+                launcher.restart()
 
             os._exit(status)
 
@@ -270,28 +366,37 @@ class ProcessLauncher(object):
         wrap.children.remove(pid)
         return wrap
 
-    def wait(self):
-        """Loop waiting on children to die and respawning as necessary."""
-
-        LOG.debug(_('Full set of CONF:'))
-        CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+    def _respawn_children(self):
         while self.running:
             wrap = self._wait_child()
             if not wrap:
                 # Yield to other threads if no children have exited
                 # Sleep for a short time to avoid excessive CPU usage
                 # (see bug #1095346)
-                eventlet.greenthread.sleep(.01)
+                eventlet.greenthread.sleep(self.wait_interval)
                 continue
-
             while self.running and len(wrap.children) < wrap.workers:
                 self._start_child(wrap)
 
-        if self.sigcaught:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
-            LOG.info(_('Caught %s, stopping children'), signame)
+    def wait(self):
+        """Loop waiting on children to die and respawning as necessary."""
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        while True:
+            self.handle_signal()
+            self._respawn_children()
+            if self.sigcaught:
+                signame = _signo_to_signame(self.sigcaught)
+                LOG.info(_('Caught %s, stopping children'), signame)
+            if not _is_sighup_and_daemon(self.sigcaught):
+                break
+
+            for pid in self.children:
+                os.kill(pid, signal.SIGHUP)
+            self.running = True
+            self.sigcaught = None
 
         for pid in self.children:
             try:
@@ -313,15 +418,74 @@ class Service(object):
     def __init__(self, threads=1000):
         self.tg = threadgroup.ThreadGroup(threads)
 
+        # signal that the service is done shutting itself down:
+        self._done = event.Event()
+
+    def reset(self):
+        # NOTE(Fengqian): docs for Event.reset() recommend against using it
+        self._done = event.Event()
+
     def start(self):
         pass
 
     def stop(self):
         self.tg.stop()
+        self.tg.wait()
+        # Signal that service cleanup is done:
+        if not self._done.ready():
+            self._done.send()
+
+    def wait(self):
+        self._done.wait()
+
+
+class Services(object):
+
+    def __init__(self):
+        self.services = []
+        self.tg = threadgroup.ThreadGroup()
+        self.done = event.Event()
+
+    def add(self, service):
+        self.services.append(service)
+        self.tg.add_thread(self.run_service, service, self.done)
+
+    def stop(self):
+        # wait for graceful shutdown of services:
+        for service in self.services:
+            service.stop()
+            service.wait()
+
+        # Each service has performed cleanup, now signal that the run_service
+        # wrapper threads can now die:
+        if not self.done.ready():
+            self.done.send()
+
+        # reap threads:
+        self.tg.stop()
 
     def wait(self):
         self.tg.wait()
 
+    def restart(self):
+        self.stop()
+        self.done = event.Event()
+        for restart_service in self.services:
+            restart_service.reset()
+            self.tg.add_thread(self.run_service, restart_service, self.done)
+
+    @staticmethod
+    def run_service(service, done):
+        """Service start wrapper.
+
+        :param service: service to run
+        :param done: event to wait on until a shutdown is triggered
+        :returns: None
+
+        """
+        service.start()
+        done.wait()
+
 
 def launch(service, workers=None):
     if workers:
diff --git a/neutron/openstack/common/sslutils.py b/neutron/openstack/common/sslutils.py
new file mode 100644 (file)
index 0000000..1d07937
--- /dev/null
@@ -0,0 +1,98 @@
+# Copyright 2013 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import ssl
+
+from oslo.config import cfg
+
+from neutron.openstack.common.gettextutils import _
+
+
+ssl_opts = [
+    cfg.StrOpt('ca_file',
+               default=None,
+               help="CA certificate file to use to verify "
+                    "connecting clients"),
+    cfg.StrOpt('cert_file',
+               default=None,
+               help="Certificate file to use when starting "
+                    "the server securely"),
+    cfg.StrOpt('key_file',
+               default=None,
+               help="Private key file to use when starting "
+                    "the server securely"),
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(ssl_opts, "ssl")
+
+
+def is_enabled():
+    cert_file = CONF.ssl.cert_file
+    key_file = CONF.ssl.key_file
+    ca_file = CONF.ssl.ca_file
+    use_ssl = cert_file or key_file
+
+    if cert_file and not os.path.exists(cert_file):
+        raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
+
+    if ca_file and not os.path.exists(ca_file):
+        raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
+
+    if key_file and not os.path.exists(key_file):
+        raise RuntimeError(_("Unable to find key_file : %s") % key_file)
+
+    if use_ssl and (not cert_file or not key_file):
+        raise RuntimeError(_("When running server in SSL mode, you must "
+                             "specify both a cert_file and key_file "
+                             "option value in your configuration file"))
+
+    return use_ssl
+
+
+def wrap(sock):
+    ssl_kwargs = {
+        'server_side': True,
+        'certfile': CONF.ssl.cert_file,
+        'keyfile': CONF.ssl.key_file,
+        'cert_reqs': ssl.CERT_NONE,
+    }
+
+    if CONF.ssl.ca_file:
+        ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
+        ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+    return ssl.wrap_socket(sock, **ssl_kwargs)
+
+
+_SSL_PROTOCOLS = {
+    "tlsv1": ssl.PROTOCOL_TLSv1,
+    "sslv23": ssl.PROTOCOL_SSLv23,
+    "sslv3": ssl.PROTOCOL_SSLv3
+}
+
+try:
+    _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
+except AttributeError:
+    pass
+
+
+def validate_ssl_version(version):
+    key = version.lower()
+    try:
+        return _SSL_PROTOCOLS[key]
+    except KeyError:
+        raise RuntimeError(_("Invalid SSL version : %s") % version)
index 27056904222871f2772f01d6c941e6e67a437628..5cfd59c94d4b264e9287dfc4fa100879bd79f7d9 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,7 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from eventlet import greenlet
+import eventlet
 from eventlet import greenpool
 from eventlet import greenthread
 
@@ -48,6 +46,9 @@ class Thread(object):
     def wait(self):
         return self.thread.wait()
 
+    def link(self, func, *args, **kwargs):
+        self.thread.link(func, *args, **kwargs)
+
 
 class ThreadGroup(object):
     """The point of the ThreadGroup classis to:
@@ -79,13 +80,17 @@ class ThreadGroup(object):
         gt = self.pool.spawn(callback, *args, **kwargs)
         th = Thread(gt, self)
         self.threads.append(th)
+        return th
 
     def thread_done(self, thread):
         self.threads.remove(thread)
 
     def stop(self):
         current = greenthread.getcurrent()
-        for x in self.threads:
+
+        # Iterate over a copy of self.threads so thread_done doesn't
+        # modify the list while we're iterating
+        for x in self.threads[:]:
             if x is current:
                 # don't kill the current thread.
                 continue
@@ -105,17 +110,20 @@ class ThreadGroup(object):
         for x in self.timers:
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
         current = greenthread.getcurrent()
-        for x in self.threads:
+
+        # Iterate over a copy of self.threads so thread_done doesn't
+        # modify the list while we're iterating
+        for x in self.threads[:]:
             if x is current:
                 continue
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
index ac2441bcb41cea1faeb2f93133f68fd07b848e59..d5ed81d3e3e255a3f252f5b24f29e226ce3121c6 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -21,8 +19,10 @@ Time related utilities and helper functions.
 
 import calendar
 import datetime
+import time
 
 import iso8601
+import six
 
 
 # ISO 8601 extended time format with microseconds
@@ -48,9 +48,9 @@ def parse_isotime(timestr):
     try:
         return iso8601.parse_date(timestr)
     except iso8601.ParseError as e:
-        raise ValueError(e.message)
+        raise ValueError(six.text_type(e))
     except TypeError as e:
-        raise ValueError(e.message)
+        raise ValueError(six.text_type(e))
 
 
 def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@@ -75,20 +75,31 @@ def normalize_time(timestamp):
 
 def is_older_than(before, seconds):
     """Return True if before is older than seconds."""
-    if isinstance(before, basestring):
+    if isinstance(before, six.string_types):
         before = parse_strtime(before).replace(tzinfo=None)
+    else:
+        before = before.replace(tzinfo=None)
+
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
 def is_newer_than(after, seconds):
     """Return True if after is newer than seconds."""
-    if isinstance(after, basestring):
+    if isinstance(after, six.string_types):
         after = parse_strtime(after).replace(tzinfo=None)
+    else:
+        after = after.replace(tzinfo=None)
+
     return after - utcnow() > datetime.timedelta(seconds=seconds)
 
 
 def utcnow_ts():
     """Timestamp version of our utcnow function."""
+    if utcnow.override_time is None:
+        # NOTE(kgriffs): This is several times faster
+        # than going through calendar.timegm(...)
+        return int(time.time())
+
     return calendar.timegm(utcnow().timetuple())
 
 
@@ -110,12 +121,15 @@ def iso8601_from_timestamp(timestamp):
 utcnow.override_time = None
 
 
-def set_time_override(override_time=datetime.datetime.utcnow()):
+def set_time_override(override_time=None):
     """Overrides utils.utcnow.
 
     Make it return a constant time or a list thereof, one at a time.
+
+    :param override_time: datetime instance or list thereof. If not
+                          given, defaults to the current UTC time.
     """
-    utcnow.override_time = override_time
+    utcnow.override_time = override_time or datetime.datetime.utcnow()
 
 
 def advance_time_delta(timedelta):
@@ -168,6 +182,15 @@ def delta_seconds(before, after):
     datetime objects (as a float, to microsecond resolution).
     """
     delta = after - before
+    return total_seconds(delta)
+
+
+def total_seconds(delta):
+    """Return the total seconds of datetime.timedelta object.
+
+    Compute total seconds of datetime.timedelta, datetime.timedelta
+    doesn't have method total_seconds in Python2.6, calculate it manually.
+    """
     try:
         return delta.total_seconds()
     except AttributeError:
@@ -178,8 +201,8 @@ def delta_seconds(before, after):
 def is_soon(dt, window):
     """Determines if time is going to happen in the next window seconds.
 
-    :params dt: the time
-    :params window: minimum seconds to remain to consider the time not soon
+    :param dt: the time
+    :param window: minimum seconds to remain to consider the time not soon
 
     :return: True if expiration is within the given duration
     """
index 7608acb9421fe93b28d8a0fffab2d66190548149..234b880c999608b15cca113921f16736df44a1e5 100644 (file)
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2012 Intel Corporation.
 # All Rights Reserved.
 #
diff --git a/neutron/openstack/common/versionutils.py b/neutron/openstack/common/versionutils.py
new file mode 100644 (file)
index 0000000..04472a8
--- /dev/null
@@ -0,0 +1,148 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Helpers for comparing version strings.
+"""
+
+import functools
+import pkg_resources
+
+from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class deprecated(object):
+    """A decorator to mark callables as deprecated.
+
+    This decorator logs a deprecation message when the callable it decorates is
+    used. The message will include the release where the callable was
+    deprecated, the release where it may be removed and possibly an optional
+    replacement.
+
+    Examples:
+
+    1. Specifying the required deprecated release
+
+    >>> @deprecated(as_of=deprecated.ICEHOUSE)
+    ... def a(): pass
+
+    2. Specifying a replacement:
+
+    >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()')
+    ... def b(): pass
+
+    3. Specifying the release where the functionality may be removed:
+
+    >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1)
+    ... def c(): pass
+
+    """
+
+    FOLSOM = 'F'
+    GRIZZLY = 'G'
+    HAVANA = 'H'
+    ICEHOUSE = 'I'
+
+    _RELEASES = {
+        'F': 'Folsom',
+        'G': 'Grizzly',
+        'H': 'Havana',
+        'I': 'Icehouse',
+    }
+
+    _deprecated_msg_with_alternative = _(
+        '%(what)s is deprecated as of %(as_of)s in favor of '
+        '%(in_favor_of)s and may be removed in %(remove_in)s.')
+
+    _deprecated_msg_no_alternative = _(
+        '%(what)s is deprecated as of %(as_of)s and may be '
+        'removed in %(remove_in)s. It will not be superseded.')
+
+    def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None):
+        """Initialize decorator
+
+        :param as_of: the release deprecating the callable. Constants
+            are define in this class for convenience.
+        :param in_favor_of: the replacement for the callable (optional)
+        :param remove_in: an integer specifying how many releases to wait
+            before removing (default: 2)
+        :param what: name of the thing being deprecated (default: the
+            callable's name)
+
+        """
+        self.as_of = as_of
+        self.in_favor_of = in_favor_of
+        self.remove_in = remove_in
+        self.what = what
+
+    def __call__(self, func):
+        if not self.what:
+            self.what = func.__name__ + '()'
+
+        @functools.wraps(func)
+        def wrapped(*args, **kwargs):
+            msg, details = self._build_message()
+            LOG.deprecated(msg, details)
+            return func(*args, **kwargs)
+        return wrapped
+
+    def _get_safe_to_remove_release(self, release):
+        # TODO(dstanek): this method will have to be reimplemented once
+        #    when we get to the X release because once we get to the Y
+        #    release, what is Y+2?
+        new_release = chr(ord(release) + self.remove_in)
+        if new_release in self._RELEASES:
+            return self._RELEASES[new_release]
+        else:
+            return new_release
+
+    def _build_message(self):
+        details = dict(what=self.what,
+                       as_of=self._RELEASES[self.as_of],
+                       remove_in=self._get_safe_to_remove_release(self.as_of))
+
+        if self.in_favor_of:
+            details['in_favor_of'] = self.in_favor_of
+            msg = self._deprecated_msg_with_alternative
+        else:
+            msg = self._deprecated_msg_no_alternative
+        return msg, details
+
+
+def is_compatible(requested_version, current_version, same_major=True):
+    """Determine whether `requested_version` is satisfied by
+    `current_version`; in other words, `current_version` is >=
+    `requested_version`.
+
+    :param requested_version: version to check for compatibility
+    :param current_version: version to check against
+    :param same_major: if True, the major version must be identical between
+        `requested_version` and `current_version`. This is used when a
+        major-version difference indicates incompatibility between the two
+        versions. Since this is the common-case in practice, the default is
+        True.
+    :returns: True if compatible, False if not
+    """
+    requested_parts = pkg_resources.parse_version(requested_version)
+    current_parts = pkg_resources.parse_version(current_version)
+
+    if same_major and (requested_parts[0] != current_parts[0]):
+        return False
+
+    return current_parts >= requested_parts
index 83e8bfa3d37a75bad069953d38c993b9080c2cc2..ed3ea33fc6186402ee2a3a6e32e73621a7b76a0f 100644 (file)
@@ -168,7 +168,7 @@ class ResourceTestCase(base.BaseTestCase):
         self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
                          expected_res)
 
-    @mock.patch('neutron.openstack.common.gettextutils.get_localized_message')
+    @mock.patch('neutron.openstack.common.gettextutils.translate')
     def test_unmapped_neutron_error_localized(self, mock_translation):
         gettextutils.install('blaa', lazy=True)
         msg_translation = 'Translated error'
@@ -238,7 +238,7 @@ class ResourceTestCase(base.BaseTestCase):
         self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
                          expected_res)
 
-    @mock.patch('neutron.openstack.common.gettextutils.get_localized_message')
+    @mock.patch('neutron.openstack.common.gettextutils.translate')
     def test_mapped_neutron_error_localized(self, mock_translation):
         gettextutils.install('blaa', lazy=True)
         msg_translation = 'Translated error'
index f19881ce471898cc08babc3906702db562e334f1..9e7964b4582f8e4f1c9fd6b222846b8df88e44be 100644 (file)
@@ -1020,7 +1020,7 @@ class Router(object):
         if not match:
             language = req.best_match_language()
             msg = _('The resource could not be found.')
-            msg = gettextutils.get_localized_message(msg, language)
+            msg = gettextutils.translate(msg, language)
             return webob.exc.HTTPNotFound(explanation=msg)
         app = match['controller']
         return app
index e548fd774f760ebbfc0733343c17e7fd09ccb7cf..8e578748aace2b265b7acfde6dd4fd5e8b3731f1 100644 (file)
@@ -21,12 +21,15 @@ module=notifier
 module=periodic_task
 module=policy
 module=processutils
+module=py3kcompat
 module=rpc
 module=service
+module=sslutils
 module=rootwrap
 module=threadgroup
 module=timeutils
 module=uuidutils
+module=versionutils
 
 # The base module to hold the copy of openstack.common
 base=neutron