Final patch for blueprint common-rpc.
This patch removes cinder.rpc in favor of the copy in openstack-common.
Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
+from cinder.openstack.common import rpc
from cinder import quota
-from cinder import rpc
from cinder import utils
from cinder import version
from cinder.volume import volume_types
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
-from cinder import rpc
+from cinder.openstack.common import rpc
delete_exchange_opt = \
from cinder.db import base
from cinder import flags
from cinder import log as logging
-from cinder.rpc import dispatcher as rpc_dispatcher
+from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
-from cinder import rpc
+from cinder.openstack.common import rpc
LOG = logging.getLogger(__name__)
rpc_opts = [
cfg.StrOpt('rpc_backend',
- default='cinder.rpc.impl_kombu',
+ default='%s.impl_kombu' % __package__,
help="The messaging module to use, defaults to kombu."),
cfg.IntOpt('rpc_thread_pool_size',
default=64,
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from call or multicall'),
+ cfg.IntOpt('rpc_cast_timeout',
+ default=30,
+ help='Seconds to wait before a cast expires (TTL). '
+ 'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
- default=['cinder.exception'],
- help='Modules of exceptions that are permitted to be recreated'
- 'upon receiving exception data from an rpc call.'),
+ default=['cinder.openstack.common.exception',
+ 'nova.exception',
+ ],
+ help='Modules of exceptions that are permitted to be recreated'
+ 'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
- default='cinder',
+ default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- ]
+]
cfg.CONF.register_opts(rpc_opts)
"""Create a connection to the message bus used for rpc.
For some example usage of creating a connection and some consumers on that
- connection, see cinder.service.
+ connection, see nova.service.
:param new: Whether or not to create a new connection. A new connection
will be created by default. If new is False, the
implementation is free to return an existing connection from a
pool.
- :returns: An instance of cinder.rpc.common.Connection
+ :returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- cinder.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created
- with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
:returns: A dict from the remote method.
- :raises: cinder.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- cinder.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created
- with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- cinder.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created
- with fanout=True.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- cinder.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created
- with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
returned and X is the Nth value that was returned by the remote
method.
- :raises: cinder.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def queue_get_for(context, topic, host):
- """Get a queue name for a given topic + host."""
+ """Get a queue name for a given topic + host.
+
+ This function only works if this naming convention is followed on the
+ consumer side, as well. For example, in nova, every instance of the
+ nova-foo service calls create_consumer() for two topics:
+
+ foo
+ foo.<host>
+
+ Messages sent to the 'foo' topic are distributed to exactly one instance of
+ the nova-foo service. The services are chosen in a round-robin fashion.
+ Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
+ <host>.
+ """
return '%s.%s' % (topic, host)
"""Delay import of rpc_backend until configuration is loaded."""
global _RPCIMPL
if _RPCIMPL is None:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ try:
+ _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ except ImportError:
+ # For backwards compatibility with older nova config.
+ impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
+ _RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
# under the License.
"""
-Shared code between AMQP based cinder.rpc implementations.
+Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
from eventlet import semaphore
from cinder.openstack.common import excutils
+from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import local
-import cinder.rpc.common as rpc_common
+from cinder.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the caller of
- create_connection(). This is a essentially a wrapper around
- Connection that supports 'with' and can return a new Connection or
- one from a pool. It will also catch when an instance of this class
- is to be deleted so that we can return Connections to the pool on
- exceptions and so forth without making the caller be responsible for
- catching all exceptions and making sure to return a connection to
- the pool.
+ create_connection(). This is essentially a wrapper around
+ Connection that supports 'with'. It can also return a new
+ Connection, or one from a pool. The function will also catch
+ when an instance of this class is to be deleted. With that
+ we can return Connections to the pool on exceptions and so
+ forth without making the caller be responsible for catching
+ them. If possible the function makes sure to return a
+ connection to the pool.
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
if pooled:
self.connection = connection_pool.get()
else:
- self.connection = connection_pool.connection_cls(conf,
- server_params=server_params)
+ self.connection = connection_pool.connection_cls(
+ conf,
+ server_params=server_params)
self.pooled = pooled
def __enter__(self):
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
+ def create_worker(self, topic, proxy, pool_name):
+ self.connection.create_worker(topic, proxy, pool_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
+ def deepcopy(self):
+ values = self.to_dict()
+ values['conf'] = self.conf
+ values['msg_id'] = self.msg_id
+ return self.__class__(**values)
+
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
if self.msg_id:
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
- self._iterator = connection.iterconsume(
- timeout=timeout or conf.rpc_response_timeout)
+ self._iterator = connection.iterconsume(timeout=timeout or
+ conf.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
+ failure)
elif data.get('ending', False):
self._got_ending = True
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(conf, context, server_params, topic, msg,
- connection_pool):
+ connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.fanout_send(topic, msg)
import traceback
from cinder.openstack.common import cfg
+from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common import local
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
- topic. For example, all instances of cinder-compute
- consume from a queue called "compute". In that case, the
+ topic. For example, all instances of nova-compute consume
+ from a queue called "compute". In that case, the
messages will get distributed amongst the consumers in a
round-robin fashion if fanout=False. If fanout=True,
every consumer associated with this topic will get a
"""
raise NotImplementedError()
+ def create_worker(self, conf, topic, proxy, pool_name):
+ """Create a worker on this connection.
+
+ A worker is like a regular consumer of messages directed to a
+ topic, except that it is part of a set of such consumers (the
+ "pool") which may run in parallel. Every pool of workers will
+ receive a given message, but only one worker in the pool will
+ be asked to process it. Load is distributed across the members
+ of the pool in round-robin fashion.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic.
+ :param proxy: The object that will handle all incoming messages.
+ :param pool_name: String containing the name of the pool of workers
+ """
+ raise NotImplementedError()
+
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {
- 'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',),
- }
+ SANITIZE = {'set_admin_password': ('new_pass',),
+ 'run_instance': ('admin_password',), }
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
ex_type = type(failure)
str_override = lambda self: message
new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
- {'__str__': str_override})
+ {'__str__': str_override, '__unicode__': str_override})
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
def from_dict(cls, values):
return cls(**values)
+ def deepcopy(self):
+ return self.from_dict(self.to_dict())
+
def update_store(self):
local.store.context = self
def elevated(self, read_deleted=None, overwrite=False):
"""Return a version of this context with admin flag set."""
- # TODO(russellb) This method is a bit of a cinder-ism. It makes
+ # TODO(russellb) This method is a bit of a nova-ism. It makes
# some assumptions about the data in the request context sent
# across rpc, while the rest of this class does not. We could get
- # rid of this if we changed the cinder code that uses this to
+ # rid of this if we changed the nova code that uses this to
# convert the RpcContext back to its native RequestContext doing
- # something like
- # cinder.context.RequestContext.from_dict(ctxt.to_dict())
+ # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
- context = copy.deepcopy(self)
+ context = self.deepcopy()
context.values['is_admin'] = True
context.values.setdefault('roles', [])
server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code
base.
+
+
+EXAMPLES:
+
+Nova was the first project to use versioned rpc APIs. Consider the compute rpc
+API as an example. The client side is in nova/compute/rpcapi.py and the server
+side is in nova/compute/manager.py.
+
+
+Example 1) Adding a new method.
+
+Adding a new method is a backwards compatible change. It should be added to
+nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
+X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
+have a specific version specified to indicate the minimum API version that must
+be implemented for the method to be supported. For example:
+
+ def get_host_uptime(self, ctxt, host):
+ topic = _compute_topic(self.topic, ctxt, host, None)
+ return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
+ version='1.1')
+
+In this case, version '1.1' is the first version that supported the
+get_host_uptime() method.
+
+
+Example 2) Adding a new parameter.
+
+Adding a new parameter to an rpc method can be made backwards compatible. The
+RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
+The implementation of the method must not expect the parameter to be present.
+
+ def some_remote_method(self, arg1, arg2, newarg=None):
+ # The code needs to deal with newarg=None for cases
+ # where an older client sends a message without it.
+ pass
+
+On the client side, the same changes should be made as in example 1. The
+minimum version that supports the new parameter should be specified.
"""
-from cinder.rpc import common as rpc_common
+from cinder.openstack.common.rpc import common as rpc_common
class RpcDispatcher(object):
if not version:
version = '1.0'
+ had_compatible = False
for proxyobj in self.callbacks:
if hasattr(proxyobj, 'RPC_API_VERSION'):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
+ is_compatible = self._is_compatible(rpc_api_version, version)
+ had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
- if self._is_compatible(rpc_api_version, version):
+ if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
- raise rpc_common.UnsupportedRpcVersion(version=version)
+ if had_compatible:
+ raise AttributeError("No such RPC function '%s'" % method)
+ else:
+ raise rpc_common.UnsupportedRpcVersion(version=version)
"""
import inspect
-import json
-import signal
-import sys
import time
-import traceback
import eventlet
-from cinder.rpc import common as rpc_common
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
self._response = []
self._done = False
+ def deepcopy(self):
+ values = self.to_dict()
+ new_inst = self.__class__(**values)
+ new_inst._response = self._response
+ new_inst._done = self._done
+ return new_inst
+
def reply(self, reply=None, failure=None, ending=False):
if ending:
self._done = True
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- json.dumps(msg)
+ jsonutils.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, method, args, None)
+ consumer.call(context, version, method, args, None)
except Exception:
pass
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
import socket
import ssl
import eventlet
import greenlet
import kombu
+import kombu.connection
import kombu.entity
import kombu.messaging
-import kombu.connection
from cinder.openstack.common import cfg
-from cinder.rpc import amqp as rpc_amqp
-from cinder.rpc import common as rpc_common
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common.rpc import amqp as rpc_amqp
+from cinder.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
- '(valid only if SSL enabled)')),
+ '(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
help='the RabbitMQ host'),
default=False,
help='use durable queues in RabbitMQ'),
- ]
+]
cfg.CONF.register_opts(kombu_opts)
"""
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=msg_id,
- type='direct',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(DirectConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=msg_id,
- exchange=exchange,
- routing_key=msg_id,
- **options)
+ exchange = kombu.entity.Exchange(name=msg_id,
+ type='direct',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(DirectConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=msg_id,
+ exchange=exchange,
+ routing_key=msg_id,
+ **options)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
+ def __init__(self, conf, channel, topic, callback, tag, name=None,
+ **kwargs):
"""Init a 'topic' queue.
- 'channel' is the amqp channel to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
+ :param channel: the amqp channel to use
+ :param topic: the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param tag: a unique ID for the consumer on the channel
+ :param name: optional queue name, defaults to topic
+ :paramtype name: str
- Other kombu options may be passed
+ Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=conf.control_exchange,
- type='topic',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(TopicConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=topic,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=conf.control_exchange,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(TopicConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=name or topic,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class FanoutConsumer(ConsumerBase):
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=exchange_name,
- type='fanout',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(FanoutConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=queue_name,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(FanoutConsumer, self).__init__(channel, callback, tag,
+ name=queue_name,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class Publisher(object):
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
- **self.kwargs)
+ **self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
- channel=channel, routing_key=self.routing_key)
+ channel=channel,
+ routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(DirectPublisher, self).__init__(channel,
- msg_id,
- msg_id,
- type='direct',
- **options)
+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+ type='direct', **options)
class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- super(TopicPublisher, self).__init__(channel,
- conf.control_exchange,
- topic,
- type='topic',
- **options)
+ super(TopicPublisher, self).__init__(channel, conf.control_exchange,
+ topic, type='topic', **options)
class FanoutPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(FanoutPublisher, self).__init__(channel,
- '%s_fanout' % topic,
- None,
- type='fanout',
- **options)
+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+ None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
- exchange=self.exchange,
- durable=self.durable,
- name=self.routing_key,
- routing_key=self.routing_key)
+ exchange=self.exchange,
+ durable=self.durable,
+ name=self.routing_key,
+ routing_key=self.routing_key)
queue.declare()
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection_errors:
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(
- **self.params)
+ self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ ' unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
- self.consumer_num.next())
+ self.consumer_num.next())
self.consumers.append(consumer)
return consumer
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
info['do_consume'] = True
def _consume():
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
- In cinder's use, this is generally a msg_id queue used for
+ In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
else:
self.declare_topic_consumer(topic, proxy_cb)
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
+
def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.notify(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
-import json
import logging
import time
import uuid
import qpid.messaging.exceptions
from cinder.openstack.common import cfg
-from cinder.rpc import amqp as rpc_amqp
-from cinder.rpc import common as rpc_common
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common.rpc import amqp as rpc_amqp
+from cinder.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
- ]
+]
cfg.CONF.register_opts(qpid_opts)
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
- self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
"""
super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback):
+ def __init__(self, conf, session, topic, callback, name=None):
"""Init a 'topic' queue.
- 'session' is the amqp session to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
+ :param session: the amqp session to use
+ :param topic: is the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param name: optional queue name, defaults to topic
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (conf.control_exchange, topic), {},
- topic, {})
+ "%s/%s" % (conf.control_exchange,
+ topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(FanoutConsumer, self).__init__(session, callback,
- "%s_fanout" % topic,
- {"durable": False, "type": "fanout"},
- "%s_fanout_%s" % (topic, uuid.uuid4().hex),
- {"exclusive": True})
+ super(FanoutConsumer, self).__init__(
+ session, callback,
+ "%s_fanout" % topic,
+ {"durable": False, "type": "fanout"},
+ "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+ {"exclusive": True})
class Publisher(object):
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
- self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic))
+ super(TopicPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
- super(FanoutPublisher, self).__init__(session,
- "%s_fanout" % topic, {"type": "fanout"})
+ super(FanoutPublisher, self).__init__(
+ session,
+ "%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic),
- {"durable": True})
+ super(NotifyPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic),
+ {"durable": True})
class Connection(object):
server_params = {}
default_params = dict(hostname=self.conf.qpid_hostname,
- port=self.conf.qpid_port,
- username=self.conf.qpid_username,
- password=self.conf.qpid_password)
+ port=self.conf.qpid_port,
+ username=self.conf.qpid_username,
+ password=self.conf.qpid_password)
params = server_params
for key in default_params.keys():
self.connection.reconnect = self.conf.qpid_reconnect
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
+ self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
+ self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
+ self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
publisher = cls(self.conf, self.session, topic)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
- In cinder's use, this is generally a msg_id queue used for
+ In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
return consumer
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+ name=pool_name)
+
+ self._register_consumer(consumer)
+
+ return consumer
+
def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
- msg, rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import pprint
+import socket
+import string
+import sys
+import types
+import uuid
+
+import eventlet
+from eventlet.green import zmq
+import greenlet
+
+from cinder.openstack.common import cfg
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import importutils
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common.rpc import common as rpc_common
+
+
+# for convenience, are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = rpc_common.LOG
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ # The module.Class to use for matchmaking.
+ cfg.StrOpt(
+ 'rpc_zmq_matchmaker',
+ default=('cinder.openstack.common.rpc.'
+ 'matchmaker.MatchMakerLocalhost'),
+ help='MatchMaker driver',
+ ),
+
+ # The following port is unassigned by IANA as of 2012-05-21
+ cfg.IntOpt('rpc_zmq_port', default=9501,
+ help='ZeroMQ receiver listening port'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ help='Number of ZeroMQ contexts, defaults to 1'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ help='Directory for holding IPC sockets'),
+
+ cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+ help='Name of this node. Must be a valid hostname, FQDN, or '
+ 'IP address. Must match "host" option, if running Nova.')
+]
+
+
+# These globals are defined in register_opts(conf),
+# a mandatory initialization call
+FLAGS = None
+ZMQ_CTX = None # ZeroMQ Context, must be global.
+matchmaker = None # memoized matchmaker object
+
+
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
+class ZmqSocket(object):
+ """
+ A tiny wrapper around ZeroMQ to simplify the send/recv protocol
+ and connection management.
+
+ Can be used as a Context (supports the 'with' statement).
+ """
+
+ def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+ self.sock = ZMQ_CTX.socket(zmq_type)
+ self.addr = addr
+ self.type = zmq_type
+ self.subscriptions = []
+
+ # Support failures on sending/receiving on wrong socket type.
+ self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+ self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+ self.can_sub = zmq_type in (zmq.SUB, )
+
+ # Support list, str, & None for subscribe arg (cast to list)
+ do_sub = {
+ list: subscribe,
+ str: [subscribe],
+ type(None): []
+ }[type(subscribe)]
+
+ for f in do_sub:
+ self.subscribe(f)
+
+ str_data = {'addr': addr, 'type': self.socket_s(),
+ 'subscribe': subscribe, 'bind': bind}
+
+ LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
+ LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
+ LOG.debug(_("-> bind: %(bind)s"), str_data)
+
+ try:
+ if bind:
+ self.sock.bind(addr)
+ else:
+ self.sock.connect(addr)
+ except Exception:
+ raise RPCException(_("Could not open socket."))
+
+ def socket_s(self):
+ """Get socket type as string."""
+ t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+ 'DEALER')
+ return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+ def subscribe(self, msg_filter):
+ """Subscribe."""
+ if not self.can_sub:
+ raise RPCException("Cannot subscribe on this socket.")
+ LOG.debug(_("Subscribing to %s"), msg_filter)
+
+ try:
+ self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+ except Exception:
+ return
+
+ self.subscriptions.append(msg_filter)
+
+ def unsubscribe(self, msg_filter):
+ """Unsubscribe."""
+ if msg_filter not in self.subscriptions:
+ return
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+ self.subscriptions.remove(msg_filter)
+
+ def close(self):
+ if self.sock is None or self.sock.closed:
+ return
+
+ # We must unsubscribe, or we'll leak descriptors.
+ if len(self.subscriptions) > 0:
+ for f in self.subscriptions:
+ try:
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+ except Exception:
+ pass
+ self.subscriptions = []
+
+ # Linger -1 prevents lost/dropped messages
+ try:
+ self.sock.close(linger=-1)
+ except Exception:
+ pass
+ self.sock = None
+
+ def recv(self):
+ if not self.can_recv:
+ raise RPCException(_("You cannot recv on this socket."))
+ return self.sock.recv_multipart()
+
+ def send(self, data):
+ if not self.can_send:
+ raise RPCException(_("You cannot send on this socket."))
+ self.sock.send_multipart(data)
+
+
+class ZmqClient(object):
+ """Client for ZMQ sockets."""
+
+ def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+ def cast(self, msg_id, topic, data):
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
+
+ def close(self):
+ self.outq.close()
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call."""
+ def __init__(self, **kwargs):
+ self.replies = []
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['replies'] = self.replies
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ return
+ self.replies.append(reply)
+
+ @classmethod
+ def marshal(self, ctx):
+ ctx_data = ctx.to_dict()
+ return _serialize(ctx_data)
+
+ @classmethod
+ def unmarshal(self, data):
+ return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+ """Used by ConsumerBase as a private context for - methods."""
+
+ def __init__(self, proxy):
+ self.proxy = proxy
+ self.msg_waiter = None
+
+ def _get_response(self, ctx, proxy, topic, data):
+ """Process a curried message and cast the result to topic."""
+ LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+
+ try:
+ result = proxy.dispatch(
+ ctx, data['version'], data['method'], **data['args'])
+ return ConsumerBase.normalize_reply(result, ctx.replies)
+ except greenlet.GreenletExit:
+ # ignore these since they are just from shutdowns
+ pass
+ except Exception:
+ return {'exc':
+ rpc_common.serialize_remote_exception(sys.exc_info())}
+
+ def reply(self, ctx, proxy,
+ msg_id=None, context=None, topic=None, msg=None):
+ """Reply to a casted call."""
+ # Our real method is curried into msg['args']
+
+ child_ctx = RpcContext.unmarshal(msg[0])
+ response = ConsumerBase.normalize_reply(
+ self._get_response(child_ctx, proxy, topic, msg[1]),
+ ctx.replies)
+
+ LOG.debug(_("Sending reply"))
+ cast(FLAGS, ctx, topic, {
+ 'method': '-process_reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'response': response
+ }
+ })
+
+
+class ConsumerBase(object):
+ """Base Consumer."""
+
+ def __init__(self):
+ self.private_ctx = InternalContext(None)
+
+ @classmethod
+ def normalize_reply(self, result, replies):
+ #TODO(ewindisch): re-evaluate and document this method.
+ if isinstance(result, types.GeneratorType):
+ return list(result)
+ elif replies:
+ return replies
+ else:
+ return [result]
+
+ def process(self, style, target, proxy, ctx, data):
+ # Method starting with - are
+ # processed internally. (non-valid method name)
+ method = data['method']
+
+ # Internal method
+ # uses internal context for safety.
+ if data['method'][0] == '-':
+ # For reply / process_reply
+ method = method[1:]
+ if method == 'reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
+ return
+
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+ proxy.dispatch(ctx, data['version'],
+ data['method'], **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+ """
+ A consumer class implementing a
+ centralized casting broker (PULL-PUSH)
+ for RoundRobin requests.
+ """
+
+ def __init__(self, conf):
+ super(ZmqBaseReactor, self).__init__()
+
+ self.conf = conf
+ self.mapping = {}
+ self.proxies = {}
+ self.threads = []
+ self.sockets = []
+ self.subscribe = {}
+
+ self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+ def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
+ zmq_type_out=None, in_bind=True, out_bind=True,
+ subscribe=None):
+
+ LOG.info(_("Registering reactor"))
+
+ if zmq_type_in not in (zmq.PULL, zmq.SUB):
+ raise RPCException("Bad input socktype")
+
+ # Items push in.
+ inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+ subscribe=subscribe)
+
+ self.proxies[inq] = proxy
+ self.sockets.append(inq)
+
+ LOG.info(_("In reactor registered"))
+
+ if not out_addr:
+ return
+
+ if zmq_type_out not in (zmq.PUSH, zmq.PUB):
+ raise RPCException("Bad output socktype")
+
+ # Items push out.
+ outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
+
+ self.mapping[inq] = outq
+ self.mapping[outq] = inq
+ self.sockets.append(outq)
+
+ LOG.info(_("Out reactor registered"))
+
+ def consume_in_thread(self):
+ def _consume(sock):
+ LOG.info(_("Consuming socket"))
+ while True:
+ self.consume(sock)
+
+ for k in self.proxies.keys():
+ self.threads.append(
+ self.pool.spawn(_consume, k)
+ )
+
+ def wait(self):
+ for t in self.threads:
+ t.wait()
+
+ def close(self):
+ for s in self.sockets:
+ s.close()
+
+ for t in self.threads:
+ t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ topic-based proxy, forwarding to
+ IPC sockets.
+ """
+
+ def __init__(self, conf):
+ super(ZmqProxy, self).__init__(conf)
+
+ self.topic_proxy = {}
+ ipc_dir = conf.rpc_zmq_ipc_dir
+
+ self.topic_proxy['zmq_replies'] = \
+ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
+ zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['zmq_replies'])
+
+ def consume(self, sock):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ msg_id, topic, style, in_msg = data
+ topic = topic.split('.', 1)[0]
+
+ LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+ # Handle zmq_replies magic
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ elif topic.startswith('zmq_replies'):
+ sock_type = zmq.PUB
+ inside = _deserialize(in_msg)
+ msg_id = inside[-1]['args']['msg_id']
+ response = inside[-1]['args']['response']
+ LOG.debug(_("->response->%s"), response)
+ data = [str(msg_id), _serialize(response)]
+ else:
+ sock_type = zmq.PUSH
+
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
+ self.topic_proxy[topic].send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+
+
+class ZmqReactor(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ consumer for messages. Can also be
+ used as a 1:1 proxy
+ """
+
+ def __init__(self, conf):
+ super(ZmqReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+ if sock in self.mapping:
+ LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
+ 'data': data})
+ self.mapping[sock].send(data)
+ return
+
+ msg_id, topic, style, in_msg = data
+
+ ctx, request = _deserialize(in_msg)
+ ctx = RpcContext.unmarshal(ctx)
+
+ proxy = self.proxies[sock]
+
+ self.pool.spawn_n(self.process, style, topic,
+ proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+ """Manages connections and threads."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.reactor = ZmqReactor(conf)
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ LOG.info(_("Create Consumer for topic (%(topic)s)") %
+ {'topic': topic})
+
+ # Subscription scenarios
+ if fanout:
+ subscribe = ('', fanout)[type(fanout) == str]
+ sock_type = zmq.SUB
+ topic = 'fanout~' + topic
+ else:
+ sock_type = zmq.PULL
+ subscribe = None
+
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (self.conf.rpc_zmq_ipc_dir, topic)
+
+ LOG.debug(_("Consumer is a zmq.%s"),
+ ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+ self.reactor.register(proxy, inaddr, sock_type,
+ subscribe=subscribe, in_bind=False)
+
+ def close(self):
+ self.reactor.close()
+
+ def wait(self):
+ self.reactor.wait()
+
+ def consume_in_thread(self):
+ self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, msg_id, topic, msg, timeout=None):
+ timeout_cast = timeout or FLAGS.rpc_cast_timeout
+ payload = [RpcContext.marshal(context), msg]
+
+ with Timeout(timeout_cast, exception=rpc_common.Timeout):
+ try:
+ conn = ZmqClient(addr)
+
+ # assumes cast can't return an exception
+ conn.cast(msg_id, topic, payload)
+ except zmq.ZMQError:
+ raise RPCException("Cast failed. ZMQ Socket Exception")
+ finally:
+ if 'conn' in vars():
+ conn.close()
+
+
+def _call(addr, context, msg_id, topic, msg, timeout=None):
+ # timeout_response is how long we wait for a response
+ timeout = timeout or FLAGS.rpc_response_timeout
+
+ # The msg_id is used to track replies.
+ msg_id = str(uuid.uuid4().hex)
+
+ # Replies always come into the reply service.
+ reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+
+ LOG.debug(_("Creating payload"))
+ # Curry the original request into a reply method.
+ mcontext = RpcContext.marshal(context)
+ payload = {
+ 'method': '-reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'context': mcontext,
+ 'topic': reply_topic,
+ 'msg': [mcontext, msg]
+ }
+ }
+
+ LOG.debug(_("Creating queue socket for reply waiter"))
+
+ # Messages arriving async.
+ # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+ with Timeout(timeout, exception=rpc_common.Timeout):
+ try:
+ msg_waiter = ZmqSocket(
+ "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+ zmq.SUB, subscribe=msg_id, bind=False
+ )
+
+ LOG.debug(_("Sending cast"))
+ _cast(addr, context, msg_id, topic, payload)
+
+ LOG.debug(_("Cast sent; Waiting reply"))
+ # Blocks until receives reply
+ msg = msg_waiter.recv()
+ LOG.debug(_("Received message: %s"), msg)
+ LOG.debug(_("Unpacking response"))
+ responses = _deserialize(msg[-1])
+ # ZMQError trumps the Timeout error.
+ except zmq.ZMQError:
+ raise RPCException("ZMQ Socket Error")
+ finally:
+ if 'msg_waiter' in vars():
+ msg_waiter.close()
+
+ # It seems we don't need to do all of the following,
+ # but perhaps it would be useful for multicall?
+ # One effect of this is that we're checking all
+ # responses for Exceptions.
+ for resp in responses:
+ if isinstance(resp, types.DictType) and 'exc' in resp:
+ raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+
+ return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None):
+ """
+ Wraps the sending of messages,
+ dispatches to the matchmaker and sends
+ message to all relevant hosts.
+ """
+ conf = FLAGS
+ LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+ queues = matchmaker.queues(topic)
+ LOG.debug(_("Sending message(s) to: %s"), queues)
+
+ # Don't stack if we have no matchmaker results
+ if len(queues) == 0:
+ LOG.warn(_("No matchmaker results. Not casting."))
+ # While not strictly a timeout, callers know how to handle
+ # this exception and a timeout isn't too big a lie.
+ raise rpc_common.Timeout, "No match from matchmaker."
+
+ # This supports brokerless fanout (addresses > 1)
+ for queue in queues:
+ (_topic, ip_addr) = queue
+ _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+ if method.__name__ == '_cast':
+ eventlet.spawn_n(method, _addr, context,
+ _topic, _topic, msg, timeout)
+ return
+ return method(_addr, context, _topic, _topic, msg, timeout)
+
+
+def create_connection(conf, new=True):
+ return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+ """Multiple calls."""
+ register_opts(conf)
+ return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+ """Send a message, expect a response."""
+ register_opts(conf)
+ data = _multi_send(_call, *args, **kwargs)
+ return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+ """Send a message expecting no reply."""
+ register_opts(conf)
+ _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+ """Send a message to all listening and expect no reply."""
+ register_opts(conf)
+ # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
+ # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, **kwargs):
+ """
+ Send notification event.
+ Notifications are sent to topic-priority.
+ This differs from the AMQP drivers which send to topic.priority.
+ """
+ register_opts(conf)
+ # NOTE(ewindisch): dot-priority in rpc notifier does not
+ # work with our assumptions.
+ topic.replace('.', '-')
+ cast(conf, context, topic, msg, **kwargs)
+
+
+def cleanup():
+ """Clean up resources in use by implementation."""
+ global ZMQ_CTX
+ global matchmaker
+ matchmaker = None
+ ZMQ_CTX.destroy()
+ ZMQ_CTX = None
+
+
+def register_opts(conf):
+ """Registration of options for this driver."""
+ #NOTE(ewindisch): ZMQ_CTX and matchmaker
+ # are initialized here as this is as good
+ # an initialization method as any.
+
+ # We memoize through these globals
+ global ZMQ_CTX
+ global matchmaker
+ global FLAGS
+
+ if not FLAGS:
+ conf.register_opts(zmq_opts)
+ FLAGS = conf
+ # Don't re-set, if this method is called twice.
+ if not ZMQ_CTX:
+ ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ if not matchmaker:
+ # rpc_zmq_matchmaker should be set to a 'module.Class'
+ mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_module = '.'.join(mm_path[:-1])
+ mm_class = mm_path[-1]
+
+ # Only initialize a class.
+ if mm_path[-1][0] not in string.ascii_uppercase:
+ LOG.error(_("Matchmaker could not be loaded.\n"
+ "rpc_zmq_matchmaker is not a class."))
+ raise RPCException(_("Error loading Matchmaker."))
+
+ mm_impl = importutils.import_module(mm_module)
+ mm_constructor = getattr(mm_impl, mm_class)
+ matchmaker = mm_constructor()
+
+
+register_opts(cfg.CONF)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should except a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import itertools
+import json
+import logging
+
+from cinder.openstack.common import cfg
+from cinder.openstack.common.gettextutils import _
+
+
+matchmaker_opts = [
+ # Matchmaker ring file
+ cfg.StrOpt('matchmaker_ringfile',
+ default='/etc/nova/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+ """Signified a match could not be found."""
+ message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+ """
+ Implements lookups.
+ Subclass this to support hashtables, dns, etc.
+ """
+ def __init__(self):
+ pass
+
+ def run(self, key):
+ raise NotImplementedError()
+
+
+class Binding(object):
+ """
+ A binding on which to perform a lookup.
+ """
+ def __init__(self):
+ pass
+
+ def test(self, key):
+ raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+ """Match Maker Base Class."""
+
+ def __init__(self):
+ # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+ self.bindings = []
+
+ def add_binding(self, binding, rule, last=True):
+ self.bindings.append((binding, rule, False, last))
+
+ #NOTE(ewindisch): kept the following method in case we implement the
+ # underlying support.
+ #def add_negate_binding(self, binding, rule, last=True):
+ # self.bindings.append((binding, rule, True, last))
+
+ def queues(self, key):
+ workers = []
+
+ # bit is for negate bindings - if we choose to implement it.
+ # last stops processing rules if this matches.
+ for (binding, exchange, bit, last) in self.bindings:
+ if binding.test(key):
+ workers.extend(exchange.run(key))
+
+ # Support last.
+ if last:
+ return workers
+ return workers
+
+
+class DirectBinding(Binding):
+ """
+ Specifies a host in the key via a '.' character
+ Although dots are used in the key, the behavior here is
+ that it maps directly to a host, thus direct.
+ """
+ def test(self, key):
+ if '.' in key:
+ return True
+ return False
+
+
+class TopicBinding(Binding):
+ """
+ Where a 'bare' key without dots.
+ AMQP generally considers topic exchanges to be those *with* dots,
+ but we deviate here in terminology as the behavior here matches
+ that of a topic exchange (whereas where there are dots, behavior
+ matches that of a direct exchange.
+ """
+ def test(self, key):
+ if '.' not in key:
+ return True
+ return False
+
+
+class FanoutBinding(Binding):
+ """Match on fanout keys, where key starts with 'fanout.' string."""
+ def test(self, key):
+ if key.startswith('fanout~'):
+ return True
+ return False
+
+
+class StubExchange(Exchange):
+ """Exchange that does nothing."""
+ def run(self, key):
+ return [(key, None)]
+
+
+class RingExchange(Exchange):
+ """
+ Match Maker where hosts are loaded from a static file containing
+ a hashmap (JSON formatted).
+
+ __init__ takes optional ring dictionary argument, otherwise
+ loads the ringfile from CONF.mathcmaker_ringfile.
+ """
+ def __init__(self, ring=None):
+ super(RingExchange, self).__init__()
+
+ if ring:
+ self.ring = ring
+ else:
+ fh = open(CONF.matchmaker_ringfile, 'r')
+ self.ring = json.load(fh)
+ fh.close()
+
+ self.ring0 = {}
+ for k in self.ring.keys():
+ self.ring0[k] = itertools.cycle(self.ring[k])
+
+ def _ring_has(self, key):
+ if key in self.ring0:
+ return True
+ return False
+
+
+class RoundRobinRingExchange(RingExchange):
+ """A Topic Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(RoundRobinRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ if not self._ring_has(key):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
+ )
+ return []
+ host = next(self.ring0[key])
+ return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(FanoutRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "fanout~", strip it for lookup.
+ nkey = key.split('fanout~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class LocalhostExchange(Exchange):
+ """Exchange where all direct topics are local."""
+ def __init__(self):
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+ return [(key.split('.')[0] + '.localhost', 'localhost')]
+
+
+class DirectExchange(Exchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def __init__(self):
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+ b, e = key.split('.', 1)
+ return [(b, e)]
+
+
+class MatchMakerRing(MatchMakerBase):
+ """
+ Match Maker where hosts are loaded from a static hashmap.
+ """
+ def __init__(self, ring=None):
+ super(MatchMakerRing, self).__init__()
+ self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+ """
+ Match Maker where all bare topics resolve to localhost.
+ Useful for testing.
+ """
+ def __init__(self):
+ super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange())
+
+
+class MatchMakerStub(MatchMakerBase):
+ """
+ Match Maker where topics are untouched.
+ Useful for testing, or for AMQP/brokered queues.
+ Will not work where knowledge of hosts is known (i.e. zeromq)
+ """
+ def __init__(self):
+ super(MatchMakerLocalhost, self).__init__()
+
+ self.add_binding(FanoutBinding(), StubExchange())
+ self.add_binding(DirectBinding(), StubExchange())
+ self.add_binding(TopicBinding(), StubExchange())
"""
-from cinder import rpc
+from cinder.openstack.common import rpc
class RpcProxy(object):
rpc.fanout_cast(context, self.topic, msg)
def cast_to_server(self, context, server_params, msg, topic=None,
- version=None):
+ version=None):
"""rpc.cast_to_server() a remote method.
:param context: The request context
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
+from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
-from cinder import rpc
-from cinder.rpc import common as rpc_common
from cinder import utils
"""
from cinder import flags
-import cinder.rpc.proxy
+import cinder.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
-class SchedulerAPI(cinder.rpc.proxy.RpcProxy):
+class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
-from cinder import rpc
+from cinder.openstack.common import rpc
from cinder import utils
from cinder import version
from cinder import wsgi
conf.set_default('volume_driver', 'cinder.volume.driver.FakeISCSIDriver')
conf.set_default('connection_type', 'fake')
conf.set_default('fake_rabbit', True)
- conf.set_default('rpc_backend', 'cinder.rpc.impl_fake')
+ conf.set_default('rpc_backend', 'cinder.openstack.common.rpc.impl_fake')
conf.set_default('iscsi_num_targets', 8)
conf.set_default('verbose', True)
conf.set_default('sql_connection', "sqlite://")
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
-from cinder.tests import *
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls shared between all implementations
-"""
-
-import time
-
-from eventlet import greenthread
-import nose
-
-from cinder import context
-from cinder import exception
-from cinder import flags
-from cinder import log as logging
-from cinder.rpc import amqp as rpc_amqp
-from cinder.rpc import common as rpc_common
-from cinder.rpc import dispatcher as rpc_dispatcher
-from cinder import test
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class BaseRpcTestCase(test.TestCase):
- def setUp(self, supports_timeouts=True):
- super(BaseRpcTestCase, self).setUp()
- self.supports_timeouts = supports_timeouts
- self.context = context.get_admin_context()
- if self.rpc:
- self.conn = self.rpc.create_connection(FLAGS, True)
- receiver = TestReceiver()
- self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver])
- self.conn.create_consumer('test', self.dispatcher, False)
- self.conn.consume_in_thread()
-
- def tearDown(self):
- if self.rpc:
- self.conn.close()
- super(BaseRpcTestCase, self).tearDown()
-
- def test_call_succeed(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
- {"method": "echo", "args": {"value": value}})
- self.assertEqual(value, result)
-
- def test_call_succeed_despite_multiple_returns_yield(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
- {"method": "echo_three_times_yield",
- "args": {"value": value}})
- self.assertEqual(value + 2, result)
-
- def test_multicall_succeed_once(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- 'test',
- {"method": "echo",
- "args": {"value": value}})
- for i, x in enumerate(result):
- if i > 0:
- self.fail('should only receive one response')
- self.assertEqual(value + i, x)
-
- def test_multicall_three_nones(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- 'test',
- {"method": "multicall_three_nones",
- "args": {"value": value}})
- for i, x in enumerate(result):
- self.assertEqual(x, None)
- # i should have been 0, 1, and finally 2:
- self.assertEqual(i, 2)
-
- def test_multicall_succeed_three_times_yield(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- 'test',
- {"method": "echo_three_times_yield",
- "args": {"value": value}})
- for i, x in enumerate(result):
- self.assertEqual(value + i, x)
-
- def test_context_passed(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Makes sure a context is passed through rpc call."""
- value = 42
- result = self.rpc.call(FLAGS, self.context,
- 'test', {"method": "context",
- "args": {"value": value}})
- self.assertEqual(self.context.to_dict(), result)
-
- def test_nested_calls(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Test that we can do an rpc.call inside another call."""
- class Nested(object):
- @staticmethod
- def echo(context, queue, value):
- """Calls echo in the passed queue"""
- LOG.debug(_("Nested received %(queue)s, %(value)s")
- % locals())
- # TODO(comstud):
- # so, it will replay the context and use the same REQID?
- # that's bizarre.
- ret = self.rpc.call(FLAGS, context,
- queue,
- {"method": "echo",
- "args": {"value": value}})
- LOG.debug(_("Nested return %s"), ret)
- return value
-
- nested = Nested()
- dispatcher = rpc_dispatcher.RpcDispatcher([nested])
- conn = self.rpc.create_connection(FLAGS, True)
- conn.create_consumer('nested', dispatcher, False)
- conn.consume_in_thread()
- value = 42
- result = self.rpc.call(FLAGS, self.context,
- 'nested', {"method": "echo",
- "args": {"queue": "test",
- "value": value}})
- conn.close()
- self.assertEqual(value, result)
-
- def test_call_timeout(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Make sure rpc.call will time out"""
- if not self.supports_timeouts:
- raise nose.SkipTest(_("RPC backend does not support timeouts"))
-
- value = 42
- self.assertRaises(rpc_common.Timeout,
- self.rpc.call,
- FLAGS, self.context,
- 'test',
- {"method": "block",
- "args": {"value": value}}, timeout=1)
- try:
- self.rpc.call(FLAGS, self.context,
- 'test',
- {"method": "block",
- "args": {"value": value}},
- timeout=1)
- self.fail("should have thrown Timeout")
- except rpc_common.Timeout as exc:
- pass
-
-
-class BaseRpcAMQPTestCase(BaseRpcTestCase):
- """Base test class for all AMQP-based RPC tests"""
- def test_proxycallback_handles_exceptions(self):
- """Make sure exceptions unpacking messages don't cause hangs."""
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- orig_unpack = rpc_amqp.unpack_context
-
- info = {'unpacked': False}
-
- def fake_unpack_context(*args, **kwargs):
- info['unpacked'] = True
- raise test.TestingException('moo')
-
- self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
-
- value = 41
- self.rpc.cast(FLAGS, self.context, 'test',
- {"method": "echo", "args": {"value": value}})
-
- # Wait for the cast to complete.
- for x in xrange(50):
- if info['unpacked']:
- break
- greenthread.sleep(0.1)
- else:
- self.fail("Timeout waiting for message to be consued")
-
- # Now see if we get a response even though we raised an
- # exception for the cast above.
- self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
- {"method": "echo",
- "args": {"value": value}})
- self.assertEqual(value, result)
-
-
-class TestReceiver(object):
- """Simple Proxy class so the consumer has methods to call.
-
- Uses static methods because we aren't actually storing any state.
-
- """
- @staticmethod
- def echo(context, value):
- """Simply returns whatever value is sent in."""
- LOG.debug(_("Received %s"), value)
- return value
-
- @staticmethod
- def context(context, value):
- """Returns dictionary version of context."""
- LOG.debug(_("Received %s"), context)
- return context.to_dict()
-
- @staticmethod
- def multicall_three_nones(context, value):
- yield None
- yield None
- yield None
-
- @staticmethod
- def echo_three_times_yield(context, value):
- yield value
- yield value + 1
- yield value + 2
-
- @staticmethod
- def fail(context, value):
- """Raises an exception with the value sent in."""
- raise NotImplementedError(value)
-
- @staticmethod
- def fail_converted(context, value):
- """Raises an exception with the value sent in."""
- raise exception.ConvertedException(explanation=value)
-
- @staticmethod
- def block(context, value):
- time.sleep(2)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack, LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for 'common' functons used through rpc code.
-"""
-
-import json
-import sys
-
-from cinder import context
-from cinder import exception
-from cinder import flags
-from cinder import log as logging
-from cinder import test
-from cinder.rpc import amqp as rpc_amqp
-from cinder.rpc import common as rpc_common
-from cinder.tests.rpc import common
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-def raise_exception():
- raise Exception("test")
-
-
-class FakeUserDefinedException(Exception):
- def __init__(self):
- Exception.__init__(self, "Test Message")
-
-
-class RpcCommonTestCase(test.TestCase):
- def test_serialize_remote_exception(self):
- expected = {
- 'class': 'Exception',
- 'module': 'exceptions',
- 'message': 'test',
- }
-
- try:
- raise_exception()
- except Exception as exc:
- failure = rpc_common.serialize_remote_exception(sys.exc_info())
-
- failure = json.loads(failure)
- #assure the traceback was added
- self.assertEqual(expected['class'], failure['class'])
- self.assertEqual(expected['module'], failure['module'])
- self.assertEqual(expected['message'], failure['message'])
-
- def test_serialize_remote_cinder_exception(self):
- def raise_cinder_exception():
- raise exception.CinderException("test", code=500)
-
- expected = {
- 'class': 'CinderException',
- 'module': 'cinder.exception',
- 'kwargs': {'code': 500},
- 'message': 'test'
- }
-
- try:
- raise_cinder_exception()
- except Exception as exc:
- failure = rpc_common.serialize_remote_exception(sys.exc_info())
-
- failure = json.loads(failure)
- #assure the traceback was added
- self.assertEqual(expected['class'], failure['class'])
- self.assertEqual(expected['module'], failure['module'])
- self.assertEqual(expected['kwargs'], failure['kwargs'])
- self.assertEqual(expected['message'], failure['message'])
-
- def test_deserialize_remote_exception(self):
- failure = {
- 'class': 'CinderException',
- 'module': 'cinder.exception',
- 'message': 'test message',
- 'tb': ['raise CinderException'],
- }
- serialized = json.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, exception.CinderException))
- self.assertTrue('test message' in unicode(after_exc))
- #assure the traceback was added
- self.assertTrue('raise CinderException' in unicode(after_exc))
-
- def test_deserialize_remote_exception_bad_module(self):
- failure = {
- 'class': 'popen2',
- 'module': 'os',
- 'kwargs': {'cmd': '/bin/echo failed'},
- 'message': 'foo',
- }
- serialized = json.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
-
- def test_deserialize_remote_exception_user_defined_exception(self):
- """Ensure a user defined exception can be deserialized."""
- self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
- failure = {
- 'class': 'FakeUserDefinedException',
- 'module': self.__class__.__module__,
- 'tb': ['raise FakeUserDefinedException'],
- }
- serialized = json.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, FakeUserDefinedException))
- #assure the traceback was added
- self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc))
-
- def test_deserialize_remote_exception_cannot_recreate(self):
- """Ensure a RemoteError is returned on initialization failure.
-
- If an exception cannot be recreated with it's original class then a
- RemoteError with the exception informations should still be returned.
-
- """
- self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
- failure = {
- 'class': 'FakeIDontExistException',
- 'module': self.__class__.__module__,
- 'tb': ['raise FakeIDontExistException'],
- }
- serialized = json.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
- #assure the traceback was added
- self.assertTrue('raise FakeIDontExistException' in unicode(after_exc))
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unit Tests for rpc.dispatcher
-"""
-
-from cinder import context
-from cinder.rpc import dispatcher
-from cinder.rpc import common as rpc_common
-from cinder import test
-
-
-class RpcDispatcherTestCase(test.TestCase):
- class API1(object):
- RPC_API_VERSION = '1.0'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- class API2(object):
- RPC_API_VERSION = '2.1'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- class API3(object):
- RPC_API_VERSION = '3.5'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- def setUp(self):
- self.ctxt = context.RequestContext('fake_user', 'fake_project')
- super(RpcDispatcherTestCase, self).setUp()
-
- def tearDown(self):
- super(RpcDispatcherTestCase, self).tearDown()
-
- def _test_dispatch(self, version, expectations):
- v2 = self.API2()
- v3 = self.API3()
- disp = dispatcher.RpcDispatcher([v2, v3])
-
- disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
-
- self.assertEqual(v2.test_method_ctxt, expectations[0])
- self.assertEqual(v2.test_method_arg1, expectations[1])
- self.assertEqual(v3.test_method_ctxt, expectations[2])
- self.assertEqual(v3.test_method_arg1, expectations[3])
-
- def test_dispatch(self):
- self._test_dispatch('2.1', (self.ctxt, 1, None, None))
- self._test_dispatch('3.5', (None, None, self.ctxt, 1))
-
- def test_dispatch_lower_minor_version(self):
- self._test_dispatch('2.0', (self.ctxt, 1, None, None))
- self._test_dispatch('3.1', (None, None, self.ctxt, 1))
-
- def test_dispatch_higher_minor_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '2.6', (None, None, None, None))
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '3.6', (None, None, None, None))
-
- def test_dispatch_lower_major_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '1.0', (None, None, None, None))
-
- def test_dispatch_higher_major_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '4.0', (None, None, None, None))
-
- def test_dispatch_no_version_uses_v1(self):
- v1 = self.API1()
- disp = dispatcher.RpcDispatcher([v1])
-
- disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
-
- self.assertEqual(v1.test_method_ctxt, self.ctxt)
- self.assertEqual(v1.test_method_arg1, 1)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using fake_impl
-"""
-
-from cinder import log as logging
-from cinder.rpc import impl_fake
-from cinder.tests.rpc import common
-
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcFakeTestCase(common.BaseRpcTestCase):
- def setUp(self):
- self.rpc = impl_fake
- super(RpcFakeTestCase, self).setUp()
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using kombu
-"""
-
-from cinder import context
-from cinder import exception
-from cinder import flags
-from cinder import log as logging
-from cinder import test
-from cinder.rpc import amqp as rpc_amqp
-from cinder.tests.rpc import common
-
-try:
- import kombu
- from cinder.rpc import impl_kombu
-except ImportError:
- kombu = None
- impl_kombu = None
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class MyException(Exception):
- pass
-
-
-def _raise_exc_stub(stubs, times, obj, method, exc_msg,
- exc_class=MyException):
- info = {'called': 0}
- orig_method = getattr(obj, method)
-
- def _raise_stub(*args, **kwargs):
- info['called'] += 1
- if info['called'] <= times:
- raise exc_class(exc_msg)
- orig_method(*args, **kwargs)
- stubs.Set(obj, method, _raise_stub)
- return info
-
-
-class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
- def setUp(self):
- if kombu:
- self.rpc = impl_kombu
- else:
- self.rpc = None
- super(RpcKombuTestCase, self).setUp()
-
- def tearDown(self):
- if kombu:
- impl_kombu.cleanup()
- super(RpcKombuTestCase, self).tearDown()
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_reusing_connection(self):
- """Test that reusing a connection returns same one."""
- conn_context = self.rpc.create_connection(FLAGS, new=False)
- conn1 = conn_context.connection
- conn_context.close()
- conn_context = self.rpc.create_connection(FLAGS, new=False)
- conn2 = conn_context.connection
- conn_context.close()
- self.assertEqual(conn1, conn2)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_topic_send_receive(self):
- """Test sending to a topic exchange/queue"""
-
- conn = self.rpc.create_connection(FLAGS)
- message = 'topic test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_topic_consumer('a_topic', _callback)
- conn.topic_send('a_topic', message)
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_direct_send_receive(self):
- """Test sending to a direct exchange/queue"""
- conn = self.rpc.create_connection(FLAGS)
- message = 'direct test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_cast_interface_uses_default_options(self):
- """Test kombu rpc.cast"""
-
- ctxt = context.RequestContext('fake_user', 'fake_project')
-
- class MyConnection(impl_kombu.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.params,
- {'hostname': FLAGS.rabbit_host,
- 'userid': FLAGS.rabbit_userid,
- 'password': FLAGS.rabbit_password,
- 'port': FLAGS.rabbit_port,
- 'virtual_host': FLAGS.rabbit_virtual_host,
- 'transport': 'memory'})
-
- def topic_send(_context, topic, msg):
- pass
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_kombu, 'Connection', MyConnection)
-
- impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_cast_to_server_uses_server_params(self):
- """Test kombu rpc.cast"""
-
- ctxt = context.RequestContext('fake_user', 'fake_project')
-
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337,
- 'virtual_host': 'fake_virtual_host'}
-
- class MyConnection(impl_kombu.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.params,
- {'hostname': server_params['hostname'],
- 'userid': server_params['username'],
- 'password': server_params['password'],
- 'port': server_params['port'],
- 'virtual_host': server_params['virtual_host'],
- 'transport': 'memory'})
-
- def topic_send(_context, topic, msg):
- pass
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_kombu, 'Connection', MyConnection)
-
- impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
- 'fake_topic', {'msg': 'fake'})
-
- @test.skip_test("kombu memory transport seems buggy with fanout queues "
- "as this test passes when you use rabbit (fake_rabbit=False)")
- def test_fanout_send_receive(self):
- """Test sending to a fanout exchange and consuming from 2 queues"""
-
- conn = self.rpc.create_connection()
- conn2 = self.rpc.create_connection()
- message = 'fanout test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_fanout_consumer('a_fanout', _callback)
- conn2.declare_fanout_consumer('a_fanout', _callback)
- conn.fanout_send('a_fanout', message)
-
- conn.consume(limit=1)
- conn.close()
- self.assertEqual(self.received_message, message)
-
- self.received_message = None
- conn2.consume(limit=1)
- conn2.close()
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_declare_consumer_errors_will_reconnect(self):
- # Test that any exception with 'timeout' in it causes a
- # reconnection
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
- '__init__', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 3)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- # Test that any exception in transport.connection_errors causes
- # a reconnection
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
- '__init__', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 2)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_declare_consumer_ioerrors_will_reconnect(self):
- """Test that an IOError exception causes a reconnection"""
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
- '__init__', 'Socket closed', exc_class=IOError)
-
- conn = self.rpc.Connection(FLAGS)
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 3)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_publishing_errors_will_reconnect(self):
- # Test that any exception with 'timeout' in it causes a
- # reconnection when declaring the publisher class and when
- # calling send()
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
- '__init__', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 3)
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
- 'send', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 3)
-
- # Test that any exception in transport.connection_errors causes
- # a reconnection when declaring the publisher class and when
- # calling send()
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
- '__init__', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 2)
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
- 'send', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 2)
-
- @test.skip_test("kombu memory transport hangs here on precise")
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_iterconsume_errors_will_reconnect(self):
- conn = self.rpc.Connection(FLAGS)
- message = 'reconnect test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
-
- info = _raise_exc_stub(self.stubs, 1, conn.connection,
- 'drain_events', 'foo timeout foo')
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
- # Only called once, because our stub goes away during reconnection
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_call_exception(self):
- """Test that exception gets passed back properly.
-
- rpc.call returns an Exception object. The value of the
- exception is converted to a string.
-
- """
- self.flags(allowed_rpc_exception_modules=['exceptions'])
- value = "This is the exception message"
- self.assertRaises(NotImplementedError,
- self.rpc.call,
- FLAGS,
- self.context,
- 'test',
- {"method": "fail",
- "args": {"value": value}})
- try:
- self.rpc.call(FLAGS, self.context,
- 'test',
- {"method": "fail",
- "args": {"value": value}})
- self.fail("should have thrown Exception")
- except NotImplementedError as exc:
- self.assertTrue(value in unicode(exc))
- #Traceback should be included in exception message
- self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_call_converted_exception(self):
- """Test that exception gets passed back properly.
-
- rpc.call returns an Exception object. The value of the
- exception is converted to a string.
-
- """
- value = "This is the exception message"
- self.assertRaises(exception.ConvertedException,
- self.rpc.call,
- FLAGS,
- self.context,
- 'test',
- {"method": "fail_converted",
- "args": {"value": value}})
- try:
- self.rpc.call(FLAGS, self.context,
- 'test',
- {"method": "fail_converted",
- "args": {"value": value}})
- self.fail("should have thrown Exception")
- except exception.ConvertedException as exc:
- self.assertTrue(value in unicode(exc))
- #Traceback should be included in exception message
- self.assertTrue('exception.ConvertedException' in unicode(exc))
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using kombu + ssl
-"""
-
-from cinder import flags
-from cinder import test
-
-try:
- import kombu
- from cinder.rpc import impl_kombu
-except ImportError:
- kombu = None
- impl_kombu = None
-
-
-# Flag settings we will ensure get passed to amqplib
-SSL_VERSION = "SSLv2"
-SSL_CERT = "/tmp/cert.blah.blah"
-SSL_CA_CERT = "/tmp/cert.ca.blah.blah"
-SSL_KEYFILE = "/tmp/keyfile.blah.blah"
-
-FLAGS = flags.FLAGS
-
-
-class RpcKombuSslTestCase(test.TestCase):
-
- def setUp(self):
- super(RpcKombuSslTestCase, self).setUp()
- if kombu:
- self.flags(kombu_ssl_keyfile=SSL_KEYFILE,
- kombu_ssl_ca_certs=SSL_CA_CERT,
- kombu_ssl_certfile=SSL_CERT,
- kombu_ssl_version=SSL_VERSION,
- rabbit_use_ssl=True)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_ssl_on_extended(self):
- rpc = impl_kombu
- conn = rpc.create_connection(FLAGS, True)
- c = conn.connection
- #This might be kombu version dependent...
- #Since we are now peaking into the internals of kombu...
- self.assertTrue(isinstance(c.connection.ssl, dict))
- self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version"))
- self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile"))
- self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs"))
- self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile"))
- #That hash then goes into amqplib which then goes
- #Into python ssl creation...
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unit Tests for rpc.proxy
-"""
-
-import copy
-
-from cinder import context
-from cinder import rpc
-from cinder.rpc import proxy
-from cinder import test
-
-
-class RpcProxyTestCase(test.TestCase):
-
- def setUp(self):
- super(RpcProxyTestCase, self).setUp()
-
- def tearDown(self):
- super(RpcProxyTestCase, self).tearDown()
-
- def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
- server_params=None, supports_topic_override=True):
- topic = 'fake_topic'
- timeout = 123
- rpc_proxy = proxy.RpcProxy(topic, '1.0')
- ctxt = context.RequestContext('fake_user', 'fake_project')
- msg = {'method': 'fake_method', 'args': {'x': 'y'}}
- expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
- 'version': '1.0'}
-
- expected_retval = 'hi' if has_retval else None
-
- self.fake_args = None
- self.fake_kwargs = None
-
- def _fake_rpc_method(*args, **kwargs):
- self.fake_args = args
- self.fake_kwargs = kwargs
- if has_retval:
- return expected_retval
-
- self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
-
- args = [ctxt, msg]
- if server_params:
- args.insert(1, server_params)
-
- # Base method usage
- retval = getattr(rpc_proxy, rpc_method)(*args)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- # overriding the version
- retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
- self.assertEqual(retval, expected_retval)
- new_msg = copy.deepcopy(expected_msg)
- new_msg['version'] = '1.1'
- expected_args = [ctxt, topic, new_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- if has_timeout:
- # set a timeout
- retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg, timeout]
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- if supports_topic_override:
- # set a topic
- new_topic = 'foo.bar'
- retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, new_topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- def test_call(self):
- self._test_rpc_method('call', has_timeout=True, has_retval=True)
-
- def test_multicall(self):
- self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
-
- def test_cast(self):
- self._test_rpc_method('cast')
-
- def test_fanout_cast(self):
- self._test_rpc_method('fanout_cast', supports_topic_override=False)
-
- def test_cast_to_server(self):
- self._test_rpc_method('cast_to_server', server_params={'blah': 1})
-
- def test_fanout_cast_to_server(self):
- self._test_rpc_method('fanout_cast_to_server',
- server_params={'blah': 1}, supports_topic_override=False)
-
- def test_make_msg(self):
- self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
- {'method': 'test_method', 'args': {'a': 1, 'b': 2}})
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using qpid
-"""
-
-import mox
-
-from cinder import context
-from cinder import flags
-from cinder import log as logging
-from cinder.rpc import amqp as rpc_amqp
-from cinder import test
-
-try:
- import qpid
- from cinder.rpc import impl_qpid
-except ImportError:
- qpid = None
- impl_qpid = None
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class RpcQpidTestCase(test.TestCase):
- """
- Exercise the public API of impl_qpid utilizing mox.
-
- This set of tests utilizes mox to replace the Qpid objects and ensures
- that the right operations happen on them when the various public rpc API
- calls are exercised. The API calls tested here include:
-
- cinder.rpc.create_connection()
- cinder.rpc.common.Connection.create_consumer()
- cinder.rpc.common.Connection.close()
- cinder.rpc.cast()
- cinder.rpc.fanout_cast()
- cinder.rpc.call()
- cinder.rpc.multicall()
- """
-
- def setUp(self):
- super(RpcQpidTestCase, self).setUp()
-
- self.mock_connection = None
- self.mock_session = None
- self.mock_sender = None
- self.mock_receiver = None
-
- if qpid:
- self.orig_connection = qpid.messaging.Connection
- self.orig_session = qpid.messaging.Session
- self.orig_sender = qpid.messaging.Sender
- self.orig_receiver = qpid.messaging.Receiver
- qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
- qpid.messaging.Session = lambda *_x, **_y: self.mock_session
- qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
- qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
-
- def tearDown(self):
- if qpid:
- qpid.messaging.Connection = self.orig_connection
- qpid.messaging.Session = self.orig_session
- qpid.messaging.Sender = self.orig_sender
- qpid.messaging.Receiver = self.orig_receiver
- if impl_qpid:
- # Need to reset this in case we changed the connection_cls
- # in self._setup_to_server_tests()
- impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
-
- super(RpcQpidTestCase, self).tearDown()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_connection(self):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- self.mock_connection.close()
-
- self.mox.ReplayAll()
-
- connection = impl_qpid.create_connection(FLAGS)
- connection.close()
-
- def _test_create_consumer(self, fanout):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- if fanout:
- # The link name includes a UUID, so match it with a regex.
- expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
- '{"node": {"x-declare": {"auto-delete": true, "durable": '
- 'false, "type": "fanout"}, "type": "topic"}, "create": '
- '"always", "link": {"x-declare": {"auto-delete": true, '
- '"exclusive": true, "durable": false}, "durable": true, '
- '"name": "impl_qpid_test_fanout_.*"}}$')
- else:
- expected_address = (
- 'cinder/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": true}, "type": "topic"}, '
- '"create": "always", "link": {"x-declare": {"auto-delete": '
- 'true, "exclusive": false, "durable": false}, "durable": '
- 'true, "name": "impl_qpid_test"}}')
- self.mock_session.receiver(expected_address).AndReturn(
- self.mock_receiver)
- self.mock_receiver.capacity = 1
- self.mock_connection.close()
-
- self.mox.ReplayAll()
-
- connection = impl_qpid.create_connection(FLAGS)
- connection.create_consumer("impl_qpid_test",
- lambda *_x, **_y: None,
- fanout)
- connection.close()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_consumer(self):
- self._test_create_consumer(fanout=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_consumer_fanout(self):
- self._test_create_consumer(fanout=True)
-
- def _test_cast(self, fanout, server_params=None):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
-
- self.mock_connection.session().AndReturn(self.mock_session)
- if fanout:
- expected_address = ('impl_qpid_test_fanout ; '
- '{"node": {"x-declare": {"auto-delete": true, '
- '"durable": false, "type": "fanout"}, '
- '"type": "topic"}, "create": "always"}')
- else:
- expected_address = (
- 'cinder/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
- if not server_params:
- # This is a pooled connection, so instead of closing it, it
- # gets reset, which is just creating a new session on the
- # connection.
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
-
- self.mox.ReplayAll()
-
- try:
- ctx = context.RequestContext("user", "project")
-
- args = [FLAGS, ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}}]
-
- if server_params:
- args.insert(2, server_params)
- if fanout:
- method = impl_qpid.fanout_cast_to_server
- else:
- method = impl_qpid.cast_to_server
- else:
- if fanout:
- method = impl_qpid.fanout_cast
- else:
- method = impl_qpid.cast
-
- method(*args)
- finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_cast(self):
- self._test_cast(fanout=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_fanout_cast(self):
- self._test_cast(fanout=True)
-
- def _setup_to_server_tests(self, server_params):
- class MyConnection(impl_qpid.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.connection.username,
- server_params['username'])
- self.assertEqual(myself.connection.password,
- server_params['password'])
- self.assertEqual(myself.broker,
- server_params['hostname'] + ':' +
- str(server_params['port']))
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_qpid, 'Connection', MyConnection)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_cast_to_server(self):
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337}
- self._setup_to_server_tests(server_params)
- self._test_cast(fanout=False, server_params=server_params)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_fanout_cast_to_server(self):
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337}
- self._setup_to_server_tests(server_params)
- self._test_cast(fanout=True, server_params=server_params)
-
- def _test_call(self, multi):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
- ' true, "durable": true, "type": "direct"}, "type": '
- '"topic"}, "create": "always", "link": {"x-declare": '
- '{"auto-delete": true, "exclusive": true, "durable": '
- 'false}, "durable": true, "name": ".*"}}')
- self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
- self.mock_receiver.capacity = 1
- send_addr = ('cinder/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
-
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- if multi:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message(
- {"result": "bar", "failure": False,
- "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message(
- {"result": "baz", "failure": False,
- "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
-
- self.mox.ReplayAll()
-
- try:
- ctx = context.RequestContext("user", "project")
-
- if multi:
- method = impl_qpid.multicall
- else:
- method = impl_qpid.call
-
- res = method(FLAGS, ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}})
-
- if multi:
- self.assertEquals(list(res), ["foo", "bar", "baz"])
- else:
- self.assertEquals(res, "foo")
- finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_call(self):
- self._test_call(multi=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_multicall(self):
- self._test_call(multi=True)
-
-
-#
-#from cinder.tests.rpc import common
-#
-# Qpid does not have a handy in-memory transport like kombu, so it's not
-# terribly straight forward to take advantage of the common unit tests.
-# However, at least at the time of this writing, the common unit tests all pass
-# with qpidd running.
-#
-# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
-# def setUp(self):
-# self.rpc = impl_qpid
-# super(RpcQpidCommonTestCase, self).setUp()
-#
-# def tearDown(self):
-# super(RpcQpidCommonTestCase, self).tearDown()
-#
from cinder import context
from cinder import flags
-from cinder import rpc
+from cinder.openstack.common import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
from cinder import exception
from cinder import flags
from cinder.notifier import api as notifier
-from cinder import rpc
+from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import common as rpc_common
from cinder.openstack.common import timeutils
-from cinder.rpc import common as rpc_common
from cinder.scheduler import driver
from cinder.scheduler import manager
from cinder import test
def mock_notify(cls, *args):
self.mock_notify = True
- self.stubs.Set(cinder.rpc, 'notify', mock_notify)
+ self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify('publisher_id', 'event_type',
cinder.notifier.api.WARN, dict(a=3))
def mock_notify(context, topic, msg):
self.test_topic = topic
- self.stubs.Set(cinder.rpc, 'notify', mock_notify)
+ self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify('publisher_id', 'event_type', 'DEBUG', dict(a=3))
self.assertEqual(self.test_topic, 'testnotify.debug')
def mock_notify(context, topic, data):
msgs.append(data)
- self.stubs.Set(cinder.rpc, 'notify', mock_notify)
+ self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
LOG.error('foo')
self.assertEqual(1, len(msgs))
msg = msgs[0]
from cinder import context
from cinder import db
+from cinder import exception
from cinder import flags
from cinder import quota
-from cinder import exception
-from cinder import rpc
+from cinder.openstack.common import rpc
from cinder import test
from cinder import volume
from cinder.scheduler import driver as scheduler_driver
"""Tests for the testing base code."""
-from cinder import rpc
+from cinder.openstack.common import rpc
from cinder import test
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import importutils
+from cinder.openstack.common import rpc
import cinder.policy
-from cinder import rpc
from cinder import test
import cinder.volume.api
from cinder import exception
from cinder import flags
from cinder import log as logging
+from cinder.openstack.common import rpc
import cinder.policy
from cinder.openstack.common import timeutils
from cinder import quota
-from cinder import rpc
from cinder import utils
from cinder.db import base
from cinder.openstack.common import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
+from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
-from cinder import rpc
from cinder import utils
from cinder.volume import volume_types
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,timeutils
+modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils
# The base module to hold the copy of openstack.common
base=cinder