From: Russell Bryant Date: Wed, 13 Jun 2012 14:48:54 +0000 (-0400) Subject: Use rpc from openstack-common. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=15f971de5bce79648f812209df0f82622a3711d0;p=openstack-build%2Fcinder-build.git Use rpc from openstack-common. Final patch for blueprint common-rpc. This patch removes cinder.rpc in favor of the copy in openstack-common. Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd --- diff --git a/bin/cinder-manage b/bin/cinder-manage index 93f3f862a..8e13305ce 100755 --- a/bin/cinder-manage +++ b/bin/cinder-manage @@ -84,8 +84,8 @@ from cinder import flags from cinder import log as logging from cinder.openstack.common import cfg from cinder.openstack.common import importutils +from cinder.openstack.common import rpc from cinder import quota -from cinder import rpc from cinder import utils from cinder import version from cinder.volume import volume_types diff --git a/bin/clear_rabbit_queues b/bin/clear_rabbit_queues index d1b0b4a01..257de6337 100755 --- a/bin/clear_rabbit_queues +++ b/bin/clear_rabbit_queues @@ -45,7 +45,7 @@ from cinder import exception from cinder import flags from cinder import log as logging from cinder.openstack.common import cfg -from cinder import rpc +from cinder.openstack.common import rpc delete_exchange_opt = \ diff --git a/cinder/manager.py b/cinder/manager.py index b7f2a0bde..670f56130 100644 --- a/cinder/manager.py +++ b/cinder/manager.py @@ -56,7 +56,7 @@ This module provides Manager, a base class for managers. from cinder.db import base from cinder import flags from cinder import log as logging -from cinder.rpc import dispatcher as rpc_dispatcher +from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import version diff --git a/cinder/notifier/rabbit_notifier.py b/cinder/notifier/rabbit_notifier.py index 0b2942a28..7dcaace4d 100644 --- a/cinder/notifier/rabbit_notifier.py +++ b/cinder/notifier/rabbit_notifier.py @@ -19,7 +19,7 @@ import cinder.context from cinder import flags from cinder import log as logging from cinder.openstack.common import cfg -from cinder import rpc +from cinder.openstack.common import rpc LOG = logging.getLogger(__name__) diff --git a/cinder/rpc/__init__.py b/cinder/openstack/common/rpc/__init__.py similarity index 77% rename from cinder/rpc/__init__.py rename to cinder/openstack/common/rpc/__init__.py index 07e4364b0..d34aae698 100644 --- a/cinder/rpc/__init__.py +++ b/cinder/openstack/common/rpc/__init__.py @@ -31,7 +31,7 @@ from cinder.openstack.common import importutils rpc_opts = [ cfg.StrOpt('rpc_backend', - default='cinder.rpc.impl_kombu', + default='%s.impl_kombu' % __package__, help="The messaging module to use, defaults to kombu."), cfg.IntOpt('rpc_thread_pool_size', default=64, @@ -42,17 +42,23 @@ rpc_opts = [ cfg.IntOpt('rpc_response_timeout', default=60, help='Seconds to wait for a response from call or multicall'), + cfg.IntOpt('rpc_cast_timeout', + default=30, + help='Seconds to wait before a cast expires (TTL). ' + 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['cinder.exception'], - help='Modules of exceptions that are permitted to be recreated' - 'upon receiving exception data from an rpc call.'), + default=['cinder.openstack.common.exception', + 'nova.exception', + ], + help='Modules of exceptions that are permitted to be recreated' + 'upon receiving exception data from an rpc call.'), cfg.StrOpt('control_exchange', - default='cinder', + default='nova', help='AMQP exchange to connect to if using RabbitMQ or Qpid'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), - ] +] cfg.CONF.register_opts(rpc_opts) @@ -61,14 +67,14 @@ def create_connection(new=True): """Create a connection to the message bus used for rpc. For some example usage of creating a connection and some consumers on that - connection, see cinder.service. + connection, see nova.service. :param new: Whether or not to create a new connection. A new connection will be created by default. If new is False, the implementation is free to return an existing connection from a pool. - :returns: An instance of cinder.rpc.common.Connection + :returns: An instance of openstack.common.rpc.common.Connection """ return _get_impl().create_connection(cfg.CONF, new=new) @@ -80,9 +86,9 @@ def call(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - cinder.rpc.common.Connection.create_consumer() - and only applies when the consumer was created - with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -90,8 +96,8 @@ def call(context, topic, msg, timeout=None): :returns: A dict from the remote method. - :raises: cinder.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ return _get_impl().call(cfg.CONF, context, topic, msg, timeout) @@ -103,9 +109,9 @@ def cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - cinder.rpc.common.Connection.create_consumer() - and only applies when the consumer was created - with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } @@ -124,9 +130,9 @@ def fanout_cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - cinder.rpc.common.Connection.create_consumer() - and only applies when the consumer was created - with fanout=True. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=True. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } @@ -146,9 +152,9 @@ def multicall(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - cinder.rpc.common.Connection.create_consumer() - and only applies when the consumer was created - with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -159,8 +165,8 @@ def multicall(context, topic, msg, timeout=None): returned and X is the Nth value that was returned by the remote method. - :raises: cinder.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) @@ -224,7 +230,20 @@ def fanout_cast_to_server(context, server_params, topic, msg): def queue_get_for(context, topic, host): - """Get a queue name for a given topic + host.""" + """Get a queue name for a given topic + host. + + This function only works if this naming convention is followed on the + consumer side, as well. For example, in nova, every instance of the + nova-foo service calls create_consumer() for two topics: + + foo + foo. + + Messages sent to the 'foo' topic are distributed to exactly one instance of + the nova-foo service. The services are chosen in a round-robin fashion. + Messages sent to the 'foo.' topic are sent to the nova-foo service on + . + """ return '%s.%s' % (topic, host) @@ -235,5 +254,11 @@ def _get_impl(): """Delay import of rpc_backend until configuration is loaded.""" global _RPCIMPL if _RPCIMPL is None: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + try: + _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + except ImportError: + # For backwards compatibility with older nova config. + impl = cfg.CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') + _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/cinder/rpc/amqp.py b/cinder/openstack/common/rpc/amqp.py similarity index 89% rename from cinder/rpc/amqp.py rename to cinder/openstack/common/rpc/amqp.py index c7cfb8afb..1f226971e 100644 --- a/cinder/rpc/amqp.py +++ b/cinder/openstack/common/rpc/amqp.py @@ -18,7 +18,7 @@ # under the License. """ -Shared code between AMQP based cinder.rpc implementations. +Shared code between AMQP based openstack.common.rpc implementations. The code in this module is shared between the rpc implemenations based on AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses @@ -35,8 +35,9 @@ from eventlet import pools from eventlet import semaphore from cinder.openstack.common import excutils +from cinder.openstack.common.gettextutils import _ from cinder.openstack.common import local -import cinder.rpc.common as rpc_common +from cinder.openstack.common.rpc import common as rpc_common LOG = logging.getLogger(__name__) @@ -74,13 +75,14 @@ def get_connection_pool(conf, connection_cls): class ConnectionContext(rpc_common.Connection): """The class that is actually returned to the caller of - create_connection(). This is a essentially a wrapper around - Connection that supports 'with' and can return a new Connection or - one from a pool. It will also catch when an instance of this class - is to be deleted so that we can return Connections to the pool on - exceptions and so forth without making the caller be responsible for - catching all exceptions and making sure to return a connection to - the pool. + create_connection(). This is essentially a wrapper around + Connection that supports 'with'. It can also return a new + Connection, or one from a pool. The function will also catch + when an instance of this class is to be deleted. With that + we can return Connections to the pool on exceptions and so + forth without making the caller be responsible for catching + them. If possible the function makes sure to return a + connection to the pool. """ def __init__(self, conf, connection_pool, pooled=True, server_params=None): @@ -91,8 +93,9 @@ class ConnectionContext(rpc_common.Connection): if pooled: self.connection = connection_pool.get() else: - self.connection = connection_pool.connection_cls(conf, - server_params=server_params) + self.connection = connection_pool.connection_cls( + conf, + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -131,6 +134,9 @@ class ConnectionContext(rpc_common.Connection): def create_consumer(self, topic, proxy, fanout=False): self.connection.create_consumer(topic, proxy, fanout) + def create_worker(self, topic, proxy, pool_name): + self.connection.create_worker(topic, proxy, pool_name) + def consume_in_thread(self): self.connection.consume_in_thread() @@ -157,8 +163,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, msg = {'result': reply, 'failure': failure} except TypeError: msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + for k, v in reply.__dict__.iteritems()), + 'failure': failure} if ending: msg['ending'] = True conn.direct_send(msg_id, msg) @@ -171,6 +177,12 @@ class RpcContext(rpc_common.CommonRpcContext): self.conf = kwargs.pop('conf') super(RpcContext, self).__init__(**kwargs) + def deepcopy(self): + values = self.to_dict() + values['conf'] = self.conf + values['msg_id'] = self.msg_id + return self.__class__(**values) + def reply(self, reply=None, failure=None, ending=False, connection_pool=None): if self.msg_id: @@ -278,8 +290,8 @@ class ProxyCallback(object): class MulticallWaiter(object): def __init__(self, conf, connection, timeout): self._connection = connection - self._iterator = connection.iterconsume( - timeout=timeout or conf.rpc_response_timeout) + self._iterator = connection.iterconsume(timeout=timeout or + conf.rpc_response_timeout) self._result = None self._done = False self._got_ending = False @@ -298,7 +310,7 @@ class MulticallWaiter(object): if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, - failure) + failure) elif data.get('ending', False): self._got_ending = True @@ -379,16 +391,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.topic_send(topic, msg) def fanout_cast_to_server(conf, context, server_params, topic, msg, - connection_pool): + connection_pool): """Sends a message on a fanout exchange to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.fanout_send(topic, msg) diff --git a/cinder/rpc/common.py b/cinder/openstack/common/rpc/common.py similarity index 85% rename from cinder/rpc/common.py rename to cinder/openstack/common/rpc/common.py index 984a14226..5674490fc 100644 --- a/cinder/rpc/common.py +++ b/cinder/openstack/common/rpc/common.py @@ -23,6 +23,7 @@ import sys import traceback from cinder.openstack.common import cfg +from cinder.openstack.common.gettextutils import _ from cinder.openstack.common import importutils from cinder.openstack.common import jsonutils from cinder.openstack.common import local @@ -119,8 +120,8 @@ class Connection(object): :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same - topic. For example, all instances of cinder-compute - consume from a queue called "compute". In that case, the + topic. For example, all instances of nova-compute consume + from a queue called "compute". In that case, the messages will get distributed amongst the consumers in a round-robin fashion if fanout=False. If fanout=True, every consumer associated with this topic will get a @@ -132,6 +133,25 @@ class Connection(object): """ raise NotImplementedError() + def create_worker(self, conf, topic, proxy, pool_name): + """Create a worker on this connection. + + A worker is like a regular consumer of messages directed to a + topic, except that it is part of a set of such consumers (the + "pool") which may run in parallel. Every pool of workers will + receive a given message, but only one worker in the pool will + be asked to process it. Load is distributed across the members + of the pool in round-robin fashion. + + :param conf: An openstack.common.cfg configuration object. + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. + :param proxy: The object that will handle all incoming messages. + :param pool_name: String containing the name of the pool of workers + """ + raise NotImplementedError() + def consume_in_thread(self): """Spawn a thread to handle incoming messages. @@ -148,10 +168,8 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = { - 'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), - } + SANITIZE = {'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), } has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data @@ -236,7 +254,7 @@ def deserialize_remote_exception(conf, data): ex_type = type(failure) str_override = lambda self: message new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,), - {'__str__': str_override}) + {'__str__': str_override, '__unicode__': str_override}) try: # NOTE(ameade): Dynamically create a new exception type and swap it in # as the new type for the exception. This only works on user defined @@ -268,20 +286,22 @@ class CommonRpcContext(object): def from_dict(cls, values): return cls(**values) + def deepcopy(self): + return self.from_dict(self.to_dict()) + def update_store(self): local.store.context = self def elevated(self, read_deleted=None, overwrite=False): """Return a version of this context with admin flag set.""" - # TODO(russellb) This method is a bit of a cinder-ism. It makes + # TODO(russellb) This method is a bit of a nova-ism. It makes # some assumptions about the data in the request context sent # across rpc, while the rest of this class does not. We could get - # rid of this if we changed the cinder code that uses this to + # rid of this if we changed the nova code that uses this to # convert the RpcContext back to its native RequestContext doing - # something like - # cinder.context.RequestContext.from_dict(ctxt.to_dict()) + # something like nova.context.RequestContext.from_dict(ctxt.to_dict()) - context = copy.deepcopy(self) + context = self.deepcopy() context.values['is_admin'] = True context.values.setdefault('roles', []) diff --git a/cinder/rpc/dispatcher.py b/cinder/openstack/common/rpc/dispatcher.py similarity index 65% rename from cinder/rpc/dispatcher.py rename to cinder/openstack/common/rpc/dispatcher.py index 95c3ff859..9f8a9085e 100644 --- a/cinder/rpc/dispatcher.py +++ b/cinder/openstack/common/rpc/dispatcher.py @@ -40,9 +40,48 @@ The conversion over to a versioned API must be done on both the client side and server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. + + +EXAMPLES: + +Nova was the first project to use versioned rpc APIs. Consider the compute rpc +API as an example. The client side is in nova/compute/rpcapi.py and the server +side is in nova/compute/manager.py. + + +Example 1) Adding a new method. + +Adding a new method is a backwards compatible change. It should be added to +nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to +X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should +have a specific version specified to indicate the minimum API version that must +be implemented for the method to be supported. For example: + + def get_host_uptime(self, ctxt, host): + topic = _compute_topic(self.topic, ctxt, host, None) + return self.call(ctxt, self.make_msg('get_host_uptime'), topic, + version='1.1') + +In this case, version '1.1' is the first version that supported the +get_host_uptime() method. + + +Example 2) Adding a new parameter. + +Adding a new parameter to an rpc method can be made backwards compatible. The +RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. +The implementation of the method must not expect the parameter to be present. + + def some_remote_method(self, arg1, arg2, newarg=None): + # The code needs to deal with newarg=None for cases + # where an older client sends a message without it. + pass + +On the client side, the same changes should be made as in example 1. The +minimum version that supports the new parameter should be specified. """ -from cinder.rpc import common as rpc_common +from cinder.openstack.common.rpc import common as rpc_common class RpcDispatcher(object): @@ -92,14 +131,20 @@ class RpcDispatcher(object): if not version: version = '1.0' + had_compatible = False for proxyobj in self.callbacks: if hasattr(proxyobj, 'RPC_API_VERSION'): rpc_api_version = proxyobj.RPC_API_VERSION else: rpc_api_version = '1.0' + is_compatible = self._is_compatible(rpc_api_version, version) + had_compatible = had_compatible or is_compatible if not hasattr(proxyobj, method): continue - if self._is_compatible(rpc_api_version, version): + if is_compatible: return getattr(proxyobj, method)(ctxt, **kwargs) - raise rpc_common.UnsupportedRpcVersion(version=version) + if had_compatible: + raise AttributeError("No such RPC function '%s'" % method) + else: + raise rpc_common.UnsupportedRpcVersion(version=version) diff --git a/cinder/rpc/impl_fake.py b/cinder/openstack/common/rpc/impl_fake.py similarity index 91% rename from cinder/rpc/impl_fake.py rename to cinder/openstack/common/rpc/impl_fake.py index d8de6f749..a47b5b7e4 100644 --- a/cinder/rpc/impl_fake.py +++ b/cinder/openstack/common/rpc/impl_fake.py @@ -18,15 +18,12 @@ queues. Casts will block, but this is very useful for tests. """ import inspect -import json -import signal -import sys import time -import traceback import eventlet -from cinder.rpc import common as rpc_common +from cinder.openstack.common import jsonutils +from cinder.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -37,6 +34,13 @@ class RpcContext(rpc_common.CommonRpcContext): self._response = [] self._done = False + def deepcopy(self): + values = self.to_dict() + new_inst = self.__class__(**values) + new_inst._response = self._response + new_inst._done = self._done + return new_inst + def reply(self, reply=None, failure=None, ending=False): if ending: self._done = True @@ -117,7 +121,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - json.dumps(msg) + jsonutils.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): @@ -171,9 +175,10 @@ def fanout_cast(conf, context, topic, msg): if not method: return args = msg.get('args', {}) + version = msg.get('version', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, method, args, None) + consumer.call(context, version, method, args, None) except Exception: pass diff --git a/cinder/rpc/impl_kombu.py b/cinder/openstack/common/rpc/impl_kombu.py similarity index 77% rename from cinder/rpc/impl_kombu.py rename to cinder/openstack/common/rpc/impl_kombu.py index 71dfc12f4..9483f3b24 100644 --- a/cinder/rpc/impl_kombu.py +++ b/cinder/openstack/common/rpc/impl_kombu.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import itertools import socket import ssl @@ -24,13 +25,14 @@ import uuid import eventlet import greenlet import kombu +import kombu.connection import kombu.entity import kombu.messaging -import kombu.connection from cinder.openstack.common import cfg -from cinder.rpc import amqp as rpc_amqp -from cinder.rpc import common as rpc_common +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common.rpc import amqp as rpc_amqp +from cinder.openstack.common.rpc import common as rpc_common kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -45,7 +47,7 @@ kombu_opts = [ cfg.StrOpt('kombu_ssl_ca_certs', default='', help=('SSL certification authority file ' - '(valid only if SSL enabled)')), + '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', help='the RabbitMQ host'), @@ -79,7 +81,7 @@ kombu_opts = [ default=False, help='use durable queues in RabbitMQ'), - ] +] cfg.CONF.register_opts(kombu_opts) @@ -170,55 +172,55 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__( - channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, channel, topic, callback, tag, **kwargs): + def __init__(self, conf, channel, topic, callback, tag, name=None, + **kwargs): """Init a 'topic' queue. - 'channel' is the amqp channel to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel + :param channel: the amqp channel to use + :param topic: the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param tag: a unique ID for the consumer on the channel + :param name: optional queue name, defaults to topic + :paramtype name: str - Other kombu options may be passed + Other kombu options may be passed as keyword arguments """ # Default options options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=conf.control_exchange, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__( - channel, - callback, - tag, - name=topic, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=conf.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) class FanoutConsumer(ConsumerBase): @@ -240,22 +242,17 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=exchange_name, - type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__( - channel, - callback, - tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) class Publisher(object): @@ -273,9 +270,10 @@ class Publisher(object): def reconnect(self, channel): """Re-establish the Producer after a rabbit reconnection""" self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) + **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, routing_key=self.routing_key) + channel=channel, + routing_key=self.routing_key) def send(self, msg): """Send a message""" @@ -291,14 +289,11 @@ class DirectPublisher(Publisher): """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(DirectPublisher, self).__init__(channel, - msg_id, - msg_id, - type='direct', - **options) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) class TopicPublisher(Publisher): @@ -309,14 +304,11 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - conf.control_exchange, - topic, - type='topic', - **options) + super(TopicPublisher, self).__init__(channel, conf.control_exchange, + topic, type='topic', **options) class FanoutPublisher(Publisher): @@ -327,14 +319,11 @@ class FanoutPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, - '%s_fanout' % topic, - None, - type='fanout', - **options) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) class NotifyPublisher(TopicPublisher): @@ -351,10 +340,10 @@ class NotifyPublisher(TopicPublisher): # we do this to ensure that messages don't get dropped if the # consumer is started after we do queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - name=self.routing_key, - routing_key=self.routing_key) + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) queue.declare() @@ -440,7 +429,7 @@ class Connection(object): """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() except self.connection_errors: @@ -448,8 +437,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection( - **self.params) + self.connection = kombu.connection.BrokerConnection(**self.params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -499,8 +487,8 @@ class Connection(object): if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -515,8 +503,8 @@ class Connection(object): log_info['sleep_time'] = sleep_time LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -566,11 +554,11 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.channel, topic, callback, - self.consumer_num.next()) + self.consumer_num.next()) self.consumers.append(consumer) return consumer @@ -584,11 +572,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) info['do_consume'] = True def _consume(): @@ -622,7 +610,7 @@ class Connection(object): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) @@ -632,14 +620,17 @@ class Connection(object): def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. - In cinder's use, this is generally a msg_id queue used for + In nova's use, this is generally a msg_id queue used for responses for call/multicall """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -683,61 +674,77 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: self.declare_fanout_consumer(topic, proxy_cb) else: self.declare_topic_consumer(topic, proxy_cb) + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.declare_topic_consumer(topic, proxy_cb, pool_name) + def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/cinder/rpc/impl_qpid.py b/cinder/openstack/common/rpc/impl_qpid.py similarity index 79% rename from cinder/rpc/impl_qpid.py rename to cinder/openstack/common/rpc/impl_qpid.py index 878ab9dd3..e7b6a0b86 100644 --- a/cinder/rpc/impl_qpid.py +++ b/cinder/openstack/common/rpc/impl_qpid.py @@ -15,8 +15,8 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import itertools -import json import logging import time import uuid @@ -27,8 +27,10 @@ import qpid.messaging import qpid.messaging.exceptions from cinder.openstack.common import cfg -from cinder.rpc import amqp as rpc_amqp -from cinder.rpc import common as rpc_common +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common import jsonutils +from cinder.openstack.common.rpc import amqp as rpc_amqp +from cinder.openstack.common.rpc import common as rpc_common LOG = logging.getLogger(__name__) @@ -75,7 +77,7 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), - ] +] cfg.CONF.register_opts(qpid_opts) @@ -123,7 +125,7 @@ class ConsumerBase(object): addr_opts["node"]["x-declare"].update(node_opts) addr_opts["link"]["x-declare"].update(link_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -159,26 +161,29 @@ class DirectConsumer(ConsumerBase): """ super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback): + def __init__(self, conf, session, topic, callback, name=None): """Init a 'topic' queue. - 'session' is the amqp session to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received + :param session: the amqp session to use + :param topic: is the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param name: optional queue name, defaults to topic """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, topic), {}, - topic, {}) + "%s/%s" % (conf.control_exchange, + topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -192,11 +197,12 @@ class FanoutConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__(session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + super(FanoutConsumer, self).__init__( + session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) class Publisher(object): @@ -224,7 +230,7 @@ class Publisher(object): if node_opts: addr_opts["node"]["x-declare"].update(node_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -250,8 +256,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic)) + super(TopicPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic)) class FanoutPublisher(Publisher): @@ -259,8 +266,9 @@ class FanoutPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__(session, - "%s_fanout" % topic, {"type": "fanout"}) + super(FanoutPublisher, self).__init__( + session, + "%s_fanout" % topic, {"type": "fanout"}) class NotifyPublisher(Publisher): @@ -268,9 +276,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic), + {"durable": True}) class Connection(object): @@ -288,9 +297,9 @@ class Connection(object): server_params = {} default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) + port=self.conf.qpid_port, + username=self.conf.qpid_username, + password=self.conf.qpid_password) params = server_params for key in default_params.keys(): @@ -308,18 +317,18 @@ class Connection(object): self.connection.reconnect = self.conf.qpid_reconnect if self.conf.qpid_reconnect_timeout: self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) + self.conf.qpid_reconnect_timeout) if self.conf.qpid_reconnect_limit: self.connection.reconnect_limit = self.conf.qpid_reconnect_limit if self.conf.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) + self.conf.qpid_reconnect_interval_max) if self.conf.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) + self.conf.qpid_reconnect_interval_min) if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + self.conf.qpid_reconnect_interval) self.connection.hearbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay @@ -391,7 +400,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.session, topic, callback) @@ -406,11 +415,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid.messaging.exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) @@ -440,7 +449,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): publisher = cls(self.conf, self.session, topic) @@ -450,14 +459,17 @@ class Connection(object): def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. - In cinder's use, this is generally a msg_id queue used for + In nova's use, this is generally a msg_id queue used for responses for call/multicall """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -501,8 +513,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -513,53 +526,73 @@ class Connection(object): return consumer + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, + name=pool_name) + + self._register_consumer(consumer) + + return consumer + def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic, - msg, rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py new file mode 100644 index 000000000..ff56cbd93 --- /dev/null +++ b/cinder/openstack/common/rpc/impl_zmq.py @@ -0,0 +1,725 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pprint +import socket +import string +import sys +import types +import uuid + +import eventlet +from eventlet.green import zmq +import greenlet + +from cinder.openstack.common import cfg +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common import importutils +from cinder.openstack.common import jsonutils +from cinder.openstack.common.rpc import common as rpc_common + + +# for convenience, are not modified. +pformat = pprint.pformat +Timeout = eventlet.timeout.Timeout +LOG = rpc_common.LOG +RemoteError = rpc_common.RemoteError +RPCException = rpc_common.RPCException + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), + + # The module.Class to use for matchmaking. + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('cinder.openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), + + # The following port is unassigned by IANA as of 2012-05-21 + cfg.IntOpt('rpc_zmq_port', default=9501, + help='ZeroMQ receiver listening port'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + help='Number of ZeroMQ contexts, defaults to 1'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + help='Directory for holding IPC sockets'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.') +] + + +# These globals are defined in register_opts(conf), +# a mandatory initialization call +FLAGS = None +ZMQ_CTX = None # ZeroMQ Context, must be global. +matchmaker = None # memoized matchmaker object + + +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(jsonutils.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return jsonutils.loads(data) + + +class ZmqSocket(object): + """ + A tiny wrapper around ZeroMQ to simplify the send/recv protocol + and connection management. + + Can be used as a Context (supports the 'with' statement). + """ + + def __init__(self, addr, zmq_type, bind=True, subscribe=None): + self.sock = ZMQ_CTX.socket(zmq_type) + self.addr = addr + self.type = zmq_type + self.subscriptions = [] + + # Support failures on sending/receiving on wrong socket type. + self.can_recv = zmq_type in (zmq.PULL, zmq.SUB) + self.can_send = zmq_type in (zmq.PUSH, zmq.PUB) + self.can_sub = zmq_type in (zmq.SUB, ) + + # Support list, str, & None for subscribe arg (cast to list) + do_sub = { + list: subscribe, + str: [subscribe], + type(None): [] + }[type(subscribe)] + + for f in do_sub: + self.subscribe(f) + + str_data = {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind} + + LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data) + LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) + LOG.debug(_("-> bind: %(bind)s"), str_data) + + try: + if bind: + self.sock.bind(addr) + else: + self.sock.connect(addr) + except Exception: + raise RPCException(_("Could not open socket.")) + + def socket_s(self): + """Get socket type as string.""" + t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER', + 'DEALER') + return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type] + + def subscribe(self, msg_filter): + """Subscribe.""" + if not self.can_sub: + raise RPCException("Cannot subscribe on this socket.") + LOG.debug(_("Subscribing to %s"), msg_filter) + + try: + self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) + except Exception: + return + + self.subscriptions.append(msg_filter) + + def unsubscribe(self, msg_filter): + """Unsubscribe.""" + if msg_filter not in self.subscriptions: + return + self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) + self.subscriptions.remove(msg_filter) + + def close(self): + if self.sock is None or self.sock.closed: + return + + # We must unsubscribe, or we'll leak descriptors. + if len(self.subscriptions) > 0: + for f in self.subscriptions: + try: + self.sock.setsockopt(zmq.UNSUBSCRIBE, f) + except Exception: + pass + self.subscriptions = [] + + # Linger -1 prevents lost/dropped messages + try: + self.sock.close(linger=-1) + except Exception: + pass + self.sock = None + + def recv(self): + if not self.can_recv: + raise RPCException(_("You cannot recv on this socket.")) + return self.sock.recv_multipart() + + def send(self, data): + if not self.can_send: + raise RPCException(_("You cannot send on this socket.")) + self.sock.send_multipart(data) + + +class ZmqClient(object): + """Client for ZMQ sockets.""" + + def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + self.outq = ZmqSocket(addr, socket_type, bind=bind) + + def cast(self, msg_id, topic, data): + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) + + def close(self): + self.outq.close() + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.replies = [] + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['replies'] = self.replies + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False): + if ending: + return + self.replies.append(reply) + + @classmethod + def marshal(self, ctx): + ctx_data = ctx.to_dict() + return _serialize(ctx_data) + + @classmethod + def unmarshal(self, data): + return RpcContext.from_dict(_deserialize(data)) + + +class InternalContext(object): + """Used by ConsumerBase as a private context for - methods.""" + + def __init__(self, proxy): + self.proxy = proxy + self.msg_waiter = None + + def _get_response(self, ctx, proxy, topic, data): + """Process a curried message and cast the result to topic.""" + LOG.debug(_("Running func with context: %s"), ctx.to_dict()) + data.setdefault('version', None) + data.setdefault('args', []) + + try: + result = proxy.dispatch( + ctx, data['version'], data['method'], **data['args']) + return ConsumerBase.normalize_reply(result, ctx.replies) + except greenlet.GreenletExit: + # ignore these since they are just from shutdowns + pass + except Exception: + return {'exc': + rpc_common.serialize_remote_exception(sys.exc_info())} + + def reply(self, ctx, proxy, + msg_id=None, context=None, topic=None, msg=None): + """Reply to a casted call.""" + # Our real method is curried into msg['args'] + + child_ctx = RpcContext.unmarshal(msg[0]) + response = ConsumerBase.normalize_reply( + self._get_response(child_ctx, proxy, topic, msg[1]), + ctx.replies) + + LOG.debug(_("Sending reply")) + cast(FLAGS, ctx, topic, { + 'method': '-process_reply', + 'args': { + 'msg_id': msg_id, + 'response': response + } + }) + + +class ConsumerBase(object): + """Base Consumer.""" + + def __init__(self): + self.private_ctx = InternalContext(None) + + @classmethod + def normalize_reply(self, result, replies): + #TODO(ewindisch): re-evaluate and document this method. + if isinstance(result, types.GeneratorType): + return list(result) + elif replies: + return replies + else: + return [result] + + def process(self, style, target, proxy, ctx, data): + # Method starting with - are + # processed internally. (non-valid method name) + method = data['method'] + + # Internal method + # uses internal context for safety. + if data['method'][0] == '-': + # For reply / process_reply + method = method[1:] + if method == 'reply': + self.private_ctx.reply(ctx, proxy, **data['args']) + return + + data.setdefault('version', None) + data.setdefault('args', []) + proxy.dispatch(ctx, data['version'], + data['method'], **data['args']) + + +class ZmqBaseReactor(ConsumerBase): + """ + A consumer class implementing a + centralized casting broker (PULL-PUSH) + for RoundRobin requests. + """ + + def __init__(self, conf): + super(ZmqBaseReactor, self).__init__() + + self.conf = conf + self.mapping = {} + self.proxies = {} + self.threads = [] + self.sockets = [] + self.subscribe = {} + + self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) + + def register(self, proxy, in_addr, zmq_type_in, out_addr=None, + zmq_type_out=None, in_bind=True, out_bind=True, + subscribe=None): + + LOG.info(_("Registering reactor")) + + if zmq_type_in not in (zmq.PULL, zmq.SUB): + raise RPCException("Bad input socktype") + + # Items push in. + inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, + subscribe=subscribe) + + self.proxies[inq] = proxy + self.sockets.append(inq) + + LOG.info(_("In reactor registered")) + + if not out_addr: + return + + if zmq_type_out not in (zmq.PUSH, zmq.PUB): + raise RPCException("Bad output socktype") + + # Items push out. + outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) + + self.mapping[inq] = outq + self.mapping[outq] = inq + self.sockets.append(outq) + + LOG.info(_("Out reactor registered")) + + def consume_in_thread(self): + def _consume(sock): + LOG.info(_("Consuming socket")) + while True: + self.consume(sock) + + for k in self.proxies.keys(): + self.threads.append( + self.pool.spawn(_consume, k) + ) + + def wait(self): + for t in self.threads: + t.wait() + + def close(self): + for s in self.sockets: + s.close() + + for t in self.threads: + t.kill() + + +class ZmqProxy(ZmqBaseReactor): + """ + A consumer class implementing a + topic-based proxy, forwarding to + IPC sockets. + """ + + def __init__(self, conf): + super(ZmqProxy, self).__init__(conf) + + self.topic_proxy = {} + ipc_dir = conf.rpc_zmq_ipc_dir + + self.topic_proxy['zmq_replies'] = \ + ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), + zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['zmq_replies']) + + def consume(self, sock): + ipc_dir = self.conf.rpc_zmq_ipc_dir + + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + msg_id, topic, style, in_msg = data + topic = topic.split('.', 1)[0] + + LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + + # Handle zmq_replies magic + if topic.startswith('fanout~'): + sock_type = zmq.PUB + elif topic.startswith('zmq_replies'): + sock_type = zmq.PUB + inside = _deserialize(in_msg) + msg_id = inside[-1]['args']['msg_id'] + response = inside[-1]['args']['response'] + LOG.debug(_("->response->%s"), response) + data = [str(msg_id), _serialize(response)] + else: + sock_type = zmq.PUSH + + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) + self.topic_proxy[topic].send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + + +class ZmqReactor(ZmqBaseReactor): + """ + A consumer class implementing a + consumer for messages. Can also be + used as a 1:1 proxy + """ + + def __init__(self, conf): + super(ZmqReactor, self).__init__(conf) + + def consume(self, sock): + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) + if sock in self.mapping: + LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { + 'data': data}) + self.mapping[sock].send(data) + return + + msg_id, topic, style, in_msg = data + + ctx, request = _deserialize(in_msg) + ctx = RpcContext.unmarshal(ctx) + + proxy = self.proxies[sock] + + self.pool.spawn_n(self.process, style, topic, + proxy, ctx, request) + + +class Connection(rpc_common.Connection): + """Manages connections and threads.""" + + def __init__(self, conf): + self.conf = conf + self.reactor = ZmqReactor(conf) + + def create_consumer(self, topic, proxy, fanout=False): + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + LOG.info(_("Create Consumer for topic (%(topic)s)") % + {'topic': topic}) + + # Subscription scenarios + if fanout: + subscribe = ('', fanout)[type(fanout) == str] + sock_type = zmq.SUB + topic = 'fanout~' + topic + else: + sock_type = zmq.PULL + subscribe = None + + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (self.conf.rpc_zmq_ipc_dir, topic) + + LOG.debug(_("Consumer is a zmq.%s"), + ['PULL', 'SUB'][sock_type == zmq.SUB]) + + self.reactor.register(proxy, inaddr, sock_type, + subscribe=subscribe, in_bind=False) + + def close(self): + self.reactor.close() + + def wait(self): + self.reactor.wait() + + def consume_in_thread(self): + self.reactor.consume_in_thread() + + +def _cast(addr, context, msg_id, topic, msg, timeout=None): + timeout_cast = timeout or FLAGS.rpc_cast_timeout + payload = [RpcContext.marshal(context), msg] + + with Timeout(timeout_cast, exception=rpc_common.Timeout): + try: + conn = ZmqClient(addr) + + # assumes cast can't return an exception + conn.cast(msg_id, topic, payload) + except zmq.ZMQError: + raise RPCException("Cast failed. ZMQ Socket Exception") + finally: + if 'conn' in vars(): + conn.close() + + +def _call(addr, context, msg_id, topic, msg, timeout=None): + # timeout_response is how long we wait for a response + timeout = timeout or FLAGS.rpc_response_timeout + + # The msg_id is used to track replies. + msg_id = str(uuid.uuid4().hex) + + # Replies always come into the reply service. + reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host + + LOG.debug(_("Creating payload")) + # Curry the original request into a reply method. + mcontext = RpcContext.marshal(context) + payload = { + 'method': '-reply', + 'args': { + 'msg_id': msg_id, + 'context': mcontext, + 'topic': reply_topic, + 'msg': [mcontext, msg] + } + } + + LOG.debug(_("Creating queue socket for reply waiter")) + + # Messages arriving async. + # TODO(ewindisch): have reply consumer with dynamic subscription mgmt + with Timeout(timeout, exception=rpc_common.Timeout): + try: + msg_waiter = ZmqSocket( + "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, + zmq.SUB, subscribe=msg_id, bind=False + ) + + LOG.debug(_("Sending cast")) + _cast(addr, context, msg_id, topic, payload) + + LOG.debug(_("Cast sent; Waiting reply")) + # Blocks until receives reply + msg = msg_waiter.recv() + LOG.debug(_("Received message: %s"), msg) + LOG.debug(_("Unpacking response")) + responses = _deserialize(msg[-1]) + # ZMQError trumps the Timeout error. + except zmq.ZMQError: + raise RPCException("ZMQ Socket Error") + finally: + if 'msg_waiter' in vars(): + msg_waiter.close() + + # It seems we don't need to do all of the following, + # but perhaps it would be useful for multicall? + # One effect of this is that we're checking all + # responses for Exceptions. + for resp in responses: + if isinstance(resp, types.DictType) and 'exc' in resp: + raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) + + return responses[-1] + + +def _multi_send(method, context, topic, msg, timeout=None): + """ + Wraps the sending of messages, + dispatches to the matchmaker and sends + message to all relevant hosts. + """ + conf = FLAGS + LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) + + queues = matchmaker.queues(topic) + LOG.debug(_("Sending message(s) to: %s"), queues) + + # Don't stack if we have no matchmaker results + if len(queues) == 0: + LOG.warn(_("No matchmaker results. Not casting.")) + # While not strictly a timeout, callers know how to handle + # this exception and a timeout isn't too big a lie. + raise rpc_common.Timeout, "No match from matchmaker." + + # This supports brokerless fanout (addresses > 1) + for queue in queues: + (_topic, ip_addr) = queue + _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) + + if method.__name__ == '_cast': + eventlet.spawn_n(method, _addr, context, + _topic, _topic, msg, timeout) + return + return method(_addr, context, _topic, _topic, msg, timeout) + + +def create_connection(conf, new=True): + return Connection(conf) + + +def multicall(conf, *args, **kwargs): + """Multiple calls.""" + register_opts(conf) + return _multi_send(_call, *args, **kwargs) + + +def call(conf, *args, **kwargs): + """Send a message, expect a response.""" + register_opts(conf) + data = _multi_send(_call, *args, **kwargs) + return data[-1] + + +def cast(conf, *args, **kwargs): + """Send a message expecting no reply.""" + register_opts(conf) + _multi_send(_cast, *args, **kwargs) + + +def fanout_cast(conf, context, topic, msg, **kwargs): + """Send a message to all listening and expect no reply.""" + register_opts(conf) + # NOTE(ewindisch): fanout~ is used because it avoid splitting on . + # and acts as a non-subtle hint to the matchmaker and ZmqProxy. + _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) + + +def notify(conf, context, topic, msg, **kwargs): + """ + Send notification event. + Notifications are sent to topic-priority. + This differs from the AMQP drivers which send to topic.priority. + """ + register_opts(conf) + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + topic.replace('.', '-') + cast(conf, context, topic, msg, **kwargs) + + +def cleanup(): + """Clean up resources in use by implementation.""" + global ZMQ_CTX + global matchmaker + matchmaker = None + ZMQ_CTX.destroy() + ZMQ_CTX = None + + +def register_opts(conf): + """Registration of options for this driver.""" + #NOTE(ewindisch): ZMQ_CTX and matchmaker + # are initialized here as this is as good + # an initialization method as any. + + # We memoize through these globals + global ZMQ_CTX + global matchmaker + global FLAGS + + if not FLAGS: + conf.register_opts(zmq_opts) + FLAGS = conf + # Don't re-set, if this method is called twice. + if not ZMQ_CTX: + ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + if not matchmaker: + # rpc_zmq_matchmaker should be set to a 'module.Class' + mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_module = '.'.join(mm_path[:-1]) + mm_class = mm_path[-1] + + # Only initialize a class. + if mm_path[-1][0] not in string.ascii_uppercase: + LOG.error(_("Matchmaker could not be loaded.\n" + "rpc_zmq_matchmaker is not a class.")) + raise RPCException(_("Error loading Matchmaker.")) + + mm_impl = importutils.import_module(mm_module) + mm_constructor = getattr(mm_impl, mm_class) + matchmaker = mm_constructor() + + +register_opts(cfg.CONF) diff --git a/cinder/openstack/common/rpc/matchmaker.py b/cinder/openstack/common/rpc/matchmaker.py new file mode 100644 index 000000000..2c0aac5bb --- /dev/null +++ b/cinder/openstack/common/rpc/matchmaker.py @@ -0,0 +1,258 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should except a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +import contextlib +import itertools +import json +import logging + +from cinder.openstack.common import cfg +from cinder.openstack.common.gettextutils import _ + + +matchmaker_opts = [ + # Matchmaker ring file + cfg.StrOpt('matchmaker_ringfile', + default='/etc/nova/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), +] + +CONF = cfg.CONF +CONF.register_opts(matchmaker_opts) +LOG = logging.getLogger(__name__) +contextmanager = contextlib.contextmanager + + +class MatchMakerException(Exception): + """Signified a match could not be found.""" + message = _("Match not found by MatchMaker.") + + +class Exchange(object): + """ + Implements lookups. + Subclass this to support hashtables, dns, etc. + """ + def __init__(self): + pass + + def run(self, key): + raise NotImplementedError() + + +class Binding(object): + """ + A binding on which to perform a lookup. + """ + def __init__(self): + pass + + def test(self, key): + raise NotImplementedError() + + +class MatchMakerBase(object): + """Match Maker Base Class.""" + + def __init__(self): + # Array of tuples. Index [2] toggles negation, [3] is last-if-true + self.bindings = [] + + def add_binding(self, binding, rule, last=True): + self.bindings.append((binding, rule, False, last)) + + #NOTE(ewindisch): kept the following method in case we implement the + # underlying support. + #def add_negate_binding(self, binding, rule, last=True): + # self.bindings.append((binding, rule, True, last)) + + def queues(self, key): + workers = [] + + # bit is for negate bindings - if we choose to implement it. + # last stops processing rules if this matches. + for (binding, exchange, bit, last) in self.bindings: + if binding.test(key): + workers.extend(exchange.run(key)) + + # Support last. + if last: + return workers + return workers + + +class DirectBinding(Binding): + """ + Specifies a host in the key via a '.' character + Although dots are used in the key, the behavior here is + that it maps directly to a host, thus direct. + """ + def test(self, key): + if '.' in key: + return True + return False + + +class TopicBinding(Binding): + """ + Where a 'bare' key without dots. + AMQP generally considers topic exchanges to be those *with* dots, + but we deviate here in terminology as the behavior here matches + that of a topic exchange (whereas where there are dots, behavior + matches that of a direct exchange. + """ + def test(self, key): + if '.' not in key: + return True + return False + + +class FanoutBinding(Binding): + """Match on fanout keys, where key starts with 'fanout.' string.""" + def test(self, key): + if key.startswith('fanout~'): + return True + return False + + +class StubExchange(Exchange): + """Exchange that does nothing.""" + def run(self, key): + return [(key, None)] + + +class RingExchange(Exchange): + """ + Match Maker where hosts are loaded from a static file containing + a hashmap (JSON formatted). + + __init__ takes optional ring dictionary argument, otherwise + loads the ringfile from CONF.mathcmaker_ringfile. + """ + def __init__(self, ring=None): + super(RingExchange, self).__init__() + + if ring: + self.ring = ring + else: + fh = open(CONF.matchmaker_ringfile, 'r') + self.ring = json.load(fh) + fh.close() + + self.ring0 = {} + for k in self.ring.keys(): + self.ring0[k] = itertools.cycle(self.ring[k]) + + def _ring_has(self, key): + if key in self.ring0: + return True + return False + + +class RoundRobinRingExchange(RingExchange): + """A Topic Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(RoundRobinRingExchange, self).__init__(ring) + + def run(self, key): + if not self._ring_has(key): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (key, ) + ) + return [] + host = next(self.ring0[key]) + return [(key + '.' + host, host)] + + +class FanoutRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(FanoutRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "fanout~", strip it for lookup. + nkey = key.split('fanout~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + +class LocalhostExchange(Exchange): + """Exchange where all direct topics are local.""" + def __init__(self): + super(Exchange, self).__init__() + + def run(self, key): + return [(key.split('.')[0] + '.localhost', 'localhost')] + + +class DirectExchange(Exchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def __init__(self): + super(Exchange, self).__init__() + + def run(self, key): + b, e = key.split('.', 1) + return [(b, e)] + + +class MatchMakerRing(MatchMakerBase): + """ + Match Maker where hosts are loaded from a static hashmap. + """ + def __init__(self, ring=None): + super(MatchMakerRing, self).__init__() + self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) + self.add_binding(DirectBinding(), DirectExchange()) + self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) + + +class MatchMakerLocalhost(MatchMakerBase): + """ + Match Maker where all bare topics resolve to localhost. + Useful for testing. + """ + def __init__(self): + super(MatchMakerLocalhost, self).__init__() + self.add_binding(FanoutBinding(), LocalhostExchange()) + self.add_binding(DirectBinding(), DirectExchange()) + self.add_binding(TopicBinding(), LocalhostExchange()) + + +class MatchMakerStub(MatchMakerBase): + """ + Match Maker where topics are untouched. + Useful for testing, or for AMQP/brokered queues. + Will not work where knowledge of hosts is known (i.e. zeromq) + """ + def __init__(self): + super(MatchMakerLocalhost, self).__init__() + + self.add_binding(FanoutBinding(), StubExchange()) + self.add_binding(DirectBinding(), StubExchange()) + self.add_binding(TopicBinding(), StubExchange()) diff --git a/cinder/rpc/proxy.py b/cinder/openstack/common/rpc/proxy.py similarity index 98% rename from cinder/rpc/proxy.py rename to cinder/openstack/common/rpc/proxy.py index c9f8fa62b..179f4ccd5 100644 --- a/cinder/rpc/proxy.py +++ b/cinder/openstack/common/rpc/proxy.py @@ -22,7 +22,7 @@ For more information about rpc API version numbers, see: """ -from cinder import rpc +from cinder.openstack.common import rpc class RpcProxy(object): @@ -127,7 +127,7 @@ class RpcProxy(object): rpc.fanout_cast(context, self.topic, msg) def cast_to_server(self, context, server_params, msg, topic=None, - version=None): + version=None): """rpc.cast_to_server() a remote method. :param context: The request context diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index b844e2801..ebf465e39 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -26,9 +26,8 @@ from cinder import flags from cinder import log as logging from cinder.openstack.common import cfg from cinder.openstack.common import importutils +from cinder.openstack.common import rpc from cinder.openstack.common import timeutils -from cinder import rpc -from cinder.rpc import common as rpc_common from cinder import utils diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 32fae184a..325edcd33 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -19,13 +19,13 @@ Client side of the scheduler manager RPC API. """ from cinder import flags -import cinder.rpc.proxy +import cinder.openstack.common.rpc.proxy FLAGS = flags.FLAGS -class SchedulerAPI(cinder.rpc.proxy.RpcProxy): +class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy): '''Client side of the scheduler rpc API. API version history: diff --git a/cinder/service.py b/cinder/service.py index ded8bdc2c..0320e7c8f 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -34,7 +34,7 @@ from cinder import flags from cinder import log as logging from cinder.openstack.common import cfg from cinder.openstack.common import importutils -from cinder import rpc +from cinder.openstack.common import rpc from cinder import utils from cinder import version from cinder import wsgi diff --git a/cinder/tests/fake_flags.py b/cinder/tests/fake_flags.py index c4abf41cf..5d0d1f864 100644 --- a/cinder/tests/fake_flags.py +++ b/cinder/tests/fake_flags.py @@ -29,7 +29,7 @@ def set_defaults(conf): conf.set_default('volume_driver', 'cinder.volume.driver.FakeISCSIDriver') conf.set_default('connection_type', 'fake') conf.set_default('fake_rabbit', True) - conf.set_default('rpc_backend', 'cinder.rpc.impl_fake') + conf.set_default('rpc_backend', 'cinder.openstack.common.rpc.impl_fake') conf.set_default('iscsi_num_targets', 8) conf.set_default('verbose', True) conf.set_default('sql_connection', "sqlite://") diff --git a/cinder/tests/rpc/__init__.py b/cinder/tests/rpc/__init__.py deleted file mode 100644 index 3be5ce944..000000000 --- a/cinder/tests/rpc/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work -from cinder.tests import * diff --git a/cinder/tests/rpc/common.py b/cinder/tests/rpc/common.py deleted file mode 100644 index da7b946ff..000000000 --- a/cinder/tests/rpc/common.py +++ /dev/null @@ -1,270 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls shared between all implementations -""" - -import time - -from eventlet import greenthread -import nose - -from cinder import context -from cinder import exception -from cinder import flags -from cinder import log as logging -from cinder.rpc import amqp as rpc_amqp -from cinder.rpc import common as rpc_common -from cinder.rpc import dispatcher as rpc_dispatcher -from cinder import test - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class BaseRpcTestCase(test.TestCase): - def setUp(self, supports_timeouts=True): - super(BaseRpcTestCase, self).setUp() - self.supports_timeouts = supports_timeouts - self.context = context.get_admin_context() - if self.rpc: - self.conn = self.rpc.create_connection(FLAGS, True) - receiver = TestReceiver() - self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver]) - self.conn.create_consumer('test', self.dispatcher, False) - self.conn.consume_in_thread() - - def tearDown(self): - if self.rpc: - self.conn.close() - super(BaseRpcTestCase, self).tearDown() - - def test_call_succeed(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', - {"method": "echo", "args": {"value": value}}) - self.assertEqual(value, result) - - def test_call_succeed_despite_multiple_returns_yield(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', - {"method": "echo_three_times_yield", - "args": {"value": value}}) - self.assertEqual(value + 2, result) - - def test_multicall_succeed_once(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - 'test', - {"method": "echo", - "args": {"value": value}}) - for i, x in enumerate(result): - if i > 0: - self.fail('should only receive one response') - self.assertEqual(value + i, x) - - def test_multicall_three_nones(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - 'test', - {"method": "multicall_three_nones", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(x, None) - # i should have been 0, 1, and finally 2: - self.assertEqual(i, 2) - - def test_multicall_succeed_three_times_yield(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - 'test', - {"method": "echo_three_times_yield", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(value + i, x) - - def test_context_passed(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Makes sure a context is passed through rpc call.""" - value = 42 - result = self.rpc.call(FLAGS, self.context, - 'test', {"method": "context", - "args": {"value": value}}) - self.assertEqual(self.context.to_dict(), result) - - def test_nested_calls(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Test that we can do an rpc.call inside another call.""" - class Nested(object): - @staticmethod - def echo(context, queue, value): - """Calls echo in the passed queue""" - LOG.debug(_("Nested received %(queue)s, %(value)s") - % locals()) - # TODO(comstud): - # so, it will replay the context and use the same REQID? - # that's bizarre. - ret = self.rpc.call(FLAGS, context, - queue, - {"method": "echo", - "args": {"value": value}}) - LOG.debug(_("Nested return %s"), ret) - return value - - nested = Nested() - dispatcher = rpc_dispatcher.RpcDispatcher([nested]) - conn = self.rpc.create_connection(FLAGS, True) - conn.create_consumer('nested', dispatcher, False) - conn.consume_in_thread() - value = 42 - result = self.rpc.call(FLAGS, self.context, - 'nested', {"method": "echo", - "args": {"queue": "test", - "value": value}}) - conn.close() - self.assertEqual(value, result) - - def test_call_timeout(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Make sure rpc.call will time out""" - if not self.supports_timeouts: - raise nose.SkipTest(_("RPC backend does not support timeouts")) - - value = 42 - self.assertRaises(rpc_common.Timeout, - self.rpc.call, - FLAGS, self.context, - 'test', - {"method": "block", - "args": {"value": value}}, timeout=1) - try: - self.rpc.call(FLAGS, self.context, - 'test', - {"method": "block", - "args": {"value": value}}, - timeout=1) - self.fail("should have thrown Timeout") - except rpc_common.Timeout as exc: - pass - - -class BaseRpcAMQPTestCase(BaseRpcTestCase): - """Base test class for all AMQP-based RPC tests""" - def test_proxycallback_handles_exceptions(self): - """Make sure exceptions unpacking messages don't cause hangs.""" - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - orig_unpack = rpc_amqp.unpack_context - - info = {'unpacked': False} - - def fake_unpack_context(*args, **kwargs): - info['unpacked'] = True - raise test.TestingException('moo') - - self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context) - - value = 41 - self.rpc.cast(FLAGS, self.context, 'test', - {"method": "echo", "args": {"value": value}}) - - # Wait for the cast to complete. - for x in xrange(50): - if info['unpacked']: - break - greenthread.sleep(0.1) - else: - self.fail("Timeout waiting for message to be consued") - - # Now see if we get a response even though we raised an - # exception for the cast above. - self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack) - - value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', - {"method": "echo", - "args": {"value": value}}) - self.assertEqual(value, result) - - -class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call. - - Uses static methods because we aren't actually storing any state. - - """ - @staticmethod - def echo(context, value): - """Simply returns whatever value is sent in.""" - LOG.debug(_("Received %s"), value) - return value - - @staticmethod - def context(context, value): - """Returns dictionary version of context.""" - LOG.debug(_("Received %s"), context) - return context.to_dict() - - @staticmethod - def multicall_three_nones(context, value): - yield None - yield None - yield None - - @staticmethod - def echo_three_times_yield(context, value): - yield value - yield value + 1 - yield value + 2 - - @staticmethod - def fail(context, value): - """Raises an exception with the value sent in.""" - raise NotImplementedError(value) - - @staticmethod - def fail_converted(context, value): - """Raises an exception with the value sent in.""" - raise exception.ConvertedException(explanation=value) - - @staticmethod - def block(context, value): - time.sleep(2) diff --git a/cinder/tests/rpc/test_common.py b/cinder/tests/rpc/test_common.py deleted file mode 100644 index 5fd257a6b..000000000 --- a/cinder/tests/rpc/test_common.py +++ /dev/null @@ -1,147 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 OpenStack, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for 'common' functons used through rpc code. -""" - -import json -import sys - -from cinder import context -from cinder import exception -from cinder import flags -from cinder import log as logging -from cinder import test -from cinder.rpc import amqp as rpc_amqp -from cinder.rpc import common as rpc_common -from cinder.tests.rpc import common - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -def raise_exception(): - raise Exception("test") - - -class FakeUserDefinedException(Exception): - def __init__(self): - Exception.__init__(self, "Test Message") - - -class RpcCommonTestCase(test.TestCase): - def test_serialize_remote_exception(self): - expected = { - 'class': 'Exception', - 'module': 'exceptions', - 'message': 'test', - } - - try: - raise_exception() - except Exception as exc: - failure = rpc_common.serialize_remote_exception(sys.exc_info()) - - failure = json.loads(failure) - #assure the traceback was added - self.assertEqual(expected['class'], failure['class']) - self.assertEqual(expected['module'], failure['module']) - self.assertEqual(expected['message'], failure['message']) - - def test_serialize_remote_cinder_exception(self): - def raise_cinder_exception(): - raise exception.CinderException("test", code=500) - - expected = { - 'class': 'CinderException', - 'module': 'cinder.exception', - 'kwargs': {'code': 500}, - 'message': 'test' - } - - try: - raise_cinder_exception() - except Exception as exc: - failure = rpc_common.serialize_remote_exception(sys.exc_info()) - - failure = json.loads(failure) - #assure the traceback was added - self.assertEqual(expected['class'], failure['class']) - self.assertEqual(expected['module'], failure['module']) - self.assertEqual(expected['kwargs'], failure['kwargs']) - self.assertEqual(expected['message'], failure['message']) - - def test_deserialize_remote_exception(self): - failure = { - 'class': 'CinderException', - 'module': 'cinder.exception', - 'message': 'test message', - 'tb': ['raise CinderException'], - } - serialized = json.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, exception.CinderException)) - self.assertTrue('test message' in unicode(after_exc)) - #assure the traceback was added - self.assertTrue('raise CinderException' in unicode(after_exc)) - - def test_deserialize_remote_exception_bad_module(self): - failure = { - 'class': 'popen2', - 'module': 'os', - 'kwargs': {'cmd': '/bin/echo failed'}, - 'message': 'foo', - } - serialized = json.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, rpc_common.RemoteError)) - - def test_deserialize_remote_exception_user_defined_exception(self): - """Ensure a user defined exception can be deserialized.""" - self.flags(allowed_rpc_exception_modules=[self.__class__.__module__]) - failure = { - 'class': 'FakeUserDefinedException', - 'module': self.__class__.__module__, - 'tb': ['raise FakeUserDefinedException'], - } - serialized = json.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, FakeUserDefinedException)) - #assure the traceback was added - self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc)) - - def test_deserialize_remote_exception_cannot_recreate(self): - """Ensure a RemoteError is returned on initialization failure. - - If an exception cannot be recreated with it's original class then a - RemoteError with the exception informations should still be returned. - - """ - self.flags(allowed_rpc_exception_modules=[self.__class__.__module__]) - failure = { - 'class': 'FakeIDontExistException', - 'module': self.__class__.__module__, - 'tb': ['raise FakeIDontExistException'], - } - serialized = json.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, rpc_common.RemoteError)) - #assure the traceback was added - self.assertTrue('raise FakeIDontExistException' in unicode(after_exc)) diff --git a/cinder/tests/rpc/test_dispatcher.py b/cinder/tests/rpc/test_dispatcher.py deleted file mode 100644 index 7a688c297..000000000 --- a/cinder/tests/rpc/test_dispatcher.py +++ /dev/null @@ -1,109 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for rpc.dispatcher -""" - -from cinder import context -from cinder.rpc import dispatcher -from cinder.rpc import common as rpc_common -from cinder import test - - -class RpcDispatcherTestCase(test.TestCase): - class API1(object): - RPC_API_VERSION = '1.0' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - class API2(object): - RPC_API_VERSION = '2.1' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - class API3(object): - RPC_API_VERSION = '3.5' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - def setUp(self): - self.ctxt = context.RequestContext('fake_user', 'fake_project') - super(RpcDispatcherTestCase, self).setUp() - - def tearDown(self): - super(RpcDispatcherTestCase, self).tearDown() - - def _test_dispatch(self, version, expectations): - v2 = self.API2() - v3 = self.API3() - disp = dispatcher.RpcDispatcher([v2, v3]) - - disp.dispatch(self.ctxt, version, 'test_method', arg1=1) - - self.assertEqual(v2.test_method_ctxt, expectations[0]) - self.assertEqual(v2.test_method_arg1, expectations[1]) - self.assertEqual(v3.test_method_ctxt, expectations[2]) - self.assertEqual(v3.test_method_arg1, expectations[3]) - - def test_dispatch(self): - self._test_dispatch('2.1', (self.ctxt, 1, None, None)) - self._test_dispatch('3.5', (None, None, self.ctxt, 1)) - - def test_dispatch_lower_minor_version(self): - self._test_dispatch('2.0', (self.ctxt, 1, None, None)) - self._test_dispatch('3.1', (None, None, self.ctxt, 1)) - - def test_dispatch_higher_minor_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '2.6', (None, None, None, None)) - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '3.6', (None, None, None, None)) - - def test_dispatch_lower_major_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '1.0', (None, None, None, None)) - - def test_dispatch_higher_major_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '4.0', (None, None, None, None)) - - def test_dispatch_no_version_uses_v1(self): - v1 = self.API1() - disp = dispatcher.RpcDispatcher([v1]) - - disp.dispatch(self.ctxt, None, 'test_method', arg1=1) - - self.assertEqual(v1.test_method_ctxt, self.ctxt) - self.assertEqual(v1.test_method_arg1, 1) diff --git a/cinder/tests/rpc/test_fake.py b/cinder/tests/rpc/test_fake.py deleted file mode 100644 index 4f6722ab3..000000000 --- a/cinder/tests/rpc/test_fake.py +++ /dev/null @@ -1,33 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using fake_impl -""" - -from cinder import log as logging -from cinder.rpc import impl_fake -from cinder.tests.rpc import common - - -LOG = logging.getLogger(__name__) - - -class RpcFakeTestCase(common.BaseRpcTestCase): - def setUp(self): - self.rpc = impl_fake - super(RpcFakeTestCase, self).setUp() diff --git a/cinder/tests/rpc/test_kombu.py b/cinder/tests/rpc/test_kombu.py deleted file mode 100644 index 2001be01e..000000000 --- a/cinder/tests/rpc/test_kombu.py +++ /dev/null @@ -1,371 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using kombu -""" - -from cinder import context -from cinder import exception -from cinder import flags -from cinder import log as logging -from cinder import test -from cinder.rpc import amqp as rpc_amqp -from cinder.tests.rpc import common - -try: - import kombu - from cinder.rpc import impl_kombu -except ImportError: - kombu = None - impl_kombu = None - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class MyException(Exception): - pass - - -def _raise_exc_stub(stubs, times, obj, method, exc_msg, - exc_class=MyException): - info = {'called': 0} - orig_method = getattr(obj, method) - - def _raise_stub(*args, **kwargs): - info['called'] += 1 - if info['called'] <= times: - raise exc_class(exc_msg) - orig_method(*args, **kwargs) - stubs.Set(obj, method, _raise_stub) - return info - - -class RpcKombuTestCase(common.BaseRpcAMQPTestCase): - def setUp(self): - if kombu: - self.rpc = impl_kombu - else: - self.rpc = None - super(RpcKombuTestCase, self).setUp() - - def tearDown(self): - if kombu: - impl_kombu.cleanup() - super(RpcKombuTestCase, self).tearDown() - - @test.skip_if(kombu is None, "Test requires kombu") - def test_reusing_connection(self): - """Test that reusing a connection returns same one.""" - conn_context = self.rpc.create_connection(FLAGS, new=False) - conn1 = conn_context.connection - conn_context.close() - conn_context = self.rpc.create_connection(FLAGS, new=False) - conn2 = conn_context.connection - conn_context.close() - self.assertEqual(conn1, conn2) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_topic_send_receive(self): - """Test sending to a topic exchange/queue""" - - conn = self.rpc.create_connection(FLAGS) - message = 'topic test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_topic_consumer('a_topic', _callback) - conn.topic_send('a_topic', message) - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_direct_send_receive(self): - """Test sending to a direct exchange/queue""" - conn = self.rpc.create_connection(FLAGS) - message = 'direct test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_cast_interface_uses_default_options(self): - """Test kombu rpc.cast""" - - ctxt = context.RequestContext('fake_user', 'fake_project') - - class MyConnection(impl_kombu.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.params, - {'hostname': FLAGS.rabbit_host, - 'userid': FLAGS.rabbit_userid, - 'password': FLAGS.rabbit_password, - 'port': FLAGS.rabbit_port, - 'virtual_host': FLAGS.rabbit_virtual_host, - 'transport': 'memory'}) - - def topic_send(_context, topic, msg): - pass - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_kombu, 'Connection', MyConnection) - - impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'}) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_cast_to_server_uses_server_params(self): - """Test kombu rpc.cast""" - - ctxt = context.RequestContext('fake_user', 'fake_project') - - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337, - 'virtual_host': 'fake_virtual_host'} - - class MyConnection(impl_kombu.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.params, - {'hostname': server_params['hostname'], - 'userid': server_params['username'], - 'password': server_params['password'], - 'port': server_params['port'], - 'virtual_host': server_params['virtual_host'], - 'transport': 'memory'}) - - def topic_send(_context, topic, msg): - pass - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_kombu, 'Connection', MyConnection) - - impl_kombu.cast_to_server(FLAGS, ctxt, server_params, - 'fake_topic', {'msg': 'fake'}) - - @test.skip_test("kombu memory transport seems buggy with fanout queues " - "as this test passes when you use rabbit (fake_rabbit=False)") - def test_fanout_send_receive(self): - """Test sending to a fanout exchange and consuming from 2 queues""" - - conn = self.rpc.create_connection() - conn2 = self.rpc.create_connection() - message = 'fanout test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_fanout_consumer('a_fanout', _callback) - conn2.declare_fanout_consumer('a_fanout', _callback) - conn.fanout_send('a_fanout', message) - - conn.consume(limit=1) - conn.close() - self.assertEqual(self.received_message, message) - - self.received_message = None - conn2.consume(limit=1) - conn2.close() - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_declare_consumer_errors_will_reconnect(self): - # Test that any exception with 'timeout' in it causes a - # reconnection - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, - '__init__', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 3) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - # Test that any exception in transport.connection_errors causes - # a reconnection - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer, - '__init__', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 2) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_declare_consumer_ioerrors_will_reconnect(self): - """Test that an IOError exception causes a reconnection""" - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, - '__init__', 'Socket closed', exc_class=IOError) - - conn = self.rpc.Connection(FLAGS) - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 3) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_publishing_errors_will_reconnect(self): - # Test that any exception with 'timeout' in it causes a - # reconnection when declaring the publisher class and when - # calling send() - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, - '__init__', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 3) - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, - 'send', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 3) - - # Test that any exception in transport.connection_errors causes - # a reconnection when declaring the publisher class and when - # calling send() - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, - '__init__', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 2) - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, - 'send', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 2) - - @test.skip_test("kombu memory transport hangs here on precise") - @test.skip_if(kombu is None, "Test requires kombu") - def test_iterconsume_errors_will_reconnect(self): - conn = self.rpc.Connection(FLAGS) - message = 'reconnect test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) - - info = _raise_exc_stub(self.stubs, 1, conn.connection, - 'drain_events', 'foo timeout foo') - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - # Only called once, because our stub goes away during reconnection - - @test.skip_if(kombu is None, "Test requires kombu") - def test_call_exception(self): - """Test that exception gets passed back properly. - - rpc.call returns an Exception object. The value of the - exception is converted to a string. - - """ - self.flags(allowed_rpc_exception_modules=['exceptions']) - value = "This is the exception message" - self.assertRaises(NotImplementedError, - self.rpc.call, - FLAGS, - self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - try: - self.rpc.call(FLAGS, self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - self.fail("should have thrown Exception") - except NotImplementedError as exc: - self.assertTrue(value in unicode(exc)) - #Traceback should be included in exception message - self.assertTrue('raise NotImplementedError(value)' in unicode(exc)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_call_converted_exception(self): - """Test that exception gets passed back properly. - - rpc.call returns an Exception object. The value of the - exception is converted to a string. - - """ - value = "This is the exception message" - self.assertRaises(exception.ConvertedException, - self.rpc.call, - FLAGS, - self.context, - 'test', - {"method": "fail_converted", - "args": {"value": value}}) - try: - self.rpc.call(FLAGS, self.context, - 'test', - {"method": "fail_converted", - "args": {"value": value}}) - self.fail("should have thrown Exception") - except exception.ConvertedException as exc: - self.assertTrue(value in unicode(exc)) - #Traceback should be included in exception message - self.assertTrue('exception.ConvertedException' in unicode(exc)) diff --git a/cinder/tests/rpc/test_kombu_ssl.py b/cinder/tests/rpc/test_kombu_ssl.py deleted file mode 100644 index 8f7329ae7..000000000 --- a/cinder/tests/rpc/test_kombu_ssl.py +++ /dev/null @@ -1,66 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using kombu + ssl -""" - -from cinder import flags -from cinder import test - -try: - import kombu - from cinder.rpc import impl_kombu -except ImportError: - kombu = None - impl_kombu = None - - -# Flag settings we will ensure get passed to amqplib -SSL_VERSION = "SSLv2" -SSL_CERT = "/tmp/cert.blah.blah" -SSL_CA_CERT = "/tmp/cert.ca.blah.blah" -SSL_KEYFILE = "/tmp/keyfile.blah.blah" - -FLAGS = flags.FLAGS - - -class RpcKombuSslTestCase(test.TestCase): - - def setUp(self): - super(RpcKombuSslTestCase, self).setUp() - if kombu: - self.flags(kombu_ssl_keyfile=SSL_KEYFILE, - kombu_ssl_ca_certs=SSL_CA_CERT, - kombu_ssl_certfile=SSL_CERT, - kombu_ssl_version=SSL_VERSION, - rabbit_use_ssl=True) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_ssl_on_extended(self): - rpc = impl_kombu - conn = rpc.create_connection(FLAGS, True) - c = conn.connection - #This might be kombu version dependent... - #Since we are now peaking into the internals of kombu... - self.assertTrue(isinstance(c.connection.ssl, dict)) - self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version")) - self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile")) - self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs")) - self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile")) - #That hash then goes into amqplib which then goes - #Into python ssl creation... diff --git a/cinder/tests/rpc/test_proxy.py b/cinder/tests/rpc/test_proxy.py deleted file mode 100644 index 6a2934565..000000000 --- a/cinder/tests/rpc/test_proxy.py +++ /dev/null @@ -1,124 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for rpc.proxy -""" - -import copy - -from cinder import context -from cinder import rpc -from cinder.rpc import proxy -from cinder import test - - -class RpcProxyTestCase(test.TestCase): - - def setUp(self): - super(RpcProxyTestCase, self).setUp() - - def tearDown(self): - super(RpcProxyTestCase, self).tearDown() - - def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False, - server_params=None, supports_topic_override=True): - topic = 'fake_topic' - timeout = 123 - rpc_proxy = proxy.RpcProxy(topic, '1.0') - ctxt = context.RequestContext('fake_user', 'fake_project') - msg = {'method': 'fake_method', 'args': {'x': 'y'}} - expected_msg = {'method': 'fake_method', 'args': {'x': 'y'}, - 'version': '1.0'} - - expected_retval = 'hi' if has_retval else None - - self.fake_args = None - self.fake_kwargs = None - - def _fake_rpc_method(*args, **kwargs): - self.fake_args = args - self.fake_kwargs = kwargs - if has_retval: - return expected_retval - - self.stubs.Set(rpc, rpc_method, _fake_rpc_method) - - args = [ctxt, msg] - if server_params: - args.insert(1, server_params) - - # Base method usage - retval = getattr(rpc_proxy, rpc_method)(*args) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - # overriding the version - retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1') - self.assertEqual(retval, expected_retval) - new_msg = copy.deepcopy(expected_msg) - new_msg['version'] = '1.1' - expected_args = [ctxt, topic, new_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - if has_timeout: - # set a timeout - retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg, timeout] - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - if supports_topic_override: - # set a topic - new_topic = 'foo.bar' - retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, new_topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - def test_call(self): - self._test_rpc_method('call', has_timeout=True, has_retval=True) - - def test_multicall(self): - self._test_rpc_method('multicall', has_timeout=True, has_retval=True) - - def test_cast(self): - self._test_rpc_method('cast') - - def test_fanout_cast(self): - self._test_rpc_method('fanout_cast', supports_topic_override=False) - - def test_cast_to_server(self): - self._test_rpc_method('cast_to_server', server_params={'blah': 1}) - - def test_fanout_cast_to_server(self): - self._test_rpc_method('fanout_cast_to_server', - server_params={'blah': 1}, supports_topic_override=False) - - def test_make_msg(self): - self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2), - {'method': 'test_method', 'args': {'a': 1, 'b': 2}}) diff --git a/cinder/tests/rpc/test_qpid.py b/cinder/tests/rpc/test_qpid.py deleted file mode 100644 index 4f28a76a3..000000000 --- a/cinder/tests/rpc/test_qpid.py +++ /dev/null @@ -1,343 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using qpid -""" - -import mox - -from cinder import context -from cinder import flags -from cinder import log as logging -from cinder.rpc import amqp as rpc_amqp -from cinder import test - -try: - import qpid - from cinder.rpc import impl_qpid -except ImportError: - qpid = None - impl_qpid = None - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class RpcQpidTestCase(test.TestCase): - """ - Exercise the public API of impl_qpid utilizing mox. - - This set of tests utilizes mox to replace the Qpid objects and ensures - that the right operations happen on them when the various public rpc API - calls are exercised. The API calls tested here include: - - cinder.rpc.create_connection() - cinder.rpc.common.Connection.create_consumer() - cinder.rpc.common.Connection.close() - cinder.rpc.cast() - cinder.rpc.fanout_cast() - cinder.rpc.call() - cinder.rpc.multicall() - """ - - def setUp(self): - super(RpcQpidTestCase, self).setUp() - - self.mock_connection = None - self.mock_session = None - self.mock_sender = None - self.mock_receiver = None - - if qpid: - self.orig_connection = qpid.messaging.Connection - self.orig_session = qpid.messaging.Session - self.orig_sender = qpid.messaging.Sender - self.orig_receiver = qpid.messaging.Receiver - qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection - qpid.messaging.Session = lambda *_x, **_y: self.mock_session - qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender - qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver - - def tearDown(self): - if qpid: - qpid.messaging.Connection = self.orig_connection - qpid.messaging.Session = self.orig_session - qpid.messaging.Sender = self.orig_sender - qpid.messaging.Receiver = self.orig_receiver - if impl_qpid: - # Need to reset this in case we changed the connection_cls - # in self._setup_to_server_tests() - impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection - - super(RpcQpidTestCase, self).tearDown() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_connection(self): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - self.mock_connection.close() - - self.mox.ReplayAll() - - connection = impl_qpid.create_connection(FLAGS) - connection.close() - - def _test_create_consumer(self, fanout): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - if fanout: - # The link name includes a UUID, so match it with a regex. - expected_address = mox.Regex(r'^impl_qpid_test_fanout ; ' - '{"node": {"x-declare": {"auto-delete": true, "durable": ' - 'false, "type": "fanout"}, "type": "topic"}, "create": ' - '"always", "link": {"x-declare": {"auto-delete": true, ' - '"exclusive": true, "durable": false}, "durable": true, ' - '"name": "impl_qpid_test_fanout_.*"}}$') - else: - expected_address = ( - 'cinder/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": true}, "type": "topic"}, ' - '"create": "always", "link": {"x-declare": {"auto-delete": ' - 'true, "exclusive": false, "durable": false}, "durable": ' - 'true, "name": "impl_qpid_test"}}') - self.mock_session.receiver(expected_address).AndReturn( - self.mock_receiver) - self.mock_receiver.capacity = 1 - self.mock_connection.close() - - self.mox.ReplayAll() - - connection = impl_qpid.create_connection(FLAGS) - connection.create_consumer("impl_qpid_test", - lambda *_x, **_y: None, - fanout) - connection.close() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_consumer(self): - self._test_create_consumer(fanout=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_consumer_fanout(self): - self._test_create_consumer(fanout=True) - - def _test_cast(self, fanout, server_params=None): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - - self.mock_connection.session().AndReturn(self.mock_session) - if fanout: - expected_address = ('impl_qpid_test_fanout ; ' - '{"node": {"x-declare": {"auto-delete": true, ' - '"durable": false, "type": "fanout"}, ' - '"type": "topic"}, "create": "always"}') - else: - expected_address = ( - 'cinder/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(expected_address).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - if not server_params: - # This is a pooled connection, so instead of closing it, it - # gets reset, which is just creating a new session on the - # connection. - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - - self.mox.ReplayAll() - - try: - ctx = context.RequestContext("user", "project") - - args = [FLAGS, ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}] - - if server_params: - args.insert(2, server_params) - if fanout: - method = impl_qpid.fanout_cast_to_server - else: - method = impl_qpid.cast_to_server - else: - if fanout: - method = impl_qpid.fanout_cast - else: - method = impl_qpid.cast - - method(*args) - finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_cast(self): - self._test_cast(fanout=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_fanout_cast(self): - self._test_cast(fanout=True) - - def _setup_to_server_tests(self, server_params): - class MyConnection(impl_qpid.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.connection.username, - server_params['username']) - self.assertEqual(myself.connection.password, - server_params['password']) - self.assertEqual(myself.broker, - server_params['hostname'] + ':' + - str(server_params['port'])) - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_qpid, 'Connection', MyConnection) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_cast_to_server(self): - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337} - self._setup_to_server_tests(server_params) - self._test_cast(fanout=False, server_params=server_params) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_fanout_cast_to_server(self): - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337} - self._setup_to_server_tests(server_params) - self._test_cast(fanout=True, server_params=server_params) - - def _test_call(self, multi): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' - ' true, "durable": true, "type": "direct"}, "type": ' - '"topic"}, "create": "always", "link": {"x-declare": ' - '{"auto-delete": true, "exclusive": true, "durable": ' - 'false}, "durable": true, "name": ".*"}}') - self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) - self.mock_receiver.capacity = 1 - send_addr = ('cinder/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(send_addr).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - if multi: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message( - {"result": "bar", "failure": False, - "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message( - {"result": "baz", "failure": False, - "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - - self.mox.ReplayAll() - - try: - ctx = context.RequestContext("user", "project") - - if multi: - method = impl_qpid.multicall - else: - method = impl_qpid.call - - res = method(FLAGS, ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) - - if multi: - self.assertEquals(list(res), ["foo", "bar", "baz"]) - else: - self.assertEquals(res, "foo") - finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_call(self): - self._test_call(multi=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_multicall(self): - self._test_call(multi=True) - - -# -#from cinder.tests.rpc import common -# -# Qpid does not have a handy in-memory transport like kombu, so it's not -# terribly straight forward to take advantage of the common unit tests. -# However, at least at the time of this writing, the common unit tests all pass -# with qpidd running. -# -# class RpcQpidCommonTestCase(common._BaseRpcTestCase): -# def setUp(self): -# self.rpc = impl_qpid -# super(RpcQpidCommonTestCase, self).setUp() -# -# def tearDown(self): -# super(RpcQpidCommonTestCase, self).tearDown() -# diff --git a/cinder/tests/scheduler/test_rpcapi.py b/cinder/tests/scheduler/test_rpcapi.py index d1a030bf0..48c52d955 100644 --- a/cinder/tests/scheduler/test_rpcapi.py +++ b/cinder/tests/scheduler/test_rpcapi.py @@ -20,7 +20,7 @@ Unit Tests for cinder.scheduler.rpcapi from cinder import context from cinder import flags -from cinder import rpc +from cinder.openstack.common import rpc from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test diff --git a/cinder/tests/scheduler/test_scheduler.py b/cinder/tests/scheduler/test_scheduler.py index 0575d0c11..d4d798028 100644 --- a/cinder/tests/scheduler/test_scheduler.py +++ b/cinder/tests/scheduler/test_scheduler.py @@ -27,9 +27,9 @@ from cinder import db from cinder import exception from cinder import flags from cinder.notifier import api as notifier -from cinder import rpc +from cinder.openstack.common import rpc +from cinder.openstack.common.rpc import common as rpc_common from cinder.openstack.common import timeutils -from cinder.rpc import common as rpc_common from cinder.scheduler import driver from cinder.scheduler import manager from cinder import test diff --git a/cinder/tests/test_notifier.py b/cinder/tests/test_notifier.py index e147f786c..e341121b4 100644 --- a/cinder/tests/test_notifier.py +++ b/cinder/tests/test_notifier.py @@ -67,7 +67,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(cls, *args): self.mock_notify = True - self.stubs.Set(cinder.rpc, 'notify', mock_notify) + self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify) notifier_api.notify('publisher_id', 'event_type', cinder.notifier.api.WARN, dict(a=3)) @@ -90,7 +90,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(context, topic, msg): self.test_topic = topic - self.stubs.Set(cinder.rpc, 'notify', mock_notify) + self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify) notifier_api.notify('publisher_id', 'event_type', 'DEBUG', dict(a=3)) self.assertEqual(self.test_topic, 'testnotify.debug') @@ -105,7 +105,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(context, topic, data): msgs.append(data) - self.stubs.Set(cinder.rpc, 'notify', mock_notify) + self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify) LOG.error('foo') self.assertEqual(1, len(msgs)) msg = msgs[0] diff --git a/cinder/tests/test_quota.py b/cinder/tests/test_quota.py index 962768e5b..ecdfbfbaa 100644 --- a/cinder/tests/test_quota.py +++ b/cinder/tests/test_quota.py @@ -18,10 +18,10 @@ from cinder import context from cinder import db +from cinder import exception from cinder import flags from cinder import quota -from cinder import exception -from cinder import rpc +from cinder.openstack.common import rpc from cinder import test from cinder import volume from cinder.scheduler import driver as scheduler_driver diff --git a/cinder/tests/test_test.py b/cinder/tests/test_test.py index 8ff84c1ad..be17f3916 100644 --- a/cinder/tests/test_test.py +++ b/cinder/tests/test_test.py @@ -18,7 +18,7 @@ """Tests for the testing base code.""" -from cinder import rpc +from cinder.openstack.common import rpc from cinder import test diff --git a/cinder/tests/test_volume.py b/cinder/tests/test_volume.py index 7f131a7eb..48fe81d39 100644 --- a/cinder/tests/test_volume.py +++ b/cinder/tests/test_volume.py @@ -30,8 +30,8 @@ from cinder import db from cinder import flags from cinder import log as logging from cinder.openstack.common import importutils +from cinder.openstack.common import rpc import cinder.policy -from cinder import rpc from cinder import test import cinder.volume.api diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 47e330650..699952223 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -27,10 +27,10 @@ from eventlet import greenthread from cinder import exception from cinder import flags from cinder import log as logging +from cinder.openstack.common import rpc import cinder.policy from cinder.openstack.common import timeutils from cinder import quota -from cinder import rpc from cinder import utils from cinder.db import base diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 2afa1e380..5d8901d94 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -46,8 +46,8 @@ from cinder import manager from cinder.openstack.common import cfg from cinder.openstack.common import excutils from cinder.openstack.common import importutils +from cinder.openstack.common import rpc from cinder.openstack.common import timeutils -from cinder import rpc from cinder import utils from cinder.volume import volume_types diff --git a/openstack-common.conf b/openstack-common.conf index 89073e70d..7fa5da702 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,timeutils +modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils # The base module to hold the copy of openstack.common base=cinder