-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
Exception related utilities.
"""
-import contextlib
import logging
import sys
+import time
import traceback
+import six
+
from cinder.openstack.common.gettextutils import _
-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised.
+
+ In some cases the caller may not want to re-raise the exception, and
+ for those circumstances this context provides a reraise flag that
+ can be used to suppress the exception. For example::
+
+ except Exception:
+ with save_and_reraise_exception() as ctxt:
+ decide_if_need_reraise()
+ if not should_be_reraised:
+ ctxt.reraise = False
"""
- type_, value, tb = sys.exc_info()
- try:
- yield
- except Exception:
- logging.error(_('Original exception being dropped: %s'),
- traceback.format_exception(type_, value, tb))
- raise
- raise type_, value, tb
+ def __init__(self):
+ self.reraise = True
+
+ def __enter__(self):
+ self.type_, self.value, self.tb, = sys.exc_info()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ logging.error(_('Original exception being dropped: %s'),
+ traceback.format_exception(self.type_,
+ self.value,
+ self.tb))
+ return False
+ if self.reraise:
+ six.reraise(self.type_, self.value, self.tb)
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ this_exc_message = six.u(str(exc))
+ if this_exc_message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ this_exc_message != last_exc_message):
+ logging.exception(
+ _('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = this_exc_message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
"""
import inspect
-import logging
from oslo.config import cfg
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import local
+from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
- 'upon receiving exception data from an rpc call.'),
+ ' upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
def cleanup():
- """Clean up resoruces in use by implementation.
+ """Clean up resources in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>.
"""
- return '%s:%s' % (topic, host) if host else topic
+ return '%s.%s' % (topic, host) if host else topic
_RPCIMPL = None
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
"""
Shared code between AMQP based openstack.common.rpc implementations.
-The code in this module is shared between the rpc implemenations based on AMQP.
-Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
-AMQP, but is deprecated and predates this code.
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
+uses AMQP, but is deprecated and predates this code.
"""
import collections
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
from oslo.config import cfg
+import six
+
from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
amqp_opts = [
- # TODO(pekowski): Remove this option in Havana.
- cfg.BoolOpt('amqp_rpc_single_reply_queue',
- default=False,
- help='Enable a fast single reply queue if using AMQP based '
- 'RPC like RabbitMQ or Qpid.'),
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the teatDown() method. In the run
+ # pool. The unit tests get here via the tearDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
class ConnectionContext(rpc_common.Connection):
- """The class that is actually returned to the caller of
- create_connection(). This is essentially a wrapper around
- Connection that supports 'with'. It can also return a new
- Connection, or one from a pool. The function will also catch
- when an instance of this class is to be deleted. With that
- we can return Connections to the pool on exceptions and so
- forth without making the caller be responsible for catching
- them. If possible the function makes sure to return a
- connection to the pool.
+ """The class that is actually returned to the create_connection() caller.
+
+ This is essentially a wrapper around Connection that supports 'with'.
+ It can also return a new Connection, or one from a pool.
+
+ The function will also catch when an instance of this class is to be
+ deleted. With that we can return Connections to the pool on exceptions
+ and so forth without making the caller be responsible for catching them.
+ If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
- """Create a new connection, or get one from the pool"""
+ """Create a new connection, or get one from the pool."""
self.connection = None
self.conf = conf
self.connection_pool = connection_pool
self.pooled = pooled
def __enter__(self):
- """When with ConnectionContext() is used, return self"""
+ """When with ConnectionContext() is used, return self."""
return self
def _done(self):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+ ack_on_error=True):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
- exchange_name)
+ exchange_name,
+ ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
def __getattr__(self, key):
- """Proxy all other calls to the Connection instance"""
+ """Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
class ReplyProxy(ConnectionContext):
- """ Connection class for RPC replies / callbacks """
+ """Connection class for RPC replies / callbacks."""
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
- self._num_call_waiters_wrn_threshhold = 10
+ self._num_call_waiters_wrn_threshold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %s'
- ', message : %s') % (msg_id, message_data))
+ LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
+ ', message : %(data)s'), {'msg_id': msg_id,
+ 'data': message_data})
+ LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
else:
waiter.put(message_data)
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
- if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
LOG.warn(_('Number of call waiters is greater than warning '
- 'threshhold: %d. There could be a MulticallProxyWaiter '
- 'leak.') % self._num_call_waiters_wrn_threshhold)
- self._num_call_waiters_wrn_threshhold *= 2
+ 'threshold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshold)
+ self._num_call_waiters_wrn_threshold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
- try:
- msg = {'result': reply, 'failure': failure}
- except TypeError:
- msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
- # Otherwise use the msg_id for backward compatibilty.
+ # Otherwise use the msg_id for backward compatibility.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
- """Context that supports replying to a rpc.call"""
+ """Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
for args at some point.
"""
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
+ if isinstance(context, dict):
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in six.iteritems(context)])
+ else:
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in
+ six.iteritems(context.to_dict())])
+
msg.update(context_d)
class _ThreadPoolWithWait(object):
- """Base class for a delayed invocation manager used by
- the Connection class to start up green threads
+ """Base class for a delayed invocation manager.
+
+ Used by the Connection class to start up green threads
to handle incoming messages.
"""
class CallbackWrapper(_ThreadPoolWithWait):
- """Wraps a straight callback to allow it to be invoked in a green
- thread.
+ """Wraps a straight callback.
+
+ Allows it to be invoked in a green thread.
"""
- def __init__(self, conf, callback, connection_pool):
- """
+ def __init__(self, conf, callback, connection_pool,
+ wait_for_consumers=False):
+ """Initiates CallbackWrapper object.
+
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
+ :param wait_for_consumers: wait for all green threads to
+ complete and raise the last
+ caught exception, if any.
+
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
+ self.wait_for_consumers = wait_for_consumers
+ self.exc_info = None
+
+ def _wrap(self, message_data, **kwargs):
+ """Wrap the callback invocation to catch exceptions.
+ """
+ try:
+ self.callback(message_data, **kwargs)
+ except Exception:
+ self.exc_info = sys.exc_info()
def __call__(self, message_data):
- self.pool.spawn_n(self.callback, message_data)
+ self.exc_info = None
+ self.pool.spawn_n(self._wrap, message_data)
+
+ if self.wait_for_consumers:
+ self.pool.waitall()
+ if self.exc_info:
+ six.reraise(self.exc_info[1], None, self.exc_info[2])
class ProxyCallback(_ThreadPoolWithWait):
return result
def __iter__(self):
- """Return a result until we get a reply with an 'ending" flag"""
+ """Return a result until we get a reply with an 'ending' flag."""
if self._done:
raise StopIteration
while True:
yield result
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
- def __init__(self, conf, connection, timeout):
- self._connection = connection
- self._iterator = connection.iterconsume(timeout=timeout or
- conf.rpc_response_timeout)
- self._result = None
- self._done = False
- self._got_ending = False
- self._conf = conf
- self.msg_id_cache = _MsgIdCache()
-
- def done(self):
- if self._done:
- return
- self._done = True
- self._iterator.close()
- self._iterator = None
- self._connection.close()
-
- def __call__(self, data):
- """The consume() callback will call this. Store the result."""
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
-
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._result = data['result']
-
- def __iter__(self):
- """Return a result until we get a 'None' response from consumer"""
- if self._done:
- raise StopIteration
- while True:
- try:
- self._iterator.next()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- result = self._result
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
def create_connection(conf, new, connection_pool):
- """Create a connection"""
+ """Create a connection."""
return ConnectionContext(conf, connection_pool, pooled=not new)
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- # TODO(pekowski): Remove all these comments in Havana.
- # For amqp_rpc_single_reply_queue = False,
- # Can't use 'with' for multicall, as it returns an iterator
- # that will continue to use the connection. When it's done,
- # connection.close() will get called which will put it back into
- # the pool
- # For amqp_rpc_single_reply_queue = True,
- # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
_add_unique_id(msg)
pack_context(msg, context)
- # TODO(pekowski): Remove this flag and the code under the if clause
- # in Havana.
- if not conf.amqp_rpc_single_reply_queue:
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- else:
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
import traceback
from oslo.config import cfg
+import six
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common import local
from cinder.openstack.common import log as logging
+from cinder.openstack.common import versionutils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+_RPC_ENVELOPE_VERSION = '2.0'
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.0) is very simple. It is::
{
'oslo.version': <RPC Envelope Version as a String>,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
+_REMOTE_POSTFIX = '_Remote'
+
class RPCException(Exception):
- message = _("An unknown RPC related exception occurred.")
+ msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
- message = self.message % kwargs
+ message = self.msg_fmt % kwargs
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
- for name, value in kwargs.iteritems():
+ for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
- message = self.message
+ message = self.msg_fmt
super(RPCException, self).__init__(message)
contains all of the relevant info.
"""
- message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+ msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _('Timeout while waiting on RPC response - '
+ msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
def __init__(self, info=None, topic=None, method=None):
- """
+ """Initiates Timeout object.
+
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
class DuplicateMessageError(RPCException):
- message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+ msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
- message = _("Invalid reuse of an RPC connection.")
+ msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException):
- message = _("Specified RPC version, %(version)s, not supported by "
+ msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
- message = _("Specified RPC envelope version, %(version)s, "
+ msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
+class RpcVersionCapError(RPCException):
+ msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
raise NotImplementedError()
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
- """Register as a member of a group of consumers for a given topic from
- the specified exchange.
+ """Register as a member of a group of consumers.
+ Uses given topic from the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': [('args', 'new_pass')],
- 'run_instance': [('args', 'admin_password')],
- 'route_message': [('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'password'),
- ('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'admin_password')]}
-
- has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
- has_context_token = '_context_auth_token' in msg_data
- has_token = 'auth_token' in msg_data
-
- if not any([has_method, has_context_token, has_token]):
- return log_func(msg, msg_data)
-
- msg_data = copy.deepcopy(msg_data)
-
- if has_method:
- for arg in SANITIZE.get(msg_data['method'], []):
- try:
- d = msg_data
- for elem in arg[:-1]:
- d = d[elem]
- d[arg[-1]] = '<SANITIZED>'
- except KeyError, e:
- LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
- {'item': arg,
- 'err': e})
-
- if has_context_token:
- msg_data['_context_auth_token'] = '<SANITIZED>'
-
- if has_token:
- msg_data['auth_token'] = '<SANITIZED>'
-
- return log_func(msg, msg_data)
+ SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
+
+ def _fix_passwords(d):
+ """Sanitizes the password fields in the dictionary."""
+ for k in six.iterkeys(d):
+ if k.lower().find('password') != -1:
+ d[k] = '<SANITIZED>'
+ elif k.lower() in SANITIZE:
+ d[k] = '<SANITIZED>'
+ elif isinstance(d[k], list):
+ for e in d[k]:
+ if isinstance(e, dict):
+ _fix_passwords(e)
+ elif isinstance(d[k], dict):
+ _fix_passwords(d[k])
+ return d
+
+ return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(_("Returning exception %s to caller"),
+ six.text_type(failure))
LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
kwargs = failure.kwargs
+ # NOTE(matiu): With cells, it's possible to re-raise remote, remote
+ # exceptions. Lets turn it back into the original exception type.
+ cls_name = str(failure.__class__.__name__)
+ mod_name = str(failure.__class__.__module__)
+ if (cls_name.endswith(_REMOTE_POSTFIX) and
+ mod_name.endswith(_REMOTE_POSTFIX)):
+ cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
+ mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
+
data = {
- 'class': str(failure.__class__.__name__),
- 'module': str(failure.__class__.__module__),
- 'message': unicode(failure),
+ 'class': cls_name,
+ 'module': mod_name,
+ 'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs
ex_type = type(failure)
str_override = lambda self: message
- new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+ new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
{'__str__': str_override, '__unicode__': str_override})
+ new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
class ClientException(Exception):
- """This encapsulates some actual exception that is expected to be
- hit by an RPC proxy object. Merely instantiating it records the
- current exception information, which will be passed back to the
- RPC client without exceptional logging."""
+ """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+ Merely instantiating it records the current exception information, which
+ will be passed back to the RPC client without exceptional logging.
+ """
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
- except Exception, e:
+ except Exception as e:
if type(e) in exceptions:
raise ClientException()
else:
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
+
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
- ClientException, which is used internally by the RPC layer."""
+ ClientException, which is used internally by the RPC layer.
+ """
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return outer
+# TODO(sirp): we should deprecate this in favor of
+# using `versionutils.is_compatible` directly
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
- version_parts = version.split('.')
- imp_version_parts = imp_version.split('.')
- if int(version_parts[0]) != int(imp_version_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
- return False
- return True
+ return versionutils.is_compatible(version, imp_version)
def serialize_msg(raw_msg):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
minimum version that supports the new parameter should be specified.
"""
+import six
+
from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
- def __init__(self, callbacks):
+ def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
+ :param serializer: The Serializer object that will be used to
+ deserialize arguments before the method call and
+ to serialize the result after it returns.
"""
self.callbacks = callbacks
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcDispatcher, self).__init__()
+ def _deserialize_args(self, context, kwargs):
+ """Helper method called to deserialize args before dispatch.
+
+ This calls our serializer on each argument, returning a new set of
+ args that have been deserialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to be deserialized
+ :returns: A new set of deserialized args
+ """
+ new_kwargs = dict()
+ for argname, arg in six.iteritems(kwargs):
+ new_kwargs[argname] = self.serializer.deserialize_entity(context,
+ arg)
+ return new_kwargs
+
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
if not hasattr(proxyobj, method):
continue
if is_compatible:
- return getattr(proxyobj, method)(ctxt, **kwargs)
+ kwargs = self._deserialize_args(ctxt, kwargs)
+ result = getattr(proxyobj, method)(ctxt, **kwargs)
+ return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import time
import eventlet
+import six
from cinder.openstack.common.rpc import common as rpc_common
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
- raise failure[0], failure[1], failure[2]
+ six.reraise(failure[0], failure[1], failure[2])
res.append(reply)
# if ending not 'sent'...we might have more data to
# return from the function itself
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return Connection()
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
- return iter([None])
+ raise rpc_common.Timeout("No consumers available")
else:
return consumer.call(context, version, method, namespace, args,
timeout)
def fanout_cast(conf, context, topic, msg):
- """Cast to all consumers of a topic"""
+ """Cast to all consumers of a topic."""
check_serialize(msg)
method = msg.get('method')
if not method:
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import itertools
import socket
import ssl
-import sys
import time
import uuid
import kombu.entity
import kombu.messaging
from oslo.config import cfg
+import six
+from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import network_utils
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
- help='SSL version to use (valid only if SSL enabled)'),
+ help='SSL version to use (valid only if SSL enabled). '
+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+ 'be available on some distributions'
+ ),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
+ self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
- """Re-declare the queue after a rabbit reconnect"""
+ """Re-declare the queue after a rabbit reconnect."""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
+ def _callback_handler(self, message, callback):
+ """Call callback with deserialized message.
+
+ Messages that are processed without exception are ack'ed.
+
+ If the message processing generates an exception, it will be
+ ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
+ """
+
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+ except Exception:
+ if self.ack_on_error:
+ LOG.exception(_("Failed to process message"
+ " ... skipping it."))
+ message.ack()
+ else:
+ LOG.exception(_("Failed to process message"
+ " ... will requeue."))
+ message.requeue()
+ else:
+ message.ack()
+
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
If kwargs['nowait'] is True, then this call will block until
a message is read.
- Messages will automatically be acked if the callback doesn't
- raise an exception
"""
options = {'consumer_tag': self.tag}
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
+ self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
def cancel(self):
- """Cancel the consuming from the queue, if it has started"""
+ """Cancel the consuming from the queue, if it has started."""
try:
self.queue.cancel(self.tag)
- except KeyError, e:
+ except KeyError as e:
# NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'"""
+ """Queue/consumer class for 'direct'."""
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'"""
+ """Consumer class for 'topic'."""
def __init__(self, conf, channel, topic, callback, tag, name=None,
exchange_name=None, **kwargs):
class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'"""
+ """Consumer class for 'fanout'."""
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
class Publisher(object):
- """Base Publisher class"""
+ """Base Publisher class."""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
"""Init the Publisher class with the exchange_name, routing_key,
self.reconnect(channel)
def reconnect(self, channel):
- """Re-establish the Producer after a rabbit reconnection"""
+ """Re-establish the Producer after a rabbit reconnection."""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
routing_key=self.routing_key)
def send(self, msg, timeout=None):
- """Send a message"""
+ """Send a message."""
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
class DirectPublisher(Publisher):
- """Publisher class for 'direct'"""
+ """Publisher class for 'direct'."""
def __init__(self, conf, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
class TopicPublisher(Publisher):
- """Publisher class for 'topic'"""
+ """Publisher class for 'topic'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'topic' publisher.
class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'"""
+ """Publisher class for 'fanout'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'fanout' publisher.
class NotifyPublisher(TopicPublisher):
- """Publisher class for 'notify'"""
+ """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
'virtual_host': self.conf.rabbit_virtual_host,
}
- for sp_key, value in server_params.iteritems():
+ for sp_key, value in six.iteritems(server_params):
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
self.reconnect()
def _fetch_ssl_params(self):
- """Handles fetching what ssl params
- should be used for the connection (if any)"""
+ """Handles fetching what ssl params should be used for the connection
+ (if any).
+ """
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
- ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+ ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+ self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
- if not ssl_params:
- # Just have the default behavior
- return True
- else:
- # Return the extended behavior
- return ssl_params
+ # Return the extended behavior or just have the default behavior
+ return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
return
except (IOError, self.connection_errors) as e:
pass
- except Exception, e:
+ except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.error(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
- # NOTE(comstud): Copied from original code. There's
- # really no better recourse because if this was a queue we
- # need to consume on, we have no way to consume anymore.
- sys.exit(1)
+ msg = _('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info
+ LOG.error(msg)
+ raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
while True:
try:
return method(*args, **kwargs)
- except (self.connection_errors, socket.timeout, IOError), e:
+ except (self.connection_errors, socket.timeout, IOError) as e:
if error_callback:
error_callback(e)
- except Exception, e:
+ except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
self.reconnect()
def get_channel(self):
- """Convenience call for bin/clear_rabbit_queues"""
+ """Convenience call for bin/clear_rabbit_queues."""
return self.channel
def close(self):
- """Close/release this connection"""
+ """Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
- """Reset a connection so it can be used again"""
+ """Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close()
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
- self.consumer_num.next())
+ six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers"""
+ """Return an iterator that will consume from all queues/consumers."""
info = {'do_consume': True}
def _consume():
if info['do_consume']:
- queues_head = self.consumers[:-1]
- queues_tail = self.consumers[-1]
+ queues_head = self.consumers[:-1] # not fanout.
+ queues_tail = self.consumers[-1] # fanout
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
+ """Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
- """Send to a publisher based on the publisher class"""
+ """Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
+ ack_on_error=ack_on_error,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer"""
+ """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
- """Send a 'direct' message"""
+ """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message"""
+ """Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
- """Send a 'fanout' message"""
+ """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic"""
+ """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
- """Consume from all queues/consumers"""
+ """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
- it.next()
+ six.next(it)
except StopIteration:
return
def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
+ """Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
+ """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.declare_topic_consumer(topic, proxy_cb)
def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object"""
+ """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
+ wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
+ ack_on_error=ack_on_error,
)
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
import eventlet
import greenlet
from oslo.config import cfg
+import six
+from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
+qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
cfg.CONF.register_opts(qpid_opts)
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- self.reconnect(session)
+ self.connect(session)
+
+ def connect(self, session):
+ """Declare the receiver on connect."""
+ self._declare_receiver(session)
def reconnect(self, session):
- """Re-declare the receiver after a qpid reconnect"""
+ """Re-declare the receiver after a qpid reconnect."""
+ self._declare_receiver(session)
+
+ def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
+ def _unpack_json_msg(self, msg):
+ """Load the JSON data in msg if msg.content_type indicates that it
+ is necessary. Put the loaded data back into msg.content and
+ update msg.content_type appropriately.
+
+ A Qpid Message containing a dict will have a content_type of
+ 'amqp/map', whereas one containing a string that needs to be converted
+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+ :param msg: a Qpid Message object
+ :returns: None
+ """
+ if msg.content_type == JSON_CONTENT_TYPE:
+ msg.content = jsonutils.loads(msg.content)
+ msg.content_type = 'amqp/map'
+
def consume(self):
- """Fetch the message and pass it to the callback object"""
+ """Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
+ self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
+ # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
+ def get_node_name(self):
+ return self.address.split(';')[0]
+
class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'"""
+ """Queue/consumer class for 'direct'."""
def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'"""
+ """Consumer class for 'topic'."""
def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'"""
+ """Consumer class for 'fanout'."""
def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
+ self.conf = conf
link_opts = {"exclusive": True}
class Publisher(object):
- """Base Publisher class"""
+ """Base Publisher class."""
def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
self.reconnect(session)
def reconnect(self, session):
- """Re-establish the Sender after a reconnection"""
+ """Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
+ def _pack_json_msg(self, msg):
+ """Qpid cannot serialize dicts containing strings longer than 65535
+ characters. This function dumps the message content to a JSON
+ string, which Qpid is able to handle.
+
+ :param msg: May be either a Qpid Message object or a bare dict.
+ :returns: A Qpid Message with its content field JSON encoded.
+ """
+ try:
+ msg.content = jsonutils.dumps(msg.content)
+ except AttributeError:
+ # Need to have a Qpid message so we can set the content_type.
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
+ msg.content_type = JSON_CONTENT_TYPE
+ return msg
+
def send(self, msg):
- """Send a message"""
+ """Send a message."""
+ try:
+ # Check if Qpid can encode the message
+ check_msg = msg
+ if not hasattr(check_msg, 'content_type'):
+ check_msg = qpid_messaging.Message(msg)
+ content_type = check_msg.content_type
+ enc, dec = qpid_messaging.message.get_codec(content_type)
+ enc(check_msg.content)
+ except qpid_codec.CodecException:
+ # This means the message couldn't be serialized as a dict.
+ msg = self._pack_json_msg(msg)
self.sender.send(msg)
class DirectPublisher(Publisher):
- """Publisher class for 'direct'"""
+ """Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
+
if conf.qpid_topology_version == 1:
node_name = msg_id
node_opts = {"type": "direct"}
class TopicPublisher(Publisher):
- """Publisher class for 'topic'"""
+ """Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
- """init a 'topic' publisher.
+ """Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'"""
+ """Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
- """init a 'fanout' publisher.
+ """Init a 'fanout' publisher.
"""
if conf.qpid_topology_version == 1:
class NotifyPublisher(Publisher):
- """Publisher class for notifications"""
+ """Publisher class for notifications."""
def __init__(self, conf, session, topic):
- """init a 'topic' publisher.
+ """Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
node_opts = {"durable": True}
return self.consumers[str(receiver)]
def reconnect(self):
- """Handles reconnecting and re-establishing sessions and queues"""
+ """Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
try:
self.connection_create(broker)
self.connection.open()
- except qpid_exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
consumers = self.consumers
self.consumers = {}
- for consumer in consumers.itervalues():
+ for consumer in six.itervalues(consumers):
consumer.reconnect(self.session)
self._register_consumer(consumer)
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
- qpid_exceptions.ConnectionError), e:
+ qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()
def close(self):
- """Close/release this connection"""
+ """Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
- self.connection.close()
+ try:
+ self.connection.close()
+ except Exception:
+ # NOTE(dripton) Logging exceptions that happen during cleanup just
+ # causes confusion; there's really nothing useful we can do with
+ # them.
+ pass
self.connection = None
def reset(self):
- """Reset a connection so it can be used again"""
+ """Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers"""
+ """Return an iterator that will consume from all queues/consumers."""
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
+ """Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg):
- """Send to a publisher based on the publisher class"""
+ """Send to a publisher based on the publisher class."""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
topic, callback)
def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer"""
+ """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
- """Send a 'direct' message"""
+ """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message"""
+ """Send a 'topic' message."""
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
- """Send a 'fanout' message"""
+ """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic"""
+ """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg)
def consume(self, limit=None):
- """Consume from all queues/consumers"""
+ """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
- it.next()
+ six.next(it)
except StopIteration:
return
def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
+ """Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
+ """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
return consumer
def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object"""
+ """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
+ wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import eventlet
import greenlet
from oslo.config import cfg
+import six
+from six import moves
from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
- LOG.error("ZeroMQ socket could not be closed.")
+ LOG.error(_("ZeroMQ socket could not be closed."))
self.sock = None
def recv(self, **kwargs):
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
- zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
LOG.info(_("In reactor registered"))
def consume_in_thread(self):
+ @excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
h = {}
try:
while True:
- k = i.next()
- h[k] = i.next()
+ k = six.next(i)
+ h[k] = six.next(i)
except StopIteration:
return h
else:
sock_type = zmq.PULL
subscribe = None
- topic = '.'.join((topic, CONF.rpc_zmq_host))
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
"""Send a message to all listening and expect no reply."""
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
- LOG.error(_('topic is %s.') % topic)
- if topic:
- _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, envelope):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
"""
import contextlib
-import itertools
-import json
import eventlet
from oslo.config import cfg
matchmaker_opts = [
- # Matchmaker ring file
- cfg.StrOpt('matchmaker_ringfile',
- default='/etc/nova/matchmaker_ring.json',
- help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
class Exchange(object):
- """
- Implements lookups.
+ """Implements lookups.
+
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
class Binding(object):
- """
- A binding on which to perform a lookup.
- """
+ """A binding on which to perform a lookup."""
def __init__(self):
pass
class MatchMakerBase(object):
- """
- Match Maker Base Class.
- Build off HeartbeatMatchMakerBase if building a
- heartbeat-capable MatchMaker.
+ """Match Maker Base Class.
+
+ Build off HeartbeatMatchMakerBase if building a heartbeat-capable
+ MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
'registration or heartbeat.')
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
- """
- Acknowledge that a key.host is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a key.host is alive.
+
+ Used internally for updating heartbeats, but may also be used
+ publicly to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
pass
def is_alive(self, topic, host):
- """
- Checks if a host is alive.
- """
+ """Checks if a host is alive."""
pass
def expire(self, topic, host):
- """
- Explicitly expire a host's registration.
- """
+ """Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
pass
def start_heartbeat(self):
- """
- Spawn heartbeat greenthread.
- """
+ """Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
class HeartbeatMatchMakerBase(MatchMakerBase):
- """
- Base for a heart-beat capable MatchMaker.
- Provides common methods for registering,
- unregistering, and maintaining heartbeats.
+ """Base for a heart-beat capable MatchMaker.
+
+ Provides common methods for registering, unregistering, and maintaining
+ heartbeats.
"""
def __init__(self):
self.hosts = set()
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
self.ack_alive(key, host)
def ack_alive(self, key, host):
- """
- Acknowledge that a host.topic is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a host.topic is alive.
+
+ Used internally for updating heartbeats, but may also be used
+ publicly to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
- """
- Implements registration logic.
+ """Implements registration logic.
+
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
- """
- Implements de-registration logic.
+ """Implements de-registration logic.
+
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.ack_alive(key, host)
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
- LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+ LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+ {'key': key, 'host': host})
def start_heartbeat(self):
- """
- Implementation of MatchMakerBase.start_heartbeat
+ """Implementation of MatchMakerBase.start_heartbeat.
+
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
- if len(self.hosts) == 0:
+ if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
- """
- Specifies a host in the key via a '.' character
+ """Specifies a host in the key via a '.' character.
+
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
def test(self, key):
- if '.' in key:
- return True
- return False
+ return '.' in key
class TopicBinding(Binding):
- """
- Where a 'bare' key without dots.
+ """Where a 'bare' key without dots.
+
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
matches that of a direct exchange.
"""
def test(self, key):
- if '.' not in key:
- return True
- return False
+ return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
- if key.startswith('fanout~'):
- return True
- return False
+ return key.startswith('fanout~')
class StubExchange(Exchange):
return [(key, None)]
-class RingExchange(Exchange):
- """
- Match Maker where hosts are loaded from a static file containing
- a hashmap (JSON formatted).
-
- __init__ takes optional ring dictionary argument, otherwise
- loads the ringfile from CONF.mathcmaker_ringfile.
- """
- def __init__(self, ring=None):
- super(RingExchange, self).__init__()
-
- if ring:
- self.ring = ring
- else:
- fh = open(CONF.matchmaker_ringfile, 'r')
- self.ring = json.load(fh)
- fh.close()
-
- self.ring0 = {}
- for k in self.ring.keys():
- self.ring0[k] = itertools.cycle(self.ring[k])
-
- def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
-
-
-class RoundRobinRingExchange(RingExchange):
- """A Topic Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(RoundRobinRingExchange, self).__init__(ring)
-
- def run(self, key):
- if not self._ring_has(key):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (key, )
- )
- return []
- host = next(self.ring0[key])
- return [(key + '.' + host, host)]
-
-
-class FanoutRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(FanoutRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "fanout~", strip it for lookup.
- nkey = key.split('fanout~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
class DirectExchange(Exchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
return [(key, e)]
-class MatchMakerRing(MatchMakerBase):
- """
- Match Maker where hosts are loaded from a static hashmap.
- """
- def __init__(self, ring=None):
- super(MatchMakerRing, self).__init__()
- self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
- self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
-
-
class MatchMakerLocalhost(MatchMakerBase):
- """
- Match Maker where all bare topics resolve to localhost.
+ """Match Maker where all bare topics resolve to localhost.
+
Useful for testing.
"""
def __init__(self, host='localhost'):
class MatchMakerStub(MatchMakerBase):
- """
- Match Maker where topics are untouched.
+ """Match Maker where topics are untouched.
+
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
def __init__(self):
- super(MatchMakerLocalhost, self).__init__()
+ super(MatchMakerStub, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
class RedisTopicExchange(RedisExchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
class RedisFanoutExchange(RedisExchange):
- """
- Return a list of all hosts.
- """
+ """Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
- """
- MatchMaker registering and looking-up hosts with a Redis server.
- """
+ """MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
- self.redis = redis.StrictRedis(
+ self.redis = redis.Redis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
from oslo.config import cfg
-from cinder.openstack.common.gettextutils import _ # noqa
+from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import matchmaker as mm
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
+ return key in self.ring0
class RoundRobinRingExchange(RingExchange):
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 Red Hat, Inc.
+# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
rpc/dispatcher.py
"""
+import six
from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
rpc API.
"""
- def __init__(self, topic, default_version):
+ # The default namespace, which can be overridden in a subclass.
+ RPC_API_NAMESPACE = None
+
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
+ :param version_cap: Optionally cap the maximum version used for sent
+ messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
+ self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
- msg['version'] = vers if vers else self.default_version
+ v = vers if vers else self.default_version
+ if (self.version_cap and not
+ rpc_common.version_is_compatible(self.version_cap, v)):
+ raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
+ msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
return topic if topic else self.topic
+ def can_send_version(self, version):
+ """Check to see if a version is compatible with the version cap."""
+ return (not self.version_cap or
+ rpc_common.version_is_compatible(self.version_cap, version))
+
@staticmethod
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
- @staticmethod
- def make_msg(method, **kwargs):
- return RpcProxy.make_namespaced_msg(method, None, **kwargs)
+ def make_msg(self, method, **kwargs):
+ return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
+ **kwargs)
+
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in six.iteritems(kwargs):
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.call(context, real_topic, msg, timeout)
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.multicall(context, real_topic, msg, timeout)
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Serializer(object):
+ """Generic (de-)serialization definition base class."""
+
+ @abc.abstractmethod
+ def serialize_entity(self, context, entity):
+ """Serialize something to primitive form.
+
+ :param context: Security context
+ :param entity: Entity to be serialized
+ :returns: Serialized form of entity
+ """
+ pass
+
+ @abc.abstractmethod
+ def deserialize_entity(self, context, entity):
+ """Deserialize something from primitive form.
+
+ :param context: Security context
+ :param entity: Primitive to be deserialized
+ :returns: Deserialized form of entity
+ """
+ pass
+
+
+class NoOpSerializer(Serializer):
+ """A serializer that does nothing."""
+
+ def serialize_entity(self, context, entity):
+ return entity
+
+ def deserialize_entity(self, context, entity):
+ return entity
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
class Service(service.Service):
"""Service object for binaries running on hosts.
- A service enables rpc by listening to queues based on topic and host."""
- def __init__(self, host, topic, manager=None):
+ A service enables rpc by listening to queues based on topic and host.
+ """
+ def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
+ self.serializer = serializer
if manager is None:
self.manager = self
else:
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+ self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
- node_topic = '%s:%s' % (self.topic, self.host)
+ node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
host = kwargs['host']
else:
host = kwargs['volume']['host']
- expected_topic = '%s:%s' % (CONF.volume_topic, host)
+ expected_topic = '%s.%s' % (CONF.volume_topic, host)
self.fake_args = None
self.fake_kwargs = None
# by impl_zmq. (integer value)
#rpc_cast_timeout=30
-# Modules of exceptions that are permitted to be recreatedupon
-# receiving exception data from an rpc call. (list value)
+# Modules of exceptions that are permitted to be recreated
+# upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
# If passed, use a fake RabbitMQ provider (boolean value)
# Options defined in cinder.openstack.common.rpc.amqp
#
-# Enable a fast single reply queue if using AMQP based RPC
-# like RabbitMQ or Qpid. (boolean value)
-#amqp_rpc_single_reply_queue=false
-
# Use durable queues in amqp. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues=false
# Options defined in cinder.openstack.common.rpc.impl_kombu
#
-# SSL version to use (valid only if SSL enabled) (string
-# value)
+# SSL version to use (valid only if SSL enabled). valid values
+# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
+# distributions (string value)
#kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value)
# Options defined in cinder.openstack.common.rpc.matchmaker
#
-# Matchmaker ring file (JSON) (string value)
-#matchmaker_ringfile=/etc/nova/matchmaker_ring.json
-
# Heartbeat frequency (integer value)
#matchmaker_heartbeat_freq=300