]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Synced rpc and gettextutils modules from oslo-incubator
authorIhar Hrachyshka <ihrachys@redhat.com>
Mon, 17 Mar 2014 13:18:28 +0000 (14:18 +0100)
committerIhar Hrachyshka <ihrachys@redhat.com>
Wed, 26 Mar 2014 15:23:49 +0000 (16:23 +0100)
The main reason for sync is to get the following oslo-rpc fixes in Neutron:
* I537015f452eb770acba41fdedfe221628f52a920 (reduces delays when reconnecting
  to Qpid in HA deployments)
* Ia148baa6e1ec632789ac3621c85173c2c16f3918 (fixed HA failover, Qpid part)
* I67923cb024bbd143edc8edccf35b9b400df31eb3 (fixed HA failover, RabbitMQ part)

Latest oslo-incubator commit at the moment of sync:
2eab986ef3c43f8d1e25065e3cbc1307860c25c7

Change-Id: I2f5bb0d195e050f755ecdbf06a6bbed587a04fbe
Closes-Bug: 1281148
Closes-Bug: 1261631

13 files changed:
neutron/openstack/common/__init__.py
neutron/openstack/common/gettextutils.py
neutron/openstack/common/rpc/__init__.py
neutron/openstack/common/rpc/amqp.py
neutron/openstack/common/rpc/common.py
neutron/openstack/common/rpc/impl_fake.py
neutron/openstack/common/rpc/impl_kombu.py
neutron/openstack/common/rpc/impl_qpid.py
neutron/openstack/common/rpc/impl_zmq.py
neutron/openstack/common/rpc/matchmaker.py
neutron/openstack/common/rpc/matchmaker_redis.py
neutron/openstack/common/rpc/matchmaker_ring.py
neutron/openstack/common/rpc/service.py

index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..d1223eaf7656b29ca61b0b2a2fd33d3073feef11 100644 (file)
@@ -0,0 +1,17 @@
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import six
+
+
+six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
index 618d8fbc51568bd3d99e676c2e0492ebe5abee8d..1c33bfb83e4b9c8e2c5f242afa3184bebe3cef27 100644 (file)
@@ -23,11 +23,11 @@ Usual usage in an openstack.common module:
 """
 
 import copy
+import functools
 import gettext
 import locale
 from logging import handlers
 import os
-import re
 
 from babel import localedata
 import six
@@ -35,6 +35,17 @@ import six
 _localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
 _t = gettext.translation('neutron', localedir=_localedir, fallback=True)
 
+# We use separate translation catalogs for each log level, so set up a
+# mapping between the log level name and the translator. The domain
+# for the log level is project_name + "-log-" + log_level so messages
+# for each level end up in their own catalog.
+_t_log_levels = dict(
+    (level, gettext.translation('neutron' + '-log-' + level,
+                                localedir=_localedir,
+                                fallback=True))
+    for level in ['info', 'warning', 'error', 'critical']
+)
+
 _AVAILABLE_LANGUAGES = {}
 USE_LAZY = False
 
@@ -60,6 +71,28 @@ def _(msg):
         return _t.ugettext(msg)
 
 
+def _log_translation(msg, level):
+    """Build a single translation of a log message
+    """
+    if USE_LAZY:
+        return Message(msg, domain='neutron' + '-log-' + level)
+    else:
+        translator = _t_log_levels[level]
+        if six.PY3:
+            return translator.gettext(msg)
+        return translator.ugettext(msg)
+
+# Translators for log levels.
+#
+# The abbreviated names are meant to reflect the usual use of a short
+# name like '_'. The "L" is for "log" and the other letter comes from
+# the level.
+_LI = functools.partial(_log_translation, level='info')
+_LW = functools.partial(_log_translation, level='warning')
+_LE = functools.partial(_log_translation, level='error')
+_LC = functools.partial(_log_translation, level='critical')
+
+
 def install(domain, lazy=False):
     """Install a _() function using the given translation domain.
 
@@ -118,7 +151,8 @@ class Message(six.text_type):
     and can be treated as such.
     """
 
-    def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+    def __new__(cls, msgid, msgtext=None, params=None,
+                domain='neutron', *args):
         """Create a new Message object.
 
         In order for translation to work gettext requires a message ID, this
