+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+
+
+six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
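+# With this move registered, "from six.moves import mox" resolves to
+# the mox module on Python 2 and to mox3.mox on Python 3.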
"""
import copy
+import functools
import gettext
import locale
from logging import handlers
import os
-import re
from babel import localedata
import six
_localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
_t = gettext.translation('neutron', localedir=_localedir, fallback=True)
+# We use separate translation catalogs for each log level, so set up a
+# mapping between the log level name and the translator. The domain
+# for the log level is project_name + "-log-" + log_level so messages
+# for each level end up in their own catalog.
+_t_log_levels = dict(
+ (level, gettext.translation('neutron' + '-log-' + level,
+ localedir=_localedir,
+ fallback=True))
+ for level in ['info', 'warning', 'error', 'critical']
+)
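+# For example, the 'info' entry above is a translator for the
+# "neutron-log-info" gettext domain, looked up under _localedir and
+# falling back to the untranslated message if no catalog is found.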
+
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
return _t.ugettext(msg)
+def _log_translation(msg, level):
+ """Build a single translation of a log message
+ """
+ if USE_LAZY:
+ return Message(msg, domain='neutron' + '-log-' + level)
+ else:
+ translator = _t_log_levels[level]
+ if six.PY3:
+ return translator.gettext(msg)
+ return translator.ugettext(msg)
+
+# Translators for log levels.
+#
+# The abbreviated names are meant to reflect the usual use of a short
+# name like '_'. The "L" is for "log" and the other letter comes from
+# the level.
+_LI = functools.partial(_log_translation, level='info')
+_LW = functools.partial(_log_translation, level='warning')
+_LE = functools.partial(_log_translation, level='error')
+_LC = functools.partial(_log_translation, level='critical')
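+# A minimal usage sketch (assuming a module-level logger named LOG):
+#
+#     LOG.warning(_LW('Unable to bind port %(port)s'), {'port': port})
+#
+# Here the message is translated through the "neutron-log-warning"
+# catalog before being emitted.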
+
+
def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
and can be treated as such.
"""
- def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+ def __new__(cls, msgid, msgtext=None, params=None,
+ domain='neutron', *args):
"""Create a new Message object.
In order for translation to work gettext requires a message ID, this
if other is None:
params = (other,)
elif isinstance(other, dict):
- params = self._trim_dictionary_parameters(other)
- else:
- params = self._copy_param(other)
- return params
-
- def _trim_dictionary_parameters(self, dict_param):
- """Return a dict that only has matching entries in the msgid."""
- # NOTE(luisg): Here we trim down the dictionary passed as parameters
- # to avoid carrying a lot of unnecessary weight around in the message
- # object, for example if someone passes in Message() % locals() but
- # only some params are used, and additionally we prevent errors for
- # non-deepcopyable objects by unicoding() them.
-
- # Look for %(param) keys in msgid;
- # Skip %% and deal with the case where % is first character on the line
- keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
-
- # If we don't find any %(param) keys but have a %s
- if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
- # Apparently the full dictionary is the parameter
- params = self._copy_param(dict_param)
- else:
+ # Merge the dictionaries
+ # Copy each item in case one does not support deep copy.
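+ # For example, if self.params is {'a': 1} and other is {'b': 2},
+ # the merged result is {'a': 1, 'b': 2}, with every value copied
+ # defensively via _copy_param().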
params = {}
- # Save our existing parameters as defaults to protect
- # ourselves from losing values if we are called through an
- # (erroneous) chain that builds a valid Message with
- # arguments, and then does something like "msg % kwds"
- # where kwds is an empty dictionary.
- src = {}
if isinstance(self.params, dict):
- src.update(self.params)
- src.update(dict_param)
- for key in keys:
- params[key] = self._copy_param(src[key])
-
+ for key, val in self.params.items():
+ params[key] = self._copy_param(val)
+ for key, val in other.items():
+ params[key] = self._copy_param(val)
+ else:
+ params = self._copy_param(other)
return params
def _copy_param(self, param):
try:
return copy.deepcopy(param)
- except TypeError:
+ except Exception:
# Fall back to casting to unicode; this handles the Python
# code-like objects that can't be deep-copied.
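+ # For example, a traceback object cannot be deep-copied and would
+ # be stored here as its unicode representation instead.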
return six.text_type(param)
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
+
for i in locale_identifiers:
if find(i) is not None:
language_list.append(i)
+
+ # NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
+ # locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
+ # are perfectly legitimate locales:
+ # https://github.com/mitsuhiko/babel/issues/37
+ # Babel 1.3 fixed the bug and supports these locales, but they
+ # are still not explicitly listed by locale_identifiers().
+ # That is why we add the locales here explicitly if necessary so that
+ # they are listed as supported.
+ aliases = {'zh': 'zh_CN',
+ 'zh_Hant_HK': 'zh_HK',
+ 'zh_Hant': 'zh_TW',
+ 'fil': 'tl_PH'}
+ for (locale, alias) in six.iteritems(aliases):
+ if locale in language_list and alias not in language_list:
+ language_list.append(alias)
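+ # For example, if Babel reports 'zh' as available, 'zh_CN' is
+ # appended as well so that it is reported as supported.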
+
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)
rpc.proxy
"""
-import inspect
-
from oslo.config import cfg
-from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
-from neutron.openstack.common import local
from neutron.openstack.common import log as logging
return _get_impl().create_connection(CONF, new=new)
-def _check_for_lock():
- if not CONF.debug:
- return None
-
- if ((hasattr(local.strong_store, 'locks_held')
- and local.strong_store.locks_held)):
- stack = ' :: '.join([frame[3] for frame in inspect.stack()])
- LOG.warn(_('A RPC is being made while holding a lock. The locks '
- 'currently held are %(locks)s. This is probably a bug. '
- 'Please report it. Include the following: [%(stack)s].'),
- {'locks': local.strong_store.locks_held,
- 'stack': stack})
- return True
-
- return False
-
-
-def call(context, topic, msg, timeout=None, check_for_lock=False):
+def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
- :param check_for_lock: if True, a warning is emitted if a RPC call is made
- with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- if check_for_lock:
- _check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
return _get_impl().fanout_cast(CONF, context, topic, msg)
-def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
- :param check_for_lock: if True, a warning is emitted if a RPC call is made
- with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- if check_for_lock:
- _check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import common as rpc_common
# TODO(comstud): Timeout connections not used in a while
def create(self):
- LOG.debug(_('Pool creating new connection'))
+ LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf)
def empty(self):
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
- rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+ rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
return ctx
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
- LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+ LOG.debug('UNIQUE_ID is %s.', unique_id)
class _ThreadPoolWithWait(object):
# the previous context is stored in local.store.context
if hasattr(local.store, 'context'):
del local.store.context
- rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ rpc_common._safe_log(LOG.debug, 'received %s', message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
- LOG.debug(_('Expected exception during message handling (%s)') %
+ LOG.debug('Expected exception during message handling (%s)',
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
- LOG.error(_('Exception during message handling'),
+ LOG.error(_LE('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- LOG.debug(_('Making synchronous call on %s ...'), topic)
+ LOG.debug('Making synchronous call on %s ...', topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
- LOG.debug(_('MSG_ID is %s') % (msg_id))
+ LOG.debug('MSG_ID is %s', msg_id)
_add_unique_id(msg)
pack_context(msg, context)
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
- LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ LOG.debug('Making asynchronous cast on %s...', topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
- LOG.debug(_('Making asynchronous fanout cast...'))
+ LOG.debug('Making asynchronous fanout cast...')
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
- LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+ LOG.debug('Sending %(event_type)s on %(topic)s',
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
from oslo.config import cfg
import six
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import local
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
- LOG.exception(_('Exception in string format operation'))
+ LOG.exception(_LE('Exception in string format operation'))
for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
+ elif isinstance(d[k], list):
+ for e in d[k]:
+ if isinstance(e, dict):
+ _fix_passwords(e)
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
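+# With the list handling above, for example,
+# {'args': [{'password': 'secret'}]} now comes back with the nested
+# password value replaced by '<SANITIZED>'.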
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
- LOG.error(_("Returning exception %s to caller"),
+ LOG.error(_LE("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
"""Fake RPC implementation which calls proxy methods directly with no
queues. Casts will block, but this is very useful for tests.
"""
if not method:
return
args = msg.get('args', {})
- version = msg.get('version', None)
- namespace = msg.get('namespace', None)
+ version = msg.get('version')
+ namespace = msg.get('namespace')
try:
consumer = CONSUMERS[topic][0]
if not method:
return
args = msg.get('args', {})
- version = msg.get('version', None)
- namespace = msg.get('namespace', None)
+ version = msg.get('version')
+ namespace = msg.get('namespace')
for consumer in CONSUMERS.get(topic, []):
try:
import six
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import network_utils
from neutron.openstack.common.rpc import amqp as rpc_amqp
from neutron.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
- help='SSL version to use (valid only if SSL enabled). '
- 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
- 'be available on some distributions'
+ help='If SSL is enabled, the SSL version to use. Valid '
+ 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
+ 'be available on some distributions.'
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
- help='connect over SSL for RabbitMQ'),
+ help='Connect over SSL for RabbitMQ'),
cfg.StrOpt('rabbit_userid',
default='guest',
- help='the RabbitMQ userid'),
+ help='The RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
- help='the RabbitMQ password',
+ help='The RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
- help='the RabbitMQ virtual host'),
+ help='The RabbitMQ virtual host'),
cfg.IntOpt('rabbit_retry_interval',
default=1,
- help='how frequently to retry connecting with RabbitMQ'),
+ help='How frequently to retry connecting with RabbitMQ'),
cfg.IntOpt('rabbit_retry_backoff',
default=2,
- help='how long to backoff for between retries when connecting '
+ help='How long to back off between retries when connecting '
'to RabbitMQ'),
cfg.IntOpt('rabbit_max_retries',
default=0,
- help='maximum retries with trying to connect to RabbitMQ '
- '(the default of 0 implies an infinite retry count)'),
+ help='Maximum number of RabbitMQ connection retries. '
+ 'Default is 0 (infinite retry count).'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
- help='use H/A queues in RabbitMQ (x-ha-policy: all).'
- 'You need to wipe RabbitMQ database when '
- 'changing this option.'),
+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
+ 'If you change this option, you must wipe the '
+ 'RabbitMQ database.'),
]
callback(msg)
except Exception:
if self.ack_on_error:
- LOG.exception(_("Failed to process message"
- " ... skipping it."))
+ LOG.exception(_LE("Failed to process message"
+ " ... skipping it."))
message.ack()
else:
- LOG.exception(_("Failed to process message"
- " ... will requeue."))
+ LOG.exception(_LE("Failed to process message"
+ " ... will requeue."))
message.requeue()
else:
message.ack()
self.params_list = params_list
+ brokers_count = len(self.params_list)
+ self.next_broker_indices = itertools.cycle(range(brokers_count))
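+ # e.g. with three entries in params_list, next(self.next_broker_indices)
+ # yields 0, 1, 2, 0, 1, ... so reconnects rotate through all brokers.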
+
self.memory_transport = self.conf.fake_rabbit
self.connection = None
be handled by the caller.
"""
if self.connection:
- LOG.info(_("Reconnecting to AMQP server on "
+ LOG.info(_LI("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
self.connection.release()
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+ LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
params)
def reconnect(self):
attempt = 0
while True:
- params = self.params_list[attempt % len(self.params_list)]
+ params = self.params_list[next(self.next_broker_indices)]
attempt += 1
try:
self._connect(params)
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
- LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
- 'unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
+ 'unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+ LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
def _error_callback(exc):
if isinstance(exc, socket.timeout):
- LOG.debug(_('Timed out waiting for RPC response: %s') %
+ LOG.debug('Timed out waiting for RPC response: %s',
str(exc))
raise rpc_common.Timeout()
else:
- LOG.exception(_('Failed to consume message from queue: %s') %
+ LOG.exception(_LE('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.exception(_("Failed to publish message to topic "
+ LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publish():
import six
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
+ LOG.exception(_LE("Failed to process message... skipping it."))
finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
+
+ brokers_count = len(self.brokers)
+ self.next_broker_indices = itertools.cycle(range(brokers_count))
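+ # As in the kombu driver, this cycle lets successive reconnect()
+ # calls try the configured brokers round-robin rather than always
+ # starting with the first one.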
+
self.connection_create(self.brokers[0])
self.reconnect()
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues."""
- attempt = 0
delay = 1
while True:
# Close the session if necessary
except qpid_exceptions.ConnectionError:
pass
- broker = self.brokers[attempt % len(self.brokers)]
- attempt += 1
+ broker = self.brokers[next(self.next_broker_indices)]
try:
self.connection_create(broker)
self.connection.open()
except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay)
- msg = _("Unable to connect to AMQP server: %(e)s. "
- "Sleeping %(delay)s seconds") % msg_dict
+ msg = _LE("Unable to connect to AMQP server: %(e)s. "
+ "Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
- delay = min(2 * delay, 60)
+ delay = min(delay + 1, 5)
else:
- LOG.info(_('Connected to AMQP server on %s'), broker)
+ LOG.info(_LI('Connected to AMQP server on %s'), broker)
break
self.session = self.connection.session()
consumer.reconnect(self.session)
self._register_consumer(consumer)
- LOG.debug(_("Re-established AMQP queues"))
+ LOG.debug("Re-established AMQP queues")
def ensure(self, error_callback, method, *args, **kwargs):
while True:
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+ LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
- LOG.debug(_('Timed out waiting for RPC response: %s') %
+ LOG.debug('Timed out waiting for RPC response: %s',
str(exc))
raise rpc_common.Timeout()
else:
- LOG.exception(_('Failed to consume message from queue: %s') %
+ LOG.exception(_LE('Failed to consume message from queue: %s') %
str(exc))
def _consume():
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
- LOG.exception(_("Error processing message. Skipping it."))
+ LOG.exception(_LE("Error processing message. Skipping it."))
for iteration in itertools.count(0):
if limit and iteration >= limit:
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.exception(_("Failed to publish message to topic "
+ LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
from six import moves
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common.rpc import common as rpc_common
CONF.register_opts(zmq_opts)
ZMQ_CTX = None # ZeroMQ Context, must be global.
-matchmaker = None # memoized matchmaker object
+matchmaker = None  # memoized matchmaker object
def _serialize(data):
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
with excutils.save_and_reraise_exception():
- LOG.error(_("JSON serialization failed."))
+ LOG.error(_LE("JSON serialization failed."))
def _deserialize(data):
"""Deserialization wrapper."""
- LOG.debug(_("Deserializing: %s"), data)
+ LOG.debug("Deserializing: %s", data)
return jsonutils.loads(data)
str_data = {'addr': addr, 'type': self.socket_s(),
'subscribe': subscribe, 'bind': bind}
- LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
- LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
- LOG.debug(_("-> bind: %(bind)s"), str_data)
+ LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
+ LOG.debug("-> Subscribed to %(subscribe)s", str_data)
+ LOG.debug("-> bind: %(bind)s", str_data)
try:
if bind:
"""Subscribe."""
if not self.can_sub:
raise RPCException("Cannot subscribe on this socket.")
- LOG.debug(_("Subscribing to %s"), msg_filter)
+ LOG.debug("Subscribing to %s", msg_filter)
try:
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
- LOG.error(_("ZeroMQ socket could not be closed."))
+ LOG.error(_LE("ZeroMQ socket could not be closed."))
self.sock = None
def recv(self, **kwargs):
def _get_response(self, ctx, proxy, topic, data):
"""Process a curried message and cast the result to topic."""
- LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+ LOG.debug("Running func with context: %s", ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', {})
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException as e:
- LOG.debug(_("Expected exception during message handling (%s)") %
+ LOG.debug("Expected exception during message handling (%s)" %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
- LOG.error(_("Exception during message handling"))
+ LOG.error(_LE("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
self._get_response(ctx, proxy, topic, payload),
ctx.replies)
- LOG.debug(_("Sending reply"))
+ LOG.debug("Sending reply")
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
# processed internally. (non-valid method name)
method = data.get('method')
if not method:
- LOG.error(_("RPC message did not include method."))
+ LOG.error(_LE("RPC message did not include method."))
return
# Internal method
def register(self, proxy, in_addr, zmq_type_in,
in_bind=True, subscribe=None):
- LOG.info(_("Registering reactor"))
+ LOG.info(_LI("Registering reactor"))
if zmq_type_in not in (zmq.PULL, zmq.SUB):
raise RPCException("Bad input socktype")
self.proxies[inq] = proxy
self.sockets.append(inq)
- LOG.info(_("In reactor registered"))
+ LOG.info(_LI("In reactor registered"))
def consume_in_thread(self):
@excutils.forever_retry_uncaught_exceptions
def _consume(sock):
- LOG.info(_("Consuming socket"))
+ LOG.info(_LI("Consuming socket"))
while True:
self.consume(sock)
if topic not in self.topic_proxy:
def publisher(waiter):
- LOG.info(_("Creating proxy for topic: %s"), topic)
+ LOG.info(_LI("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
try:
wait_sock_creation.wait()
except RPCException:
- LOG.error(_("Topic socket file creation failed."))
+ LOG.error(_LE("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
except eventlet.queue.Full:
- LOG.error(_("Local per-topic backlog buffer full for topic "
- "%(topic)s. Dropping message.") % {'topic': topic})
+ LOG.error(_LE("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service."""
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
- LOG.error(_("Required IPC directory does not exist at"
- " %s") % (ipc_dir, ))
+ LOG.error(_LE("Required IPC directory does not exist at"
+ " %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
- LOG.error(_("Permission denied to IPC directory at"
- " %s") % (ipc_dir, ))
+ LOG.error(_LE("Permission denied to IPC directory at"
+ " %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
- LOG.error(_("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
+ LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+ LOG.debug("CONSUMER RECEIVED DATA: %s", data)
proxy = self.proxies[sock]
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
- LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
- LOG.info(_("Skipping topic registration. Already registered."))
+ LOG.info(_LI("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
- LOG.debug(_("Consumer is a zmq.%s"),
+ LOG.debug("Consumer is a zmq.%s",
['PULL', 'SUB'][sock_type == zmq.SUB])
self.reactor.register(proxy, inaddr, sock_type,
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
- LOG.debug(_("Creating payload"))
+ LOG.debug("Creating payload")
# Curry the original request into a reply method.
mcontext = RpcContext.marshal(context)
payload = {
}
}
- LOG.debug(_("Creating queue socket for reply waiter"))
+ LOG.debug("Creating queue socket for reply waiter")
# Messages arriving async.
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt
zmq.SUB, subscribe=msg_id, bind=False
)
- LOG.debug(_("Sending cast"))
+ LOG.debug("Sending cast")
_cast(addr, context, topic, payload, envelope)
- LOG.debug(_("Cast sent; Waiting reply"))
+ LOG.debug("Cast sent; Waiting reply")
# Blocks until receives reply
msg = msg_waiter.recv()
- LOG.debug(_("Received message: %s"), msg)
- LOG.debug(_("Unpacking response"))
+ LOG.debug("Received message: %s", msg)
+ LOG.debug("Unpacking response")
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
- LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+ LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = _get_matchmaker().queues(topic)
- LOG.debug(_("Sending message(s) to: %s"), queues)
+ LOG.debug("Sending message(s) to: %s", queues)
# Don't stack if we have no matchmaker results
if not queues:
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
import eventlet
from oslo.config import cfg
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LI
from neutron.openstack.common import log as logging
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
- LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+ LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
from oslo.config import cfg
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import matchmaker as mm
if ring:
self.ring = ring
else:
- fh = open(CONF.matchmaker_ring.ringfile, 'r')
- self.ring = json.load(fh)
- fh.close()
+ with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
+ self.ring = json.load(fh)
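+ # The ringfile is a JSON document mapping each topic to a list
+ # of hosts, e.g. {"scheduler": ["host1", "host2"]}.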
self.ring0 = {}
for k in self.ring.keys():
def run(self, key):
if not self._ring_has(key):
LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (key, )
+ _LW("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
+ _LW("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
# License for the specific language governing permissions and limitations
# under the License.
-from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
- LOG.debug(_("Creating Consumer connection for Service %s") %
+ LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],