--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import os
+import sys
+
+# If ../cinder/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'cinder', '__init__.py')):
+ sys.path.insert(0, POSSIBLE_TOPDIR)
+
+from oslo.config import cfg
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+ CONF(sys.argv[1:], project='cinder')
+ logging.setup("cinder")
+
+ with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+ reactor.consume_in_thread()
+ reactor.wait()
+
+if __name__ == '__main__':
+ main()
default='noauth',
help='The strategy to use for auth. Supports noauth, keystone, '
'and deprecated.'),
- cfg.StrOpt('control_exchange',
- default='cinder',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.ListOpt('enabled_backends',
default=None,
help='A list of backend names to use. These backend names '
rpc.proxy
"""
+import inspect
+import logging
+
from oslo.config import cfg
+from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
+from cinder.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
+
rpc_opts = [
cfg.StrOpt('rpc_backend',
default=['cinder.openstack.common.exception',
'nova.exception',
'cinder.exception',
+ 'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- #
- # The following options are not registered here, but are expected to be
- # present. The project using this library must register these options with
- # the configuration so that project-specific defaults may be defined.
- #
- #cfg.StrOpt('control_exchange',
- # default='nova',
- # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.StrOpt('control_exchange',
+ default='openstack',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
-cfg.CONF.register_opts(rpc_opts)
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
+
+
+def set_defaults(control_exchange):
+ cfg.set_defaults(rpc_opts,
+ control_exchange=control_exchange)
def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection
"""
- return _get_impl().create_connection(cfg.CONF, new=new)
+ return _get_impl().create_connection(CONF, new=new)
+
+
+def _check_for_lock():
+ if not CONF.debug:
+ return None
+
+ if ((hasattr(local.strong_store, 'locks_held')
+ and local.strong_store.locks_held)):
+ stack = ' :: '.join([frame[3] for frame in inspect.stack()])
+ LOG.warn(_('A RPC is being made while holding a lock. The locks '
+ 'currently held are %(locks)s. This is probably a bug. '
+ 'Please report it. Include the following: [%(stack)s].'),
+ {'locks': local.strong_store.locks_held,
+ 'stack': stack})
+ return True
+
+ return False
-def call(context, topic, msg, timeout=None):
+def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
:returns: None
"""
- return _get_impl().cast(cfg.CONF, context, topic, msg)
+ return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+ return _get_impl().fanout_cast(CONF, context, topic, msg)
-def multicall(context, topic, msg, timeout=None):
+def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().multicall(CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
:returns: None
"""
- return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
:returns: None
"""
- return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>.
"""
- return '%s.%s' % (topic, host)
+ return '%s.%s' % (topic, host) if host else topic
_RPCIMPL = None
global _RPCIMPL
if _RPCIMPL is None:
try:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
- impl = cfg.CONF.rpc_backend.replace('nova.rpc',
- 'nova.openstack.common.rpc')
+ impl = CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
AMQP, but is deprecated and predates this code.
"""
+import collections
import inspect
-import logging
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
+from eventlet import queue
from eventlet import semaphore
+# TODO(pekowsk): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
from oslo.config import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import local
+from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import common as rpc_common
+
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+ cfg.BoolOpt('amqp_rpc_single_reply_queue',
+ default=False,
+ help='Enable a fast single reply queue if using AMQP based '
+ 'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
+UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
+ self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
- LOG.debug('Pool creating new connection')
+ LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf)
def empty(self):
while self.free_items:
self.get().close()
+ # Force a new connection pool to be created.
+ # Note that this was added due to failing unit test cases. The issue
+ # is the above "while loop" gets all the cached connections from the
+ # pool and closes them, but never returns them to the pool, a pool
+ # leak. The unit tests hang waiting for an item to be returned to the
+ # pool. The unit tests get here via the teatDown() method. In the run
+ # time code, it gets here via cleanup() and only appears in service.py
+ # just before doing a sys.exit(), so cleanup() only happens once and
+ # the leakage is not a problem.
+ self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ self.connection.join_consumer_pool(callback,
+ pool_name,
+ topic,
+ exchange_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False):
+class ReplyProxy(ConnectionContext):
+ """ Connection class for RPC replies / callbacks """
+ def __init__(self, conf, connection_pool):
+ self._call_waiters = {}
+ self._num_call_waiters = 0
+ self._num_call_waiters_wrn_threshhold = 10
+ self._reply_q = 'reply_' + uuid.uuid4().hex
+ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+ self.declare_direct_consumer(self._reply_q, self._process_data)
+ self.consume_in_thread()
+
+ def _process_data(self, message_data):
+ msg_id = message_data.pop('_msg_id', None)
+ waiter = self._call_waiters.get(msg_id)
+ if not waiter:
+ LOG.warn(_('no calling threads waiting for msg_id : %s'
+ ', message : %s') % (msg_id, message_data))
+ else:
+ waiter.put(message_data)
+
+ def add_call_waiter(self, waiter, msg_id):
+ self._num_call_waiters += 1
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ LOG.warn(_('Number of call waiters is greater than warning '
+ 'threshhold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshhold)
+ self._num_call_waiters_wrn_threshhold *= 2
+ self._call_waiters[msg_id] = waiter
+
+ def del_call_waiter(self, msg_id):
+ self._num_call_waiters -= 1
+ del self._call_waiters[msg_id]
+
+ def get_reply_q(self):
+ return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+ failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
- failure = rpc_common.serialize_remote_exception(failure)
+ failure = rpc_common.serialize_remote_exception(failure,
+ log_failure)
try:
msg = {'result': reply, 'failure': failure}
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ _add_unique_id(msg)
+ # If a reply_q exists, add the msg_id to the reply and pass the
+ # reply_q to direct_send() to use it as the response queue.
+ # Otherwise use the msg_id for backward compatibilty.
+ if reply_q:
+ msg['_msg_id'] = msg_id
+ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+ else:
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
+ self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
+ values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
- connection_pool=None):
+ connection_pool=None, log_failure=True):
if self.msg_id:
- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending)
+ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+ reply, failure, ending, log_failure)
if ending:
self.msg_id = None
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
msg.update(context_d)
-class ProxyCallback(object):
- """Calls methods on a proxy object based on method and args."""
+class _MsgIdCache(object):
+ """This class checks any duplicate messages."""
- def __init__(self, conf, proxy, connection_pool):
- self.proxy = proxy
+ # NOTE: This value is considered can be a configuration item, but
+ # it is not necessary to change its value in most cases,
+ # so let this value as static for now.
+ DUP_MSG_CHECK_SIZE = 16
+
+ def __init__(self, **kwargs):
+ self.prev_msgids = collections.deque([],
+ maxlen=self.DUP_MSG_CHECK_SIZE)
+
+ def check_duplicate_message(self, message_data):
+ """AMQP consumers may read same message twice when exceptions occur
+ before ack is returned. This method prevents doing it.
+ """
+ if UNIQUE_ID in message_data:
+ msg_id = message_data[UNIQUE_ID]
+ if msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
+ else:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+ """Add unique_id for checking duplicate messages."""
+ unique_id = uuid.uuid4().hex
+ msg.update({UNIQUE_ID: unique_id})
+ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
+class _ThreadPoolWithWait(object):
+ """Base class for a delayed invocation manager used by
+ the Connection class to start up green threads
+ to handle incoming messages.
+ """
+
+ def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+ """Wraps a straight callback to allow it to be invoked in a green
+ thread.
+ """
+
+ def __init__(self, conf, callback, connection_pool):
+ """
+ :param conf: cfg.CONF instance
+ :param callback: a callable (probably a function)
+ :param connection_pool: connection pool as returned by
+ get_connection_pool()
+ """
+ super(CallbackWrapper, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.callback = callback
+
+ def __call__(self, message_data):
+ self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, conf, proxy, connection_pool):
+ super(ProxyCallback, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.proxy = proxy
+ self.msg_id_cache = _MsgIdCache()
+
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
- except Exception as e:
- LOG.exception('Exception during message handling')
+ except rpc_common.ClientException as e:
+ LOG.debug(_('Expected exception during message handling (%s)') %
+ e._exc_info[1])
+ ctxt.reply(None, e._exc_info,
+ connection_pool=self.connection_pool,
+ log_failure=False)
+ except Exception:
+ LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+class MulticallProxyWaiter(object):
+ def __init__(self, conf, msg_id, timeout, connection_pool):
+ self._msg_id = msg_id
+ self._timeout = timeout or conf.rpc_response_timeout
+ self._reply_proxy = connection_pool.reply_proxy
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+ self._dataqueue = queue.LightQueue()
+ # Add this caller to the reply proxy's call_waiters
+ self._reply_proxy.add_call_waiter(self, self._msg_id)
+ self.msg_id_cache = _MsgIdCache()
+
+ def put(self, data):
+ self._dataqueue.put(data)
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ # Remove this caller from reply proxy's call_waiters
+ self._reply_proxy.del_call_waiter(self._msg_id)
+
+ def _process_data(self, data):
+ result = None
+ self.msg_id_cache.check_duplicate_message(data)
+ if data['failure']:
+ failure = data['failure']
+ result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ result = data['result']
+ return result
+
+ def __iter__(self):
+ """Return a result until we get a reply with an 'ending" flag"""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ data = self._dataqueue.get(timeout=self._timeout)
+ result = self._process_data(data)
+ except queue.Empty:
+ LOG.exception(_('Timed out waiting for RPC response.'))
+ self.done()
+ raise rpc_common.Timeout()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
+
+
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
self._done = False
self._got_ending = False
self._conf = conf
+ self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
return ConnectionContext(conf, connection_pool, pooled=not new)
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
+ # TODO(pekowski): Remove all these comments in Havana.
+ # For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ # For amqp_rpc_single_reply_queue = True,
+ # The 'with' statement is mandatory for closing the connection
+ LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _add_unique_id(msg)
pack_context(msg, context)
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+ if not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ else:
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
- event_type = msg.get('event_type')
- LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+ dict(event_type=msg.get('event_type'),
+ topic=topic))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
def get_control_exchange(conf):
- try:
- return conf.control_exchange
- except cfg.NoSuchOptError:
- return 'openstack'
+ return conf.control_exchange
# under the License.
import copy
-import logging
+import sys
import traceback
+from oslo.config import cfg
+
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common import local
+from cinder.openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'oslo.version': <RPC Envelope Version as a String>,
+ 'oslo.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
try:
message = self.message % kwargs
- except Exception as e:
+ except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
message = _("Timeout while waiting on RPC response.")
+class DuplicateMessageError(RPCException):
+ message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
"""
raise NotImplementedError()
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+
+ :param callback: Callable to be invoked for each message.
+ :type callback: callable accepting one argument
+ :param pool_name: The name of the consumer pool.
+ :type pool_name: str
+ :param topic: The routing topic for desired messages.
+ :type topic: str
+ :param exchange_name: The name of the message exchange where
+ the client should attach. Defaults to
+ the configured exchange.
+ :type exchange_name: str
+ """
+ raise NotImplementedError()
+
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',), }
+ SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+ 'run_instance': [('args', 'admin_password')],
+ 'route_message': [('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'password'),
+ ('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
msg_data = copy.deepcopy(msg_data)
if has_method:
- method = msg_data['method']
- if method in SANITIZE:
- args_to_sanitize = SANITIZE[method]
- for arg in args_to_sanitize:
- try:
- msg_data['args'][arg] = "<SANITIZED>"
- except KeyError:
- pass
+ for arg in SANITIZE.get(msg_data['method'], []):
+ try:
+ d = msg_data
+ for elem in arg[:-1]:
+ d = d[elem]
+ d[arg[-1]] = '<SANITIZED>'
+ except KeyError, e:
+ LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+ {'item': arg,
+ 'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
- LOG.error(tb)
+ if log_failure:
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
- if not module in conf.allowed_rpc_exception_modules:
+ if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
- except TypeError as e:
+ except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]
context.values['read_deleted'] = read_deleted
return context
+
+
+class ClientException(Exception):
+ """This encapsulates some actual exception that is expected to be
+ hit by an RPC proxy object. Merely instantiating it records the
+ current exception information, which will be passed back to the
+ RPC client without exceptional logging."""
+ def __init__(self):
+ self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception, e:
+ if type(e) in exceptions:
+ raise ClientException()
+ else:
+ raise
+
+
+def client_exceptions(*exceptions):
+ """Decorator for manager methods that raise expected exceptions.
+ Marking a Manager method with this decorator allows the declaration
+ of expected exceptions that the RPC layer should not consider fatal,
+ and not log as if they were generated in a real error scenario. Note
+ that this will cause listed exceptions to be wrapped in a
+ ClientException, which is used internally by the RPC layer."""
+ def outer(func):
+ def inner(*args, **kwargs):
+ return catch_client_exception(exceptions, func, *args, **kwargs)
+ return inner
+ return outer
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
there can be both versioned and unversioned APIs implemented in the same code
base.
-
-EXAMPLES:
+EXAMPLES
+========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
Example 1) Adding a new method.
+-------------------------------
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
-be implemented for the method to be supported. For example:
+be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
Example 2) Adding a new parameter.
+----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
-The implementation of the method must not expect the parameter to be present.
+The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
"""
import inspect
+# NOTE(russellb): We specifically want to use json, not our own jsonutils.
+# jsonutils has some extra logic to automatically convert objects to primitive
+# types so that they can be serialized. We want to catch all cases where
+# non-primitive types make it into this code and treat it as an error.
+import json
import time
import eventlet
-from cinder.openstack.common import jsonutils
from cinder.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
else:
res.append(rval)
done.send(res)
+ except rpc_common.ClientException as e:
+ done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- jsonutils.dumps(msg)
+ json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg):
+ check_serialize(msg)
try:
call(conf, context, topic, msg)
except Exception:
pass
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
check_serialize(msg)
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
- help='the RabbitMQ password'),
+ help='the RabbitMQ password',
+ secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
- message.ack()
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
+ finally:
+ message.ack()
self.queue.consume(*args, callback=_callback, **options)
"""
# Default options
options = {'durable': False,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
# Default options
options = {'durable': False,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
channel=channel,
routing_key=self.routing_key)
- def send(self, msg):
+ def send(self, msg, timeout=None):
"""Send a message"""
- self.producer.publish(msg)
+ if timeout:
+ #
+ # AMQP TTL is in milliseconds when set in the header.
+ #
+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+ else:
+ self.producer.publish(msg)
class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
"""
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
- params = {}
+ params = {
+ 'hostname': hostname,
+ 'port': port,
+ 'userid': self.conf.rabbit_userid,
+ 'password': self.conf.rabbit_password,
+ 'virtual_host': self.conf.rabbit_virtual_host,
+ }
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
- params.setdefault('hostname', hostname)
- params.setdefault('port', port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
-
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
- self.connection.close()
+ self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
def _error_callback(exc):
if isinstance(exc, socket.timeout):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
pass
self.consumer_thread = None
- def publisher_send(self, cls, topic, msg, **kwargs):
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg)
+ publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
- self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+ self.declare_topic_consumer(
+ queue_name=pool_name,
+ topic=topic,
+ exchange_name=exchange_name,
+ callback=callback_wrapper,
+ )
+
def create_connection(conf, new=True):
"""Create a connection"""
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.cast_to_server(
+ return rpc_amqp.fanout_cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
import functools
import itertools
-import logging
import time
import uuid
import eventlet
import greenlet
from oslo.config import cfg
-import qpid.messaging
-import qpid.messaging.exceptions
from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
+from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+ help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
- help='Password for qpid connection'),
+ help='Password for qpid connection',
+ secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
- cfg.BoolOpt('qpid_reconnect',
- default=True,
- help='Automatically reconnect'),
- cfg.IntOpt('qpid_reconnect_timeout',
- default=0,
- help='Reconnection timeout in seconds'),
- cfg.IntOpt('qpid_reconnect_limit',
- default=0,
- help='Max reconnections before giving up'),
- cfg.IntOpt('qpid_reconnect_interval_min',
- default=0,
- help='Minimum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval_max',
- default=0,
- help='Maximum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval',
- default=0,
- help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=60,
help='Seconds between connection keepalive heartbeats'),
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
- if server_params is None:
- server_params = {}
-
- default_params = dict(hostname=self.conf.qpid_hostname,
- port=self.conf.qpid_port,
- username=self.conf.qpid_username,
- password=self.conf.qpid_password)
+ if server_params and 'hostname' in server_params:
+ # NOTE(russellb) This enables support for cast_to_server.
+ server_params['qpid_hosts'] = [
+ '%s:%d' % (server_params['hostname'],
+ server_params.get('port', 5672))
+ ]
+
+ params = {
+ 'qpid_hosts': self.conf.qpid_hosts,
+ 'username': self.conf.qpid_username,
+ 'password': self.conf.qpid_password,
+ }
+ params.update(server_params or {})
- params = server_params
- for key in default_params.keys():
- params.setdefault(key, default_params[key])
+ self.brokers = params['qpid_hosts']
+ self.username = params['username']
+ self.password = params['password']
+ self.connection_create(self.brokers[0])
+ self.reconnect()
- self.broker = params['hostname'] + ":" + str(params['port'])
+ def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(self.broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
- self.connection.username = params['username']
- self.connection.password = params['password']
+ self.connection.username = self.username
+ self.connection.password = self.password
+
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
- self.connection.reconnect = self.conf.qpid_reconnect
- if self.conf.qpid_reconnect_timeout:
- self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
- if self.conf.qpid_reconnect_limit:
- self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
- if self.conf.qpid_reconnect_interval_max:
- self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
- if self.conf.qpid_reconnect_interval_min:
- self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
- if self.conf.qpid_reconnect_interval:
- self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ # Reconnection is done by self.reconnect()
+ self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
- # Open is part of reconnect -
- # NOTE(WGH) not sure we need this with the reconnect flags
- self.reconnect()
-
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
+ attempt = 0
+ delay = 1
while True:
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
+
try:
+ self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
- LOG.error(_('Unable to connect to AMQP server: %s'), e)
- time.sleep(self.conf.qpid_reconnect_interval or 1)
+ except qpid_exceptions.ConnectionError, e:
+ msg_dict = dict(e=e, delay=delay)
+ msg = _("Unable to connect to AMQP server: %(e)s. "
+ "Sleeping %(delay)s seconds") % msg_dict
+ LOG.error(msg)
+ time.sleep(delay)
+ delay = min(2 * delay, 60)
else:
+ LOG.info(_('Connected to AMQP server on %s'), broker)
break
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
self.session = self.connection.session()
- for consumer in self.consumers.itervalues():
- consumer.reconnect(self.session)
-
if self.consumers:
+ consumers = self.consumers
+ self.consumers = {}
+
+ for consumer in consumers.itervalues():
+ consumer.reconnect(self.session)
+ self._register_consumer(consumer)
+
LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ if isinstance(exc, qpid_exceptions.Empty):
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ #
+ # We want to create a message with attributes, e.g. a TTL. We
+ # don't really need to keep 'msg' in its JSON format any longer
+ # so let's create an actual qpid message here and get some
+ # value-add on the go.
+ #
+ # WARNING: Request timeout happens to be in the same units as
+ # qpid's TTL (seconds). If this changes in the future, then this
+ # will need to be altered accordingly.
+ #
+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+ self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
return consumer
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+
+ consumer = TopicConsumer(conf=self.conf,
+ session=self.session,
+ topic=topic,
+ callback=callback_wrapper,
+ name=pool_name,
+ exchange_name=exchange_name)
+
+ self._register_consumer(consumer)
+ return consumer
+
def create_connection(conf, new=True):
"""Create a connection"""
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
# License for the specific language governing permissions and limitations
# under the License.
+import os
import pprint
import socket
-import string
import sys
import types
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from oslo.config import cfg
+from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
+from cinder.openstack.common import processutils as utils
from cinder.openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
+
# for convenience, are not modified.
pformat = pprint.pformat
Timeout = eventlet.timeout.Timeout
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
]
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
Error if a developer passes us bad data.
"""
try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
+ return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("JSON serialization failed."))
def _deserialize(data):
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = ZMQ_CTX.socket(zmq_type)
+ self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
pass
self.subscriptions = []
- # Linger -1 prevents lost/dropped messages
try:
- self.sock.close(linger=-1)
+ # Default is to linger
+ self.sock.close()
except Exception:
- pass
+ # While this is a bad thing to happen,
+ # it would be much worse if some of the code calling this
+ # were to fail. For now, lets log, and later evaluate
+ # if we can safely raise here.
+ LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ def cast(self, msg_id, topic, data, envelope=False):
+ msg_id = msg_id or 0
+
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
try:
result = proxy.dispatch(
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
+ except rpc_common.ClientException, e:
+ LOG.debug(_("Expected exception during message handling (%s)") %
+ e._exc_info[1])
+ return {'exc':
+ rpc_common.serialize_remote_exception(e._exc_info,
+ log_failure=False)}
except Exception:
+ LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
+ data.setdefault('version', None)
+ data.setdefault('args', {})
+
# Method starting with - are
# processed internally. (non-valid method name)
- method = data['method']
+ method = data.get('method')
+ if not method:
+ LOG.error(_("RPC message did not include method."))
+ return
# Internal method
# uses internal context for safety.
- if data['method'][0] == '-':
- # For reply / process_reply
- method = method[1:]
- if method == 'reply':
- self.private_ctx.reply(ctx, proxy, **data['args'])
+ if method == '-reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
return
- data.setdefault('version', None)
- data.setdefault('args', [])
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
- topic = topic.split('.', 1)[0]
+ topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ if topic not in self.topic_proxy:
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
+
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+ while(True):
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
+
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+
+ super(ZmqProxy, self).consume_in_thread()
+
+
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
+
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- ctx, request = _deserialize(in_msg)
- ctx = RpcContext.unmarshal(ctx)
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
- proxy = self.proxies[sock]
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
+ # Register with matchmaker.
+ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
+ _get_matchmaker().stop_heartbeat()
+ for topic in self.topics:
+ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
+ _get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, topic, msg, timeout=None,
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies.
- msg_id = str(uuid.uuid4().hex)
+ msg_id = uuid.uuid4().hex
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
- queues = matchmaker.queues(topic)
+ queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
- raise rpc_common.Timeout, "No match from matchmaker."
+ raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, msg, timeout,
+ envelope)
def create_connection(conf, new=True):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
-def notify(conf, context, topic, msg, **kwargs):
+def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
- topic.replace('.', '-')
- cast(conf, context, topic, msg, **kwargs)
+ topic = topic.replace('.', '-')
+ cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
+ if ZMQ_CTX:
+ ZMQ_CTX.term()
+ ZMQ_CTX = None
+
global matchmaker
matchmaker = None
- ZMQ_CTX.term()
- ZMQ_CTX = None
-def register_opts(conf):
- """Registration of options for this driver."""
- #NOTE(ewindisch): ZMQ_CTX and matchmaker
- # are initialized here as this is as good
- # an initialization method as any.
+def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
- # We memoize through these globals
global ZMQ_CTX
- global matchmaker
- global CONF
-
- if not CONF:
- conf.register_opts(zmq_opts)
- CONF = conf
- # Don't re-set, if this method is called twice.
if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
- if not matchmaker:
- # rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = conf.rpc_zmq_matchmaker.split('.')
- mm_module = '.'.join(mm_path[:-1])
- mm_class = mm_path[-1]
-
- # Only initialize a class.
- if mm_path[-1][0] not in string.ascii_uppercase:
- LOG.error(_("Matchmaker could not be loaded.\n"
- "rpc_zmq_matchmaker is not a class."))
- raise RPCException(_("Error loading Matchmaker."))
+ ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+ return ZMQ_CTX
- mm_impl = importutils.import_module(mm_module)
- mm_constructor = getattr(mm_impl, mm_class)
- matchmaker = mm_constructor()
-
-register_opts(cfg.CONF)
+def _get_matchmaker(*args, **kwargs):
+ global matchmaker
+ if not matchmaker:
+ matchmaker = importutils.import_object(
+ CONF.rpc_zmq_matchmaker, *args, **kwargs)
+ return matchmaker
import contextlib
import itertools
import json
-import logging
+import eventlet
from oslo.config import cfg
from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default='300',
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default='600',
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a host.topic is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+ Called by register(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+ Called by unregister(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if len(self.hosts) == 0:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
- def __init__(self):
+ def __init__(self, host='localhost'):
+ self.host = host
super(Exchange, self).__init__()
def run(self, key):
- return [(key.split('.')[0] + '.localhost', 'localhost')]
+ return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
- i.e. "compute.host" sends a message to "compute" running on "host"
+ i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
- b, e = key.split('.', 1)
- return [(b, e)]
+ e = key.split('.', 1)[1]
+ return [(key, e)]
class MatchMakerRing(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
- def __init__(self):
+ def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), LocalhostExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+ cfg.StrOpt('host',
+ default='127.0.0.1',
+ help='Host to locate redis'),
+ cfg.IntOpt('port',
+ default=6379,
+ help='Use this port to connect to redis host.'),
+ cfg.StrOpt('password',
+ default=None,
+ help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+ title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+ def __init__(self, matchmaker):
+ self.matchmaker = matchmaker
+ self.redis = matchmaker.redis
+ super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def run(self, topic):
+ while True:
+ member_name = self.redis.srandmember(topic)
+
+ if not member_name:
+ # If this happens, there are no
+ # longer any members.
+ break
+
+ if not self.matchmaker.is_alive(topic, member_name):
+ continue
+
+ host = member_name.split('.', 1)[1]
+ return [(member_name, host)]
+ return []
+
+
+class RedisFanoutExchange(RedisExchange):
+ """
+ Return a list of all hosts.
+ """
+ def run(self, topic):
+ topic = topic.split('~', 1)[1]
+ hosts = self.redis.smembers(topic)
+ good_hosts = filter(
+ lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+ return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+ """
+ MatchMaker registering and looking-up hosts with a Redis server.
+ """
+ def __init__(self):
+ super(MatchMakerRedis, self).__init__()
+
+ if not redis:
+ raise ImportError("Failed to import module redis.")
+
+ self.redis = redis.StrictRedis(
+ host=CONF.matchmaker_redis.host,
+ port=CONF.matchmaker_redis.port,
+ password=CONF.matchmaker_redis.password)
+
+ self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+ self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+ self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+ def ack_alive(self, key, host):
+ topic = "%s.%s" % (key, host)
+ if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+ # If we could not update the expiration, the key
+ # might have been pruned. Re-register, creating a new
+ # key in Redis.
+ self.register(self.topic_host[host], host)
+
+ def is_alive(self, topic, host):
+ if self.redis.ttl(host) == -1:
+ self.expire(topic, host)
+ return False
+ return True
+
+ def expire(self, topic, host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.delete(host)
+ pipe.srem(topic, host)
+ pipe.execute()
+
+ def backend_register(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.sadd(key, key_host)
+
+ # No value is needed, we just
+ # care if it exists. Sets aren't viable
+ # because only keys can expire.
+ pipe.set(key_host, '')
+
+ pipe.execute()
+
+ def backend_unregister(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.srem(key, key_host)
+ pipe.delete(key_host)
+ pipe.execute()
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+ # Hook to allow the manager to do other initializations after
+ # the rpc connection is created.
+ if callable(getattr(self.manager, 'initialize_service_hook', None)):
+ self.manager.initialize_service_hook(self)
+
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.context,
volume_id)
+ @test.skip_test
def test_preattach_status_volume(self):
"""Ensure volume goes into pre-attaching state"""
instance_uuid = '12345678-1234-5678-1234-567812345678'