@@ -213,47 +247,22 @@ class Message(six.text_type):
         if other is None:
             params = (other,)
         elif isinstance(other, dict):
-            params = self._trim_dictionary_parameters(other)
-        else:
-            params = self._copy_param(other)
-        return params
-
-    def _trim_dictionary_parameters(self, dict_param):
-        """Return a dict that only has matching entries in the msgid."""
-        # NOTE(luisg): Here we trim down the dictionary passed as parameters
-        # to avoid carrying a lot of unnecessary weight around in the message
-        # object, for example if someone passes in Message() % locals() but
-        # only some params are used, and additionally we prevent errors for
-        # non-deepcopyable objects by unicoding() them.
-
-        # Look for %(param) keys in msgid;
-        # Skip %% and deal with the case where % is first character on the line
-        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
-
-        # If we don't find any %(param) keys but have a %s
-        if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
-            # Apparently the full dictionary is the parameter
-            params = self._copy_param(dict_param)
-        else:
+            # Merge the dictionaries
+            # Copy each item in case one does not support deep copy.
             params = {}
-            # Save our existing parameters as defaults to protect
-            # ourselves from losing values if we are called through an
-            # (erroneous) chain that builds a valid Message with
-            # arguments, and then does something like "msg % kwds"
-            # where kwds is an empty dictionary.
-            src = {}
             if isinstance(self.params, dict):
-                src.update(self.params)
-            src.update(dict_param)
-            for key in keys:
-                params[key] = self._copy_param(src[key])
-
+                for key, val in self.params.items():
+                    params[key] = self._copy_param(val)
+            for key, val in other.items():
+                params[key] = self._copy_param(val)
+        else:
+            params = self._copy_param(other)
         return params
 
     def _copy_param(self, param):
         try:
             return copy.deepcopy(param)
-        except TypeError:
+        except Exception:
             # Fallback to casting to unicode this will handle the
             # python code-like objects that can't be deep-copied
             return six.text_type(param)
@@ -297,9 +306,27 @@ def get_available_languages(domain):
     list_identifiers = (getattr(localedata, 'list', None) or
                         getattr(localedata, 'locale_identifiers'))
     locale_identifiers = list_identifiers()
+
     for i in locale_identifiers:
         if find(i) is not None:
             language_list.append(i)
+
+    # NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
+    # locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
+    # are perfectly legitimate locales:
+    #     https://github.com/mitsuhiko/babel/issues/37
+    # In Babel 1.3 they fixed the bug and they support these locales, but
+    # they are still not explicitly "listed" by locale_identifiers().
+    # That is  why we add the locales here explicitly if necessary so that
+    # they are listed as supported.
+    aliases = {'zh': 'zh_CN',
+               'zh_Hant_HK': 'zh_HK',
+               'zh_Hant': 'zh_TW',
+               'fil': 'tl_PH'}
+    for (locale, alias) in six.iteritems(aliases):
+        if locale in language_list and alias not in language_list:
+            language_list.append(alias)
+
     _AVAILABLE_LANGUAGES[domain] = language_list
     return copy.copy(language_list)
 
index 046b35272b5afa5c346ab7d835604f362891fbbc..77578e64272fb8161168cf8598896f6e152e583c 100644 (file)
@@ -23,13 +23,9 @@ For some wrappers that add message versioning to rpc, see:
     rpc.proxy
 """
 
-import inspect
-
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
-from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 
 
@@ -93,24 +89,7 @@ def create_connection(new=True):
     return _get_impl().create_connection(CONF, new=new)
 
 
-def _check_for_lock():
-    if not CONF.debug:
-        return None
-
-    if ((hasattr(local.strong_store, 'locks_held')
-         and local.strong_store.locks_held)):
-        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
-        LOG.warn(_('A RPC is being made while holding a lock. The locks '
-                   'currently held are %(locks)s. This is probably a bug. '
-                   'Please report it. Include the following: [%(stack)s].'),
-                 {'locks': local.strong_store.locks_held,
-                  'stack': stack})
-        return True
-
-    return False
-
-
-def call(context, topic, msg, timeout=None, check_for_lock=False):
+def call(context, topic, msg, timeout=None):
     """Invoke a remote method that returns something.
 
     :param context: Information that identifies the user that has made this
@@ -124,16 +103,12 @@ def call(context, topic, msg, timeout=None, check_for_lock=False):
                                              "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: A dict from the remote method.
 
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().call(CONF, context, topic, msg, timeout)
 
 
@@ -176,7 +151,7 @@ def fanout_cast(context, topic, msg):
     return _get_impl().fanout_cast(CONF, context, topic, msg)
 
 
-def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+def multicall(context, topic, msg, timeout=None):
     """Invoke a remote method and get back an iterator.
 
     In this case, the remote method will be returning multiple values in
@@ -194,8 +169,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
                                              "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
               an index that starts at 0 and increases by one for each value
@@ -205,8 +178,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().multicall(CONF, context, topic, msg, timeout)
 
 
index a0fa8cef22aea4fbb2d4887635a2aade99765442..b0f52395a438bcefce500c7987624b862db49e16 100644 (file)
@@ -37,7 +37,7 @@ import six
 
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import common as rpc_common
@@ -72,7 +72,7 @@ class Pool(pools.Pool):
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
-        LOG.debug(_('Pool creating new connection'))
+        LOG.debug('Pool creating new connection')
         return self.connection_cls(self.conf)
 
     def empty(self):
@@ -287,7 +287,7 @@ def unpack_context(conf, msg):
     context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
-    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+    rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
     return ctx
 
 
@@ -339,7 +339,7 @@ def _add_unique_id(msg):
     """Add unique_id for checking duplicate messages."""
     unique_id = uuid.uuid4().hex
     msg.update({UNIQUE_ID: unique_id})
-    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+    LOG.debug('UNIQUE_ID is %s.' % (unique_id))
 
 
 class _ThreadPoolWithWait(object):
@@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         # the previous context is stored in local.store.context
         if hasattr(local.store, 'context'):
             del local.store.context
-        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        rpc_common._safe_log(LOG.debug, 'received %s', message_data)
         self.msg_id_cache.check_duplicate_message(message_data)
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
@@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait):
             # This final None tells multicall that it is done.
             ctxt.reply(ending=True, connection_pool=self.connection_pool)
         except rpc_common.ClientException as e:
-            LOG.debug(_('Expected exception during message handling (%s)') %
+            LOG.debug('Expected exception during message handling (%s)' %
                       e._exc_info[1])
             ctxt.reply(None, e._exc_info,
                        connection_pool=self.connection_pool,
@@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         except Exception:
             # sys.exc_info() is deleted by LOG.exception().
             exc_info = sys.exc_info()
-            LOG.error(_('Exception during message handling'),
+            LOG.error(_LE('Exception during message handling'),
                       exc_info=exc_info)
             ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
 
@@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore()
 
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
-    LOG.debug(_('Making synchronous call on %s ...'), topic)
+    LOG.debug('Making synchronous call on %s ...', topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
-    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    LOG.debug('MSG_ID is %s' % (msg_id))
     _add_unique_id(msg)
     pack_context(msg, context)
 
@@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
 
 def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
-    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    LOG.debug('Making asynchronous cast on %s...', topic)
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool):
 
 def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
-    LOG.debug(_('Making asynchronous fanout cast...'))
+    LOG.debug('Making asynchronous fanout cast...')
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
 
 def notify(conf, context, topic, msg, connection_pool, envelope):
     """Sends a notification event on a topic."""
-    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+    LOG.debug('Sending %(event_type)s on %(topic)s',
               dict(event_type=msg.get('event_type'),
                    topic=topic))
     _add_unique_id(msg)
index aa772fb95bd23cebedac41137b31aa1f6c6a668a..34d484649e94e584e8cd9017f5486a3506bc7849 100644 (file)
@@ -22,7 +22,7 @@ import traceback
 from oslo.config import cfg
 import six
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import local
@@ -85,7 +85,7 @@ class RPCException(Exception):
             except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
-                LOG.exception(_('Exception in string format operation'))
+                LOG.exception(_LE('Exception in string format operation'))
                 for name, value in six.iteritems(kwargs):
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
@@ -269,6 +269,10 @@ def _safe_log(log_func, msg, msg_data):
                 d[k] = '<SANITIZED>'
             elif k.lower() in SANITIZE:
                 d[k] = '<SANITIZED>'
+            elif isinstance(d[k], list):
+                for e in d[k]:
+                    if isinstance(e, dict):
+                        _fix_passwords(e)
             elif isinstance(d[k], dict):
                 _fix_passwords(d[k])
         return d
@@ -285,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
     if log_failure:
-        LOG.error(_("Returning exception %s to caller"),
+        LOG.error(_LE("Returning exception %s to caller"),
                   six.text_type(failure))
         LOG.error(tb)
 
index f6eea936d2489cd887e7b96bae0f020b20052c65..0c8e119aa70ed4220cf3eb27c5472c85017faeae 100644 (file)
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """Fake RPC implementation which calls proxy methods directly with no
 queues.  Casts will block, but this is very useful for tests.
 """
@@ -139,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     try:
         consumer = CONSUMERS[topic][0]
@@ -184,8 +185,8 @@ def fanout_cast(conf, context, topic, msg):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     for consumer in CONSUMERS.get(topic, []):
         try:
index b0cb70f9eae1568254d0542be506c23d710ad463..d17f64d78bb0df3d16e489bc6aab576835f15d1e 100644 (file)
@@ -29,7 +29,7 @@ from oslo.config import cfg
 import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import network_utils
 from neutron.openstack.common.rpc import amqp as rpc_amqp
 from neutron.openstack.common.rpc import common as rpc_common
@@ -38,9 +38,9 @@ from neutron.openstack.common import sslutils
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
-               help='SSL version to use (valid only if SSL enabled). '
-                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
-                    'be available on some distributions'
+               help='If SSL is enabled, the SSL version to use. Valid '
+                    'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
+                    'be available on some distributions.'
                ),
     cfg.StrOpt('kombu_ssl_keyfile',
                default='',
@@ -63,33 +63,33 @@ kombu_opts = [
                 help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
-                help='connect over SSL for RabbitMQ'),
+                help='Connect over SSL for RabbitMQ'),
     cfg.StrOpt('rabbit_userid',
                default='guest',
-               help='the RabbitMQ userid'),
+               help='The RabbitMQ userid'),
     cfg.StrOpt('rabbit_password',
                default='guest',
-               help='the RabbitMQ password',
+               help='The RabbitMQ password',
                secret=True),
     cfg.StrOpt('rabbit_virtual_host',
                default='/',
-               help='the RabbitMQ virtual host'),
+               help='The RabbitMQ virtual host'),
     cfg.IntOpt('rabbit_retry_interval',
                default=1,
-               help='how frequently to retry connecting with RabbitMQ'),
+               help='How frequently to retry connecting with RabbitMQ'),
     cfg.IntOpt('rabbit_retry_backoff',
                default=2,
-               help='how long to backoff for between retries when connecting '
+               help='How long to backoff for between retries when connecting '
                     'to RabbitMQ'),
     cfg.IntOpt('rabbit_max_retries',
                default=0,
-               help='maximum retries with trying to connect to RabbitMQ '
-                    '(the default of 0 implies an infinite retry count)'),
+               help='Maximum number of RabbitMQ connection retries. '
+                    'Default is 0 (infinite retry count)'),
     cfg.BoolOpt('rabbit_ha_queues',
                 default=False,
-                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
-                     'You need to wipe RabbitMQ database when '
-                     'changing this option.'),
+                help='Use HA queues in RabbitMQ (x-ha-policy: all). '
+                     'If you change this option, you must wipe the '
+                     'RabbitMQ database.'),
 
 ]
 
@@ -153,12 +153,12 @@ class ConsumerBase(object):
             callback(msg)
         except Exception:
             if self.ack_on_error:
-                LOG.exception(_("Failed to process message"
-                                " ... skipping it."))
+                LOG.exception(_LE("Failed to process message"
+                                  " ... skipping it."))
                 message.ack()
             else:
-                LOG.exception(_("Failed to process message"
-                                " ... will requeue."))
+                LOG.exception(_LE("Failed to process message"
+                                  " ... will requeue."))
                 message.requeue()
         else:
             message.ack()
@@ -458,6 +458,9 @@ class Connection(object):
 
         self.params_list = params_list
 
+        brokers_count = len(self.params_list)
+        self.next_broker_indices = itertools.cycle(range(brokers_count))
+
         self.memory_transport = self.conf.fake_rabbit
 
         self.connection = None
@@ -492,7 +495,7 @@ class Connection(object):
         be handled by the caller.
         """
         if self.connection:
-            LOG.info(_("Reconnecting to AMQP server on "
+            LOG.info(_LI("Reconnecting to AMQP server on "
                      "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.release()
@@ -514,7 +517,7 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+        LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
                  params)
 
     def reconnect(self):
@@ -528,7 +531,7 @@ class Connection(object):
 
         attempt = 0
         while True:
-            params = self.params_list[attempt % len(self.params_list)]
+            params = self.params_list[next(self.next_broker_indices)]
             attempt += 1
             try:
                 self._connect(params)
@@ -565,9 +568,9 @@ class Connection(object):
                 sleep_time = min(sleep_time, self.interval_max)
 
             log_info['sleep_time'] = sleep_time
-            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
-                        'unreachable: %(err_str)s. Trying again in '
-                        '%(sleep_time)d seconds.') % log_info)
+            LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
+                          'unreachable: %(err_str)s. Trying again in '
+                          '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
@@ -619,7 +622,7 @@ class Connection(object):
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s") % log_info)
 
         def _declare_consumer():
@@ -637,11 +640,11 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, socket.timeout):
-                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                LOG.debug('Timed out waiting for RPC response: %s' %
                           str(exc))
                 raise rpc_common.Timeout()
             else:
-                LOG.exception(_('Failed to consume message from queue: %s') %
+                LOG.exception(_LE('Failed to consume message from queue: %s') %
                               str(exc))
                 info['do_consume'] = True
 
@@ -680,7 +683,7 @@ class Connection(object):
 
         def _error_callback(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.exception(_("Failed to publish message to topic "
+            LOG.exception(_LE("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
 
         def _publish():
index 03b12e5d4d8aafca3806a35dcfd4e03d74bb6b1d..6f2a3dc6c396634883aa2bf4b8d5c45edd06bf29 100644 (file)
@@ -23,7 +23,7 @@ from oslo.config import cfg
 import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
@@ -188,7 +188,7 @@ class ConsumerBase(object):
             msg = rpc_common.deserialize_msg(message.content)
             self.callback(msg)
         except Exception:
-            LOG.exception(_("Failed to process message... skipping it."))
+            LOG.exception(_LE("Failed to process message... skipping it."))
         finally:
             # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
@@ -467,6 +467,10 @@ class Connection(object):
         self.brokers = params['qpid_hosts']
         self.username = params['username']
         self.password = params['password']
+
+        brokers_count = len(self.brokers)
+        self.next_broker_indices = itertools.cycle(range(brokers_count))
+
         self.connection_create(self.brokers[0])
         self.reconnect()
 
@@ -494,7 +498,6 @@ class Connection(object):
 
     def reconnect(self):
         """Handles reconnecting and re-establishing sessions and queues."""
-        attempt = 0
         delay = 1
         while True:
             # Close the session if necessary
@@ -504,21 +507,20 @@ class Connection(object):
                 except qpid_exceptions.ConnectionError:
                     pass
 
-            broker = self.brokers[attempt % len(self.brokers)]
-            attempt += 1
+            broker = self.brokers[next(self.next_broker_indices)]
 
             try:
                 self.connection_create(broker)
                 self.connection.open()
             except qpid_exceptions.ConnectionError as e:
                 msg_dict = dict(e=e, delay=delay)
-                msg = _("Unable to connect to AMQP server: %(e)s. "
-                        "Sleeping %(delay)s seconds") % msg_dict
+                msg = _LE("Unable to connect to AMQP server: %(e)s. "
+                          "Sleeping %(delay)s seconds") % msg_dict
                 LOG.error(msg)
                 time.sleep(delay)
-                delay = min(2 * delay, 60)
+                delay = min(delay + 1, 5)
             else:
-                LOG.info(_('Connected to AMQP server on %s'), broker)
+                LOG.info(_LI('Connected to AMQP server on %s'), broker)
                 break
 
         self.session = self.connection.session()
@@ -531,7 +533,7 @@ class Connection(object):
                 consumer.reconnect(self.session)
                 self._register_consumer(consumer)
 
-            LOG.debug(_("Re-established AMQP queues"))
+            LOG.debug("Re-established AMQP queues")
 
     def ensure(self, error_callback, method, *args, **kwargs):
         while True:
@@ -570,7 +572,7 @@ class Connection(object):
         """
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s") % log_info)
 
         def _declare_consumer():
@@ -585,11 +587,11 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
-                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                LOG.debug('Timed out waiting for RPC response: %s' %
                           str(exc))
                 raise rpc_common.Timeout()
             else:
-                LOG.exception(_('Failed to consume message from queue: %s') %
+                LOG.exception(_LE('Failed to consume message from queue: %s') %
                               str(exc))
 
         def _consume():
@@ -597,7 +599,7 @@ class Connection(object):
             try:
                 self._lookup_consumer(nxt_receiver).consume()
             except Exception:
-                LOG.exception(_("Error processing message.  Skipping it."))
+                LOG.exception(_LE("Error processing message.  Skipping it."))
 
         for iteration in itertools.count(0):
             if limit and iteration >= limit:
@@ -624,7 +626,7 @@ class Connection(object):
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.exception(_("Failed to publish message to topic "
+            LOG.exception(_LE("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
 
         def _publisher_send():
index 33fa95dd0252cf327e17137df7b4abe72d55f24c..fc4b92f05a4405357ffc6fa6a7ca09f44ae056ba 100644 (file)
@@ -27,7 +27,7 @@ import six
 from six import moves
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common.rpc import common as rpc_common
@@ -80,7 +80,7 @@ CONF = cfg.CONF
 CONF.register_opts(zmq_opts)
 
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
-matchmaker = None  # memoized matchmaker object
+matchmaker = None  # memorized matchmaker object
 
 
 def _serialize(data):
@@ -93,12 +93,12 @@ def _serialize(data):
         return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
         with excutils.save_and_reraise_exception():
-            LOG.error(_("JSON serialization failed."))
+            LOG.error(_LE("JSON serialization failed."))
 
 
 def _deserialize(data):
     """Deserialization wrapper."""
-    LOG.debug(_("Deserializing: %s"), data)
+    LOG.debug("Deserializing: %s", data)
     return jsonutils.loads(data)
 
 
@@ -133,9 +133,9 @@ class ZmqSocket(object):
         str_data = {'addr': addr, 'type': self.socket_s(),
                     'subscribe': subscribe, 'bind': bind}
 
-        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
-        LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
-        LOG.debug(_("-> bind: %(bind)s"), str_data)
+        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
+        LOG.debug("-> Subscribed to %(subscribe)s", str_data)
+        LOG.debug("-> bind: %(bind)s", str_data)
 
         try:
             if bind:
@@ -155,7 +155,7 @@ class ZmqSocket(object):
         """Subscribe."""
         if not self.can_sub:
             raise RPCException("Cannot subscribe on this socket.")
-        LOG.debug(_("Subscribing to %s"), msg_filter)
+        LOG.debug("Subscribing to %s", msg_filter)
 
         try:
             self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
@@ -192,7 +192,7 @@ class ZmqSocket(object):
             # it would be much worse if some of the code calling this
             # were to fail. For now, lets log, and later evaluate
             # if we can safely raise here.
-            LOG.error(_("ZeroMQ socket could not be closed."))
+            LOG.error(_LE("ZeroMQ socket could not be closed."))
         self.sock = None
 
     def recv(self, **kwargs):
@@ -264,7 +264,7 @@ class InternalContext(object):
 
     def _get_response(self, ctx, proxy, topic, data):
         """Process a curried message and cast the result to topic."""
-        LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+        LOG.debug("Running func with context: %s", ctx.to_dict())
         data.setdefault('version', None)
         data.setdefault('args', {})
 
@@ -277,13 +277,13 @@ class InternalContext(object):
             # ignore these since they are just from shutdowns
             pass
         except rpc_common.ClientException as e:
-            LOG.debug(_("Expected exception during message handling (%s)") %
+            LOG.debug("Expected exception during message handling (%s)" %
                       e._exc_info[1])
             return {'exc':
                     rpc_common.serialize_remote_exception(e._exc_info,
                                                           log_failure=False)}
         except Exception:
-            LOG.error(_("Exception during message handling"))
+            LOG.error(_LE("Exception during message handling"))
             return {'exc':
                     rpc_common.serialize_remote_exception(sys.exc_info())}
 
@@ -302,7 +302,7 @@ class InternalContext(object):
             self._get_response(ctx, proxy, topic, payload),
             ctx.replies)
 
-        LOG.debug(_("Sending reply"))
+        LOG.debug("Sending reply")
         _multi_send(_cast, ctx, topic, {
             'method': '-process_reply',
             'args': {
@@ -336,7 +336,7 @@ class ConsumerBase(object):
         # processed internally. (non-valid method name)
         method = data.get('method')
         if not method:
-            LOG.error(_("RPC message did not include method."))
+            LOG.error(_LE("RPC message did not include method."))
             return
 
         # Internal method
@@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase):
     def register(self, proxy, in_addr, zmq_type_in,
                  in_bind=True, subscribe=None):
 
-        LOG.info(_("Registering reactor"))
+        LOG.info(_LI("Registering reactor"))
 
         if zmq_type_in not in (zmq.PULL, zmq.SUB):
             raise RPCException("Bad input socktype")
@@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase):
         self.proxies[inq] = proxy
         self.sockets.append(inq)
 
-        LOG.info(_("In reactor registered"))
+        LOG.info(_LI("In reactor registered"))
 
     def consume_in_thread(self):
         @excutils.forever_retry_uncaught_exceptions
         def _consume(sock):
-            LOG.info(_("Consuming socket"))
+            LOG.info(_LI("Consuming socket"))
             while True:
                 self.consume(sock)
 
@@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
 
         if topic not in self.topic_proxy:
             def publisher(waiter):
-                LOG.info(_("Creating proxy for topic: %s"), topic)
+                LOG.info(_LI("Creating proxy for topic: %s"), topic)
 
                 try:
                     # The topic is received over the network,
@@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor):
             try:
                 wait_sock_creation.wait()
             except RPCException:
-                LOG.error(_("Topic socket file creation failed."))
+                LOG.error(_LE("Topic socket file creation failed."))
                 return
 
         try:
             self.topic_proxy[topic].put_nowait(data)
         except eventlet.queue.Full:
-            LOG.error(_("Local per-topic backlog buffer full for topic "
-                        "%(topic)s. Dropping message.") % {'topic': topic})
+            LOG.error(_LE("Local per-topic backlog buffer full for topic "
+                          "%(topic)s. Dropping message.") % {'topic': topic})
 
     def consume_in_thread(self):
         """Runs the ZmqProxy service."""
@@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor):
         except os.error:
             if not os.path.isdir(ipc_dir):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Required IPC directory does not exist at"
-                                " %s") % (ipc_dir, ))
+                    LOG.error(_LE("Required IPC directory does not exist at"
+                                  " %s") % (ipc_dir, ))
         try:
             self.register(consumption_proxy,
                           consume_in,
@@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor):
         except zmq.ZMQError:
             if os.access(ipc_dir, os.X_OK):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Permission denied to IPC directory at"
-                                " %s") % (ipc_dir, ))
+                    LOG.error(_LE("Permission denied to IPC directory at"
+                                  " %s") % (ipc_dir, ))
             with excutils.save_and_reraise_exception():
-                LOG.error(_("Could not create ZeroMQ receiver daemon. "
-                            "Socket may already be in use."))
+                LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
+                              "Socket may already be in use."))
 
         super(ZmqProxy, self).consume_in_thread()
 
@@ -541,7 +541,7 @@ class ZmqReactor(ZmqBaseReactor):
     def consume(self, sock):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+        LOG.debug("CONSUMER RECEIVED DATA: %s", data)
 
         proxy = self.proxies[sock]
 
@@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor):
             # Unmarshal only after verifying the message.
             ctx = RpcContext.unmarshal(data[3])
         else:
-            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+            LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
             return
 
         self.pool.spawn_n(self.process, proxy, ctx, request)
@@ -588,14 +588,14 @@ class Connection(rpc_common.Connection):
             topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
 
         if topic in self.topics:
-            LOG.info(_("Skipping topic registration. Already registered."))
+            LOG.info(_LI("Skipping topic registration. Already registered."))
             return
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
             (CONF.rpc_zmq_ipc_dir, topic)
 
-        LOG.debug(_("Consumer is a zmq.%s"),
+        LOG.debug("Consumer is a zmq.%s",
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
         self.reactor.register(proxy, inaddr, sock_type,
@@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None,
     # Replies always come into the reply service.
     reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
 
-    LOG.debug(_("Creating payload"))
+    LOG.debug("Creating payload")
     # Curry the original request into a reply method.
     mcontext = RpcContext.marshal(context)
     payload = {
@@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None,
         }
     }
 
-    LOG.debug(_("Creating queue socket for reply waiter"))
+    LOG.debug("Creating queue socket for reply waiter")
 
     # Messages arriving async.
     # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
@@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None,
                 zmq.SUB, subscribe=msg_id, bind=False
             )
 
-            LOG.debug(_("Sending cast"))
+            LOG.debug("Sending cast")
             _cast(addr, context, topic, payload, envelope)
 
-            LOG.debug(_("Cast sent; Waiting reply"))
+            LOG.debug("Cast sent; Waiting reply")
             # Blocks until receives reply
             msg = msg_waiter.recv()
-            LOG.debug(_("Received message: %s"), msg)
-            LOG.debug(_("Unpacking response"))
+            LOG.debug("Received message: %s", msg)
+            LOG.debug("Unpacking response")
 
             if msg[2] == 'cast':  # Legacy version
                 raw_msg = _deserialize(msg[-1])[-1]
@@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None,
     Dispatches to the matchmaker and sends message to all relevant hosts.
     """
     conf = CONF
-    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+    LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
     queues = _get_matchmaker().queues(topic)
-    LOG.debug(_("Sending message(s) to: %s"), queues)
+    LOG.debug("Sending message(s) to: %s", queues)
 
     # Don't stack if we have no matchmaker results
     if not queues:
index 26b6b9f76fca78ff1d10b288ca731206c1b3ac73..21c60c90bab0d6dfcb9f4f6be50eb90175627387 100644 (file)
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should except a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -21,7 +22,7 @@ import contextlib
 import eventlet
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LI
 from neutron.openstack.common import log as logging
 
 
@@ -212,7 +213,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         self.hosts.discard(host)
         self.backend_unregister(key, '.'.join((key, host)))
 
-        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+        LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
                  {'key': key, 'host': host})
 
     def start_heartbeat(self):
index 0458060bf3de540962725eaac1629a191df6fe90..7b1fdc3c658a3f8f2ee58a2a0c7579c67663b2ec 100644 (file)
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should accept a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
index 831741922ea03a7397c0ad1374ad5a14ebefe94e..9d106edbd93da351791137f32a64117e835bbb39 100644 (file)
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should except a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -21,7 +22,7 @@ import json
 
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LW
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import matchmaker as mm
 
@@ -52,9 +53,8 @@ class RingExchange(mm.Exchange):
         if ring:
             self.ring = ring
         else:
-            fh = open(CONF.matchmaker_ring.ringfile, 'r')
-            self.ring = json.load(fh)
-            fh.close()
+            with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
+                self.ring = json.load(fh)
 
         self.ring0 = {}
         for k in self.ring.keys():
@@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange):
     def run(self, key):
         if not self._ring_has(key):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (key, )
+                _LW("No key defining hosts for topic '%s', "
+                    "see ringfile") % (key, )
             )
             return []
         host = next(self.ring0[key])
@@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange):
         nkey = key.split('fanout~')[1:][0]
         if not self._ring_has(nkey):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (nkey, )
+                _LW("No key defining hosts for topic '%s', "
+                    "see ringfile") % (nkey, )
             )
             return []
         return map(lambda x: (key + '.' + x, x), self.ring[nkey])
index 4479dcc3ac9e1994bf5fac6cd2b1d7aec65a426d..ae8c56120f5a86deac8f594ea9bebe67c83db1e5 100644 (file)
@@ -15,7 +15,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -44,7 +43,7 @@ class Service(service.Service):
         super(Service, self).start()
 
         self.conn = rpc.create_connection(new=True)
-        LOG.debug(_("Creating Consumer connection for Service %s") %
+        LOG.debug("Creating Consumer connection for Service %s" %
                   self.topic)
 
         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],