[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup
+modules=cfg,context,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,policy,rpc,setup
# The base module to hold the copy of openstack.common
base=quantum
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Simple class that stores security context information in the web request.
+
+Projects should subclass this class if they wish to enhance the request
+context or provide additional information in their specific WSGI pipeline.
+"""
+
+import itertools
+import uuid
+
+
+def generate_request_id():
+ return 'req-' + str(uuid.uuid4())
+
+
+class RequestContext(object):
+
+ """
+ Stores information about the security context under which the user
+ accesses the system, as well as additional request information.
+ """
+
+ def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
+ read_only=False, show_deleted=False, request_id=None):
+ self.auth_tok = auth_tok
+ self.user = user
+ self.tenant = tenant
+ self.is_admin = is_admin
+ self.read_only = read_only
+ self.show_deleted = show_deleted
+ if not request_id:
+ request_id = generate_request_id()
+ self.request_id = request_id
+
+ def to_dict(self):
+ return {'user': self.user,
+ 'tenant': self.tenant,
+ 'is_admin': self.is_admin,
+ 'read_only': self.read_only,
+ 'show_deleted': self.show_deleted,
+ 'auth_token': self.auth_tok,
+ 'request_id': self.request_id}
+
+
+def get_admin_context(show_deleted="no"):
+ context = RequestContext(None,
+ tenant=None,
+ is_admin=True,
+ show_deleted=show_deleted)
+ return context
+
+
+def get_context_from_function_and_args(function, args, kwargs):
+ """Find an arg of type RequestContext and return it.
+
+ This is useful in a couple of decorators where we don't
+ know much about the function we're wrapping.
+ """
+
+ for arg in itertools.chain(kwargs.values(), args):
+ if isinstance(arg, RequestContext):
+ return arg
+
+ return None
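A minimal usage sketch of the context module added above, assuming it is installed as quantum.openstack.common.context per the configuration at the top of this change; all values are illustrative.

    from quantum.openstack.common import context

    # Build a per-request security context; request_id is generated if
    # it is not supplied.
    ctx = context.RequestContext(auth_tok='token-123', user='alice',
                                 tenant='demo')
    assert ctx.request_id.startswith('req-')
    ctx_dict = ctx.to_dict()

    # A tenant-less, admin-flagged context for privileged work.
    admin_ctx = context.get_admin_context()
    assert admin_ctx.is_admin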
Exceptions common to OpenStack projects
"""
+import itertools
import logging
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Exception related utilities.
+"""
+
+import contextlib
+import logging
+import sys
+import traceback
+
+
+@contextlib.contextmanager
+def save_and_reraise_exception():
+ """Save current exception, run some code and then re-raise.
+
+    In some cases the exception context can be cleared, resulting in an
+    attempt to reraise None after an exception handler has run. This can
+    happen when eventlet switches greenthreads, or when an exception
+    handler itself raises and then catches an exception. In both cases
+    the exception context will be cleared.
+
+ To work around this, we save the exception state, run handler code, and
+ then re-raise the original exception. If another exception occurs, the
+ saved exception is logged and the new exception is reraised.
+ """
+ type_, value, tb = sys.exc_info()
+ try:
+ yield
+ except Exception:
+ logging.error('Original exception being dropped: %s' %
+ (traceback.format_exception(type_, value, tb)))
+ raise
+ raise type_, value, tb
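A hedged sketch of the intended usage of save_and_reraise_exception(); the resource object and its methods are hypothetical.

    from quantum.openstack.common import excutils

    def do_work(resource):
        try:
            resource.process()
        except Exception:
            with excutils.save_and_reraise_exception():
                # The handler may itself raise, or yield to another
                # greenthread, without losing the original exception;
                # it is re-raised when this block exits.
                resource.rollback()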
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+gettext for openstack-common modules.
+
+Usual usage in an openstack.common module:
+
+ from openstack.common.gettextutils import _
+"""
+
+import gettext
+
+
+t = gettext.translation('openstack-common', 'locale', fallback=True)
+
+
+def _(msg):
+ return t.ugettext(msg)
return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc:
raise ImportError('Class %s cannot be found (%s)' %
- (class_str, str(exc)))
+ (class_str, str(exc)))
def import_object(import_str, *args, **kwargs):
return import_class(import_str)(*args, **kwargs)
+def import_object_ns(name_space, import_str, *args, **kwargs):
+ """
+ Import a class and return an instance of it, first by trying
+    to find the class in a default namespace, then falling back to
+    a full path if it is not found there.
+ """
+ import_value = "%s.%s" % (name_space, import_str)
+ try:
+ return import_class(import_value)(*args, **kwargs)
+ except ImportError:
+ return import_class(import_str)(*args, **kwargs)
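A short usage sketch of import_object_ns(); both names below are hypothetical.

    from quantum.openstack.common import importutils

    # 'quantum.plugins.DummyPlugin' is tried first; if that import fails,
    # 'DummyPlugin' is treated as a full dotted path instead.
    plugin = importutils.import_object_ns('quantum.plugins', 'DummyPlugin')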
+
+
def import_module(import_str):
"""Import a module."""
__import__(import_str)
else:
key, value = line[:colon], line[colon + 1:]
- return key.strip(), [value.strip()]
+ value = value.strip()
+        if (value and value[0] == value[-1] and
+                (value[0] == "\"" or value[0] == "'")):
+ value = value[1:-1]
+ return key.strip(), [value]
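An illustration of the new quote handling (input lines are made up); one level of matching quotes is stripped, which lets a value keep whitespace that strip() would otherwise remove.

    # password: "s3cr3t "   ->  ('password', ['s3cr3t '])   trailing space kept
    # name: plain           ->  ('name', ['plain'])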
def parse(self, lineiter):
key = None
import inspect
import itertools
import json
+import xmlrpclib
def to_primitive(value, convert_instances=False, level=0):
# The try block may not be necessary after the class check above,
# but just in case ...
try:
+ # It's not clear why xmlrpclib created their own DateTime type, but
+ # for our purposes, make it a datetime type which is explicitly
+ # handled
+ if isinstance(value, xmlrpclib.DateTime):
+ value = datetime.datetime(*tuple(value.timetuple())[:6])
+
if isinstance(value, (list, tuple)):
o = []
for v in value:
return unicode(value)
-def dumps(value):
- return json.dumps(value, default=to_primitive)
+def dumps(value, default=to_primitive, **kwargs):
+ return json.dumps(value, default=default, **kwargs)
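A small sketch of the more flexible signature; sort_keys is simply forwarded to json.dumps(), and the 'default' serializer can be overridden by callers that need to.

    from quantum.openstack.common import jsonutils

    msg = {'method': 'echo', 'args': {'value': 42}}
    data = jsonutils.dumps(msg, sort_keys=True)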
def loads(s):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Greenthread local storage of variables using weak references"""
+
+import weakref
+
+from eventlet import corolocal
+
+
+class WeakLocal(corolocal.local):
+ def __getattribute__(self, attr):
+ rval = corolocal.local.__getattribute__(self, attr)
+ if rval:
+ rval = rval()
+ return rval
+
+ def __setattr__(self, attr, value):
+ value = weakref.ref(value)
+ return corolocal.local.__setattr__(self, attr, value)
+
+
+store = WeakLocal()
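A minimal sketch of how the store is meant to be used. Only weakly referenceable objects can be stored, and the caller must keep a strong reference alive for as long as the value is needed.

    from quantum.openstack.common import context
    from quantum.openstack.common import local

    ctx = context.RequestContext(user='alice', tenant='demo')
    local.store.context = ctx           # stored as a weak reference
    assert local.store.context is ctx   # dereferenced on access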
"""Common Policy Engine Implementation"""
-import json
import logging
import urllib
import urllib2
+from quantum.openstack.common import jsonutils
+
LOG = logging.getLogger(__name__)
@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
- rules_dict = json.loads(data)
+ rules_dict = jsonutils.loads(data)
return cls(rules=rules_dict, default_rule=default_rule)
def __init__(self, rules=None, default_rule=None):
"""
url = match % target_dict
- data = {'target': json.dumps(target_dict),
- 'credentials': json.dumps(cred_dict)}
+ data = {'target': jsonutils.dumps(target_dict),
+ 'credentials': jsonutils.dumps(cred_dict)}
post_data = urllib.urlencode(data)
f = urllib2.urlopen(url, post_data)
return f.read() == "True"
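A hedged sketch of loading rules through the new jsonutils path; the rule name is made up.

    from quantum.openstack.common import policy

    brain = policy.Brain.load_json('{"example:rule": []}')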
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+ rpc.dispatcher
+ rpc.proxy
+"""
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
+
+
+rpc_opts = [
+ cfg.StrOpt('rpc_backend',
+ default='%s.impl_kombu' % __package__,
+ help="The messaging module to use, defaults to kombu."),
+ cfg.IntOpt('rpc_thread_pool_size',
+ default=64,
+ help='Size of RPC thread pool'),
+ cfg.IntOpt('rpc_conn_pool_size',
+ default=30,
+ help='Size of RPC connection pool'),
+ cfg.IntOpt('rpc_response_timeout',
+ default=60,
+ help='Seconds to wait for a response from call or multicall'),
+ cfg.IntOpt('rpc_cast_timeout',
+ default=30,
+ help='Seconds to wait before a cast expires (TTL). '
+ 'Only supported by impl_zmq.'),
+ cfg.ListOpt('allowed_rpc_exception_modules',
+ default=['quantum.openstack.common.exception',
+ 'nova.exception'],
+ help='Modules of exceptions that are permitted to be recreated'
+                     ' upon receiving exception data from an rpc call.'),
+ cfg.StrOpt('control_exchange',
+ default='nova',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.BoolOpt('fake_rabbit',
+ default=False,
+ help='If passed, use a fake RabbitMQ provider'),
+]
+
+cfg.CONF.register_opts(rpc_opts)
+
+
+def create_connection(new=True):
+ """Create a connection to the message bus used for rpc.
+
+ For some example usage of creating a connection and some consumers on that
+ connection, see nova.service.
+
+ :param new: Whether or not to create a new connection. A new connection
+ will be created by default. If new is False, the
+ implementation is free to return an existing connection from a
+ pool.
+
+ :returns: An instance of openstack.common.rpc.common.Connection
+ """
+ return _get_impl().create_connection(cfg.CONF, new=new)
+
+
+def call(context, topic, msg, timeout=None):
+ """Invoke a remote method that returns something.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
+
+ :returns: A dict from the remote method.
+
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
+ """
+ return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+
+
+def cast(context, topic, msg):
+ """Invoke a remote method that does not return anything.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().cast(cfg.CONF, context, topic, msg)
+
+
+def fanout_cast(context, topic, msg):
+ """Broadcast a remote method invocation with no return.
+
+ This method will get invoked on all consumers that were set up with this
+ topic name and fanout=True.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=True.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+
+
+def multicall(context, topic, msg, timeout=None):
+ """Invoke a remote method and get back an iterator.
+
+ In this case, the remote method will be returning multiple values in
+    separate messages, so the return values can be processed as they come
+    in via an iterator.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
+
+ :returns: An iterator. The iterator will yield a tuple (N, X) where N is
+ an index that starts at 0 and increases by one for each value
+ returned and X is the Nth value that was returned by the remote
+ method.
+
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
+ """
+ return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+
+
+def notify(context, topic, msg):
+ """Send notification event.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the notification to.
+ :param msg: This is a dict of content of event.
+
+ :returns: None
+ """
+ return _get_impl().notify(cfg.CONF, context, topic, msg)
+
+
+def cleanup():
+ """Clean up resoruces in use by implementation.
+
+ Clean up any resources that have been allocated by the RPC implementation.
+ This is typically open connections to a messaging service. This function
+ would get called before an application using this API exits to allow
+ connections to get torn down cleanly.
+
+ :returns: None
+ """
+ return _get_impl().cleanup()
+
+
+def cast_to_server(context, server_params, topic, msg):
+ """Invoke a remote method that does not return anything.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param server_params: Connection information
+ :param topic: The topic to send the notification to.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ msg)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+ """Broadcast to a remote method invocation with no return.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param server_params: Connection information
+ :param topic: The topic to send the notification to.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ topic, msg)
+
+
+def queue_get_for(context, topic, host):
+ """Get a queue name for a given topic + host.
+
+ This function only works if this naming convention is followed on the
+ consumer side, as well. For example, in nova, every instance of the
+ nova-foo service calls create_consumer() for two topics:
+
+ foo
+ foo.<host>
+
+ Messages sent to the 'foo' topic are distributed to exactly one instance of
+ the nova-foo service. The services are chosen in a round-robin fashion.
+ Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
+ <host>.
+ """
+ return '%s.%s' % (topic, host)
+
+
+_RPCIMPL = None
+
+
+def _get_impl():
+ """Delay import of rpc_backend until configuration is loaded."""
+ global _RPCIMPL
+ if _RPCIMPL is None:
+ try:
+ _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ except ImportError:
+ # For backwards compatibility with older nova config.
+ impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
+ _RPCIMPL = importutils.import_module(impl)
+ return _RPCIMPL
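A hedged sketch of the calling conventions documented above, assuming a configured rpc_backend and a reachable message bus; the topic and method names are made up.

    from quantum.openstack.common import context
    from quantum.openstack.common import rpc

    ctxt = context.get_admin_context()

    # Fire-and-forget: no return value and no waiting.
    rpc.cast(ctxt, 'example_topic', {'method': 'do_something',
                                     'args': {'host': 'node-1'}})

    # Blocking call: waits up to rpc_response_timeout for the reply.
    status = rpc.call(ctxt, 'example_topic', {'method': 'get_status',
                                              'args': {'host': 'node-1'}})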
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 - 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Shared code between AMQP based openstack.common.rpc implementations.
+
+The code in this module is shared between the rpc implementations based on
+AMQP.
+Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
+AMQP, but is deprecated and predates this code.
+"""
+
+import inspect
+import logging
+import sys
+import uuid
+
+from eventlet import greenpool
+from eventlet import pools
+from eventlet import semaphore
+
+from quantum.openstack.common import excutils
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import local
+from quantum.openstack.common.rpc import common as rpc_common
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Pool(pools.Pool):
+ """Class that implements a Pool of Connections."""
+ def __init__(self, conf, connection_cls, *args, **kwargs):
+ self.connection_cls = connection_cls
+ self.conf = conf
+ kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
+ kwargs.setdefault("order_as_stack", True)
+ super(Pool, self).__init__(*args, **kwargs)
+
+ # TODO(comstud): Timeout connections not used in a while
+ def create(self):
+ LOG.debug('Pool creating new connection')
+ return self.connection_cls(self.conf)
+
+ def empty(self):
+ while self.free_items:
+ self.get().close()
+
+
+_pool_create_sem = semaphore.Semaphore()
+
+
+def get_connection_pool(conf, connection_cls):
+ with _pool_create_sem:
+ # Make sure only one thread tries to create the connection pool.
+ if not connection_cls.pool:
+ connection_cls.pool = Pool(conf, connection_cls)
+ return connection_cls.pool
+
+
+class ConnectionContext(rpc_common.Connection):
+ """The class that is actually returned to the caller of
+ create_connection(). This is essentially a wrapper around
+ Connection that supports 'with'. It can also return a new
+    Connection, or one from a pool. It also traps deletion of its
+    instances, so Connections are returned to the pool (or closed)
+    even when exceptions occur, without the caller having to handle
+    that explicitly.
+ """
+
+ def __init__(self, conf, connection_pool, pooled=True, server_params=None):
+ """Create a new connection, or get one from the pool"""
+ self.connection = None
+ self.conf = conf
+ self.connection_pool = connection_pool
+ if pooled:
+ self.connection = connection_pool.get()
+ else:
+ self.connection = connection_pool.connection_cls(
+ conf,
+ server_params=server_params)
+ self.pooled = pooled
+
+ def __enter__(self):
+ """When with ConnectionContext() is used, return self"""
+ return self
+
+ def _done(self):
+ """If the connection came from a pool, clean it up and put it back.
+ If it did not come from a pool, close it.
+ """
+ if self.connection:
+ if self.pooled:
+ # Reset the connection so it's ready for the next caller
+ # to grab from the pool
+ self.connection.reset()
+ self.connection_pool.put(self.connection)
+ else:
+ try:
+ self.connection.close()
+ except Exception:
+ pass
+ self.connection = None
+
+ def __exit__(self, exc_type, exc_value, tb):
+ """End of 'with' statement. We're done here."""
+ self._done()
+
+ def __del__(self):
+ """Caller is done with this connection. Make sure we cleaned up."""
+ self._done()
+
+ def close(self):
+ """Caller is done with this connection."""
+ self._done()
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ self.connection.create_consumer(topic, proxy, fanout)
+
+ def create_worker(self, topic, proxy, pool_name):
+ self.connection.create_worker(topic, proxy, pool_name)
+
+ def consume_in_thread(self):
+ self.connection.consume_in_thread()
+
+ def __getattr__(self, key):
+ """Proxy all other calls to the Connection instance"""
+ if self.connection:
+ return getattr(self.connection, key)
+ else:
+ raise rpc_common.InvalidRPCConnectionReuse()
+
+
+def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+ ending=False):
+ """Sends a reply or an error on the channel signified by msg_id.
+
+ Failure should be a sys.exc_info() tuple.
+
+ """
+ with ConnectionContext(conf, connection_pool) as conn:
+ if failure:
+ failure = rpc_common.serialize_remote_exception(failure)
+
+ try:
+ msg = {'result': reply, 'failure': failure}
+ except TypeError:
+ msg = {'result': dict((k, repr(v))
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
+ if ending:
+ msg['ending'] = True
+ conn.direct_send(msg_id, msg)
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call"""
+ def __init__(self, **kwargs):
+ self.msg_id = kwargs.pop('msg_id', None)
+ self.conf = kwargs.pop('conf')
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['conf'] = self.conf
+ values['msg_id'] = self.msg_id
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False,
+ connection_pool=None):
+ if self.msg_id:
+ msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
+ ending)
+ if ending:
+ self.msg_id = None
+
+
+def unpack_context(conf, msg):
+ """Unpack context from msg."""
+ context_dict = {}
+ for key in list(msg.keys()):
+ # NOTE(vish): Some versions of python don't like unicode keys
+ # in kwargs.
+ key = str(key)
+ if key.startswith('_context_'):
+ value = msg.pop(key)
+ context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['conf'] = conf
+ ctx = RpcContext.from_dict(context_dict)
+ rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+ return ctx
+
+
+def pack_context(msg, context):
+ """Pack context into msg.
+
+ Values for message keys need to be less than 255 chars, so we pull
+ context out into a bunch of separate keys. If we want to support
+ more arguments in rabbit messages, we may want to do the same
+ for args at some point.
+
+ """
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in context.to_dict().iteritems()])
+ msg.update(context_d)
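A small sketch of the round trip described above (names are illustrative).

    from quantum.openstack.common.rpc import amqp as rpc_amqp
    from quantum.openstack.common.rpc import common as rpc_common

    ctx = rpc_common.CommonRpcContext(user='alice', tenant='demo')
    msg = {'method': 'echo', 'args': {'value': 42}}
    rpc_amqp.pack_context(msg, ctx)
    # msg now carries '_context_user' and '_context_tenant' alongside the
    # original payload; unpack_context() strips the prefix on the far end.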
+
+
+class ProxyCallback(object):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, conf, proxy, connection_pool):
+ self.proxy = proxy
+ self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
+ self.connection_pool = connection_pool
+ self.conf = conf
+
+ def __call__(self, message_data):
+ """Consumer callback to call a method on a proxy object.
+
+ Parses the message for validity and fires off a thread to call the
+ proxy object method.
+
+ Message data should be a dictionary with two keys:
+ method: string representing the method to call
+ args: dictionary of arg: value
+
+ Example: {'method': 'echo', 'args': {'value': 42}}
+
+ """
+ # It is important to clear the context here, because at this point
+ # the previous context is stored in local.store.context
+ if hasattr(local.store, 'context'):
+ del local.store.context
+ rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ ctxt = unpack_context(self.conf, message_data)
+ method = message_data.get('method')
+ args = message_data.get('args', {})
+ version = message_data.get('version', None)
+ if not method:
+ LOG.warn(_('no method for message: %s') % message_data)
+ ctxt.reply(_('No method for message: %s') % message_data,
+ connection_pool=self.connection_pool)
+ return
+ self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+
+ def _process_data(self, ctxt, version, method, args):
+ """Process a message in a new thread.
+
+ If the proxy object we have has a dispatch method
+ (see rpc.dispatcher.RpcDispatcher), pass it the version,
+ method, and args and let it dispatch as appropriate. If not, use
+ the old behavior of magically calling the specified method on the
+ proxy we have here.
+ """
+ ctxt.update_store()
+ try:
+ rval = self.proxy.dispatch(ctxt, version, method, **args)
+ # Check if the result was a generator
+ if inspect.isgenerator(rval):
+ for x in rval:
+ ctxt.reply(x, None, connection_pool=self.connection_pool)
+ else:
+ ctxt.reply(rval, None, connection_pool=self.connection_pool)
+            # This final ending message tells multicall that it is done.
+ ctxt.reply(ending=True, connection_pool=self.connection_pool)
+ except Exception as e:
+ LOG.exception('Exception during message handling')
+ ctxt.reply(None, sys.exc_info(),
+ connection_pool=self.connection_pool)
+
+
+class MulticallWaiter(object):
+ def __init__(self, conf, connection, timeout):
+ self._connection = connection
+ self._iterator = connection.iterconsume(timeout=timeout or
+ conf.rpc_response_timeout)
+ self._result = None
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ self._iterator.close()
+ self._iterator = None
+ self._connection.close()
+
+ def __call__(self, data):
+ """The consume() callback will call this. Store the result."""
+ if data['failure']:
+ failure = data['failure']
+ self._result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ self._result = data['result']
+
+ def __iter__(self):
+ """Return a result until we get a 'None' response from consumer"""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ self._iterator.next()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ result = self._result
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
+
+
+def create_connection(conf, new, connection_pool):
+ """Create a connection"""
+ return ConnectionContext(conf, connection_pool, pooled=not new)
+
+
+def multicall(conf, context, topic, msg, timeout, connection_pool):
+ """Make a call that returns multiple times."""
+ # Can't use 'with' for multicall, as it returns an iterator
+ # that will continue to use the connection. When it's done,
+ # connection.close() will get called which will put it back into
+ # the pool
+ LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug(_('MSG_ID is %s') % (msg_id))
+ pack_context(msg, context)
+
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, msg)
+ return wait_msg
+
+
+def call(conf, context, topic, msg, timeout, connection_pool):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(conf, context, topic, msg, timeout, connection_pool)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
+
+
+def cast(conf, context, topic, msg, connection_pool):
+ """Sends a message on a topic without waiting for a response."""
+ LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, msg)
+
+
+def fanout_cast(conf, context, topic, msg, connection_pool):
+ """Sends a message on a fanout exchange without waiting for a response."""
+ LOG.debug(_('Making asynchronous fanout cast...'))
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.fanout_send(topic, msg)
+
+
+def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+ """Sends a message on a topic to a specific server."""
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
+ conn.topic_send(topic, msg)
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg,
+ connection_pool):
+ """Sends a message on a fanout exchange to a specific server."""
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
+ conn.fanout_send(topic, msg)
+
+
+def notify(conf, context, topic, msg, connection_pool):
+ """Sends a notification event on a topic."""
+ event_type = msg.get('event_type')
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.notify_send(topic, msg)
+
+
+def cleanup(connection_pool):
+ if connection_pool:
+ connection_pool.empty()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import logging
+import sys
+import traceback
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import local
+from quantum.openstack.common.gettextutils import _
+
+
+LOG = logging.getLogger(__name__)
+
+
+class RPCException(Exception):
+ message = _("An unknown RPC related exception occurred.")
+
+ def __init__(self, message=None, **kwargs):
+ self.kwargs = kwargs
+
+ if not message:
+ try:
+ message = self.message % kwargs
+
+ except Exception as e:
+ # kwargs doesn't match a variable in the message
+ # log the issue and the kwargs
+ LOG.exception(_('Exception in string format operation'))
+ for name, value in kwargs.iteritems():
+ LOG.error("%s: %s" % (name, value))
+ # at least get the core message out if something happened
+ message = self.message
+
+ super(RPCException, self).__init__(message)
+
+
+class RemoteError(RPCException):
+ """Signifies that a remote class has raised an exception.
+
+ Contains a string representation of the type of the original exception,
+ the value of the original exception, and the traceback. These are
+ sent to the parent as a joined string so printing the exception
+ contains all of the relevant info.
+
+ """
+ message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+
+ def __init__(self, exc_type=None, value=None, traceback=None):
+ self.exc_type = exc_type
+ self.value = value
+ self.traceback = traceback
+ super(RemoteError, self).__init__(exc_type=exc_type,
+ value=value,
+ traceback=traceback)
+
+
+class Timeout(RPCException):
+ """Signifies that a timeout has occurred.
+
+ This exception is raised if the rpc_response_timeout is reached while
+ waiting for a response from the remote side.
+ """
+ message = _("Timeout while waiting on RPC response.")
+
+
+class InvalidRPCConnectionReuse(RPCException):
+ message = _("Invalid reuse of an RPC connection.")
+
+
+class UnsupportedRpcVersion(RPCException):
+ message = _("Specified RPC version, %(version)s, not supported by "
+ "this endpoint.")
+
+
+class Connection(object):
+ """A connection, returned by rpc.create_connection().
+
+ This class represents a connection to the message bus used for rpc.
+ An instance of this class should never be created by users of the rpc API.
+ Use rpc.create_connection() instead.
+ """
+ def close(self):
+ """Close the connection.
+
+ This method must be called when the connection will no longer be used.
+ It will ensure that any resources associated with the connection, such
+        as a network connection, are cleaned up.
+ """
+ raise NotImplementedError()
+
+ def create_consumer(self, conf, topic, proxy, fanout=False):
+ """Create a consumer on this connection.
+
+ A consumer is associated with a message queue on the backend message
+ bus. The consumer will read messages from the queue, unpack them, and
+ dispatch them to the proxy object. The contents of the message pulled
+ off of the queue will determine which method gets called on the proxy
+ object.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic. For example, all instances of nova-compute consume
+ from a queue called "compute". In that case, the
+ messages will get distributed amongst the consumers in a
+ round-robin fashion if fanout=False. If fanout=True,
+ every consumer associated with this topic will get a
+ copy of every message.
+ :param proxy: The object that will handle all incoming messages.
+ :param fanout: Whether or not this is a fanout topic. See the
+ documentation for the topic parameter for some
+ additional comments on this.
+ """
+ raise NotImplementedError()
+
+ def create_worker(self, conf, topic, proxy, pool_name):
+ """Create a worker on this connection.
+
+ A worker is like a regular consumer of messages directed to a
+ topic, except that it is part of a set of such consumers (the
+ "pool") which may run in parallel. Every pool of workers will
+ receive a given message, but only one worker in the pool will
+ be asked to process it. Load is distributed across the members
+ of the pool in round-robin fashion.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic.
+ :param proxy: The object that will handle all incoming messages.
+ :param pool_name: String containing the name of the pool of workers
+ """
+ raise NotImplementedError()
+
+ def consume_in_thread(self):
+ """Spawn a thread to handle incoming messages.
+
+ Spawn a thread that will be responsible for handling all incoming
+ messages for consumers that were set up on this connection.
+
+ Message dispatching inside of this is expected to be implemented in a
+ non-blocking manner. An example implementation would be having this
+ thread pull messages in for all of the consumers, but utilize a thread
+ pool for dispatching the messages to the proxy objects.
+ """
+ raise NotImplementedError()
+
+
+def _safe_log(log_func, msg, msg_data):
+ """Sanitizes the msg_data field before logging."""
+ SANITIZE = {'set_admin_password': ('new_pass',),
+ 'run_instance': ('admin_password',), }
+
+ has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
+ has_context_token = '_context_auth_token' in msg_data
+ has_token = 'auth_token' in msg_data
+
+ if not any([has_method, has_context_token, has_token]):
+ return log_func(msg, msg_data)
+
+ msg_data = copy.deepcopy(msg_data)
+
+ if has_method:
+ method = msg_data['method']
+ if method in SANITIZE:
+ args_to_sanitize = SANITIZE[method]
+ for arg in args_to_sanitize:
+ try:
+ msg_data['args'][arg] = "<SANITIZED>"
+ except KeyError:
+ pass
+
+ if has_context_token:
+ msg_data['_context_auth_token'] = '<SANITIZED>'
+
+ if has_token:
+ msg_data['auth_token'] = '<SANITIZED>'
+
+ return log_func(msg, msg_data)
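A worked illustration of the sanitization above; every value is made up.

    # A message such as
    #     {'method': 'run_instance',
    #      'args': {'admin_password': 'hunter2'},
    #      '_context_auth_token': 'abc123'}
    # is deep-copied before logging and is logged with
    #     'args': {'admin_password': '<SANITIZED>'}
    #     '_context_auth_token': '<SANITIZED>'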
+
+
+def serialize_remote_exception(failure_info):
+ """Prepares exception data to be sent over rpc.
+
+ Failure_info should be a sys.exc_info() tuple.
+
+ """
+ tb = traceback.format_exception(*failure_info)
+ failure = failure_info[1]
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
+
+ kwargs = {}
+ if hasattr(failure, 'kwargs'):
+ kwargs = failure.kwargs
+
+ data = {
+ 'class': str(failure.__class__.__name__),
+ 'module': str(failure.__class__.__module__),
+ 'message': unicode(failure),
+ 'tb': tb,
+ 'args': failure.args,
+ 'kwargs': kwargs
+ }
+
+ json_data = jsonutils.dumps(data)
+
+ return json_data
+
+
+def deserialize_remote_exception(conf, data):
+ failure = jsonutils.loads(str(data))
+
+ trace = failure.get('tb', [])
+ message = failure.get('message', "") + "\n" + "\n".join(trace)
+ name = failure.get('class')
+ module = failure.get('module')
+
+ # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
+ # order to prevent arbitrary code execution.
+    if module not in conf.allowed_rpc_exception_modules:
+ return RemoteError(name, failure.get('message'), trace)
+
+ try:
+ mod = importutils.import_module(module)
+ klass = getattr(mod, name)
+ if not issubclass(klass, Exception):
+ raise TypeError("Can only deserialize Exceptions")
+
+ failure = klass(**failure.get('kwargs', {}))
+ except (AttributeError, TypeError, ImportError):
+ return RemoteError(name, failure.get('message'), trace)
+
+ ex_type = type(failure)
+ str_override = lambda self: message
+ new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+ {'__str__': str_override, '__unicode__': str_override})
+ try:
+ # NOTE(ameade): Dynamically create a new exception type and swap it in
+ # as the new type for the exception. This only works on user defined
+ # Exceptions and not core python exceptions. This is important because
+ # we cannot necessarily change an exception message so we must override
+ # the __str__ method.
+ failure.__class__ = new_ex_type
+ except TypeError as e:
+ # NOTE(ameade): If a core exception then just add the traceback to the
+ # first exception argument.
+ failure.args = (message,) + failure.args[1:]
+ return failure
+
+
+class CommonRpcContext(object):
+ def __init__(self, **kwargs):
+ self.values = kwargs
+
+ def __getattr__(self, key):
+ try:
+ return self.values[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def to_dict(self):
+ return copy.deepcopy(self.values)
+
+ @classmethod
+ def from_dict(cls, values):
+ return cls(**values)
+
+ def deepcopy(self):
+ return self.from_dict(self.to_dict())
+
+ def update_store(self):
+ local.store.context = self
+
+ def elevated(self, read_deleted=None, overwrite=False):
+ """Return a version of this context with admin flag set."""
+ # TODO(russellb) This method is a bit of a nova-ism. It makes
+ # some assumptions about the data in the request context sent
+ # across rpc, while the rest of this class does not. We could get
+ # rid of this if we changed the nova code that uses this to
+ # convert the RpcContext back to its native RequestContext doing
+ # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
+
+ context = self.deepcopy()
+ context.values['is_admin'] = True
+
+ context.values.setdefault('roles', [])
+
+ if 'admin' not in context.values['roles']:
+ context.values['roles'].append('admin')
+
+ if read_deleted is not None:
+ context.values['read_deleted'] = read_deleted
+
+ return context
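A minimal sketch of elevated(); the user and role values are illustrative.

    from quantum.openstack.common.rpc import common as rpc_common

    ctx = rpc_common.CommonRpcContext(user='alice', roles=['member'])
    admin_ctx = ctx.elevated()
    # The copy gains is_admin=True and the 'admin' role; the original
    # context is left untouched.
    assert admin_ctx.is_admin and 'admin' in admin_ctx.roles
    assert 'admin' not in ctx.roles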
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them. RPC API
+version numbers are in the form:
+
+ Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+ A = X
+
+ B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API. A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time. However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+"""
+
+from quantum.openstack.common.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+ """Dispatch rpc messages according to the requested API version.
+
+ This class can be used as the top level 'manager' for a service. It
+    contains a list of underlying managers that have an RPC_API_VERSION
+    attribute.
+ """
+
+ def __init__(self, callbacks):
+ """Initialize the rpc dispatcher.
+
+ :param callbacks: List of proxy objects that are an instance
+ of a class with rpc methods exposed. Each proxy
+ object should have an RPC_API_VERSION attribute.
+ """
+ self.callbacks = callbacks
+ super(RpcDispatcher, self).__init__()
+
+ @staticmethod
+ def _is_compatible(mversion, version):
+ """Determine whether versions are compatible.
+
+ :param mversion: The API version implemented by a callback.
+ :param version: The API version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ mversion_parts = mversion.split('.')
+ if int(version_parts[0]) != int(mversion_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(mversion_parts[1]): # Minor
+ return False
+ return True
+
+ def dispatch(self, ctxt, version, method, **kwargs):
+ """Dispatch a message based on a requested version.
+
+ :param ctxt: The request context
+ :param version: The requested API version from the incoming message
+ :param method: The method requested to be called by the incoming
+ message.
+ :param kwargs: A dict of keyword arguments to be passed to the method.
+
+ :returns: Whatever is returned by the underlying method that gets
+ called.
+ """
+ if not version:
+ version = '1.0'
+
+ had_compatible = False
+ for proxyobj in self.callbacks:
+ if hasattr(proxyobj, 'RPC_API_VERSION'):
+ rpc_api_version = proxyobj.RPC_API_VERSION
+ else:
+ rpc_api_version = '1.0'
+ is_compatible = self._is_compatible(rpc_api_version, version)
+ had_compatible = had_compatible or is_compatible
+ if not hasattr(proxyobj, method):
+ continue
+ if is_compatible:
+ return getattr(proxyobj, method)(ctxt, **kwargs)
+
+ if had_compatible:
+ raise AttributeError("No such RPC function '%s'" % method)
+ else:
+ raise rpc_common.UnsupportedRpcVersion(version=version)
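A hedged sketch of dispatching under the versioning rules in the module docstring, assuming this file lands as quantum.openstack.common.rpc.dispatcher; ExampleAPI is hypothetical.

    from quantum.openstack.common.rpc import dispatcher as rpc_dispatcher

    class ExampleAPI(object):
        RPC_API_VERSION = '1.1'

        def echo(self, ctxt, value):
            return value

    disp = rpc_dispatcher.RpcDispatcher([ExampleAPI()])
    # A version 1.0 message is accepted by a 1.1 callback: same major
    # version, and the requested minor version is not newer.
    assert disp.dispatch(None, '1.0', 'echo', value=42) == 42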
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Fake RPC implementation which calls proxy methods directly with no
+queues. Casts will block, but this is very useful for tests.
+"""
+
+import inspect
+import time
+
+import eventlet
+
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common.rpc import common as rpc_common
+
+CONSUMERS = {}
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ def __init__(self, **kwargs):
+ super(RpcContext, self).__init__(**kwargs)
+ self._response = []
+ self._done = False
+
+ def deepcopy(self):
+ values = self.to_dict()
+ new_inst = self.__class__(**values)
+ new_inst._response = self._response
+ new_inst._done = self._done
+ return new_inst
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ self._done = True
+ if not self._done:
+ self._response.append((reply, failure))
+
+
+class Consumer(object):
+ def __init__(self, topic, proxy):
+ self.topic = topic
+ self.proxy = proxy
+
+ def call(self, context, version, method, args, timeout):
+ done = eventlet.event.Event()
+
+ def _inner():
+ ctxt = RpcContext.from_dict(context.to_dict())
+ try:
+                rval = self.proxy.dispatch(ctxt, version, method, **args)
+ res = []
+ # Caller might have called ctxt.reply() manually
+ for (reply, failure) in ctxt._response:
+ if failure:
+ raise failure[0], failure[1], failure[2]
+ res.append(reply)
+ # if ending not 'sent'...we might have more data to
+ # return from the function itself
+ if not ctxt._done:
+ if inspect.isgenerator(rval):
+ for val in rval:
+ res.append(val)
+ else:
+ res.append(rval)
+ done.send(res)
+ except Exception as e:
+ done.send_exception(e)
+
+ thread = eventlet.greenthread.spawn(_inner)
+
+ if timeout:
+ start_time = time.time()
+ while not done.ready():
+ eventlet.greenthread.sleep(1)
+ cur_time = time.time()
+ if (cur_time - start_time) > timeout:
+ thread.kill()
+ raise rpc_common.Timeout()
+
+ return done.wait()
+
+
+class Connection(object):
+ """Connection object."""
+
+ def __init__(self):
+ self.consumers = []
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ consumer = Consumer(topic, proxy)
+ self.consumers.append(consumer)
+ if topic not in CONSUMERS:
+ CONSUMERS[topic] = []
+ CONSUMERS[topic].append(consumer)
+
+ def close(self):
+ for consumer in self.consumers:
+ CONSUMERS[consumer.topic].remove(consumer)
+ self.consumers = []
+
+ def consume_in_thread(self):
+ pass
+
+
+def create_connection(conf, new=True):
+ """Create a connection"""
+ return Connection()
+
+
+def check_serialize(msg):
+ """Make sure a message intended for rpc can be serialized."""
+ jsonutils.dumps(msg)
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+ """Make a call that returns multiple times."""
+
+ check_serialize(msg)
+
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+ version = msg.get('version', None)
+
+ try:
+ consumer = CONSUMERS[topic][0]
+ except (KeyError, IndexError):
+ return iter([None])
+ else:
+ return consumer.call(context, version, method, args, timeout)
+
+
+def call(conf, context, topic, msg, timeout=None):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(conf, context, topic, msg, timeout)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
+
+
+def cast(conf, context, topic, msg):
+ try:
+ call(conf, context, topic, msg)
+ except Exception:
+ pass
+
+
+def notify(conf, context, topic, msg):
+ check_serialize(msg)
+
+
+def cleanup():
+ pass
+
+
+def fanout_cast(conf, context, topic, msg):
+ """Cast to all consumers of a topic"""
+ check_serialize(msg)
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+ version = msg.get('version', None)
+
+ for consumer in CONSUMERS.get(topic, []):
+ try:
+ consumer.call(context, version, method, args, None)
+ except Exception:
+ pass
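A hedged sketch of driving the fake driver from a test, assuming the dispatcher module from the previous file; ExampleManager and the topic name are made up, and no conf object is needed for the code paths exercised here.

    from quantum.openstack.common import context
    from quantum.openstack.common.rpc import dispatcher as rpc_dispatcher
    from quantum.openstack.common.rpc import impl_fake

    class ExampleManager(object):
        def echo(self, ctxt, value):
            return value

    conn = impl_fake.create_connection(None)
    conn.create_consumer('test_topic',
                         rpc_dispatcher.RpcDispatcher([ExampleManager()]))

    ctxt = context.get_admin_context()
    result = impl_fake.call(None, ctxt, 'test_topic',
                            {'method': 'echo', 'args': {'value': 42}})
    assert result == 42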
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import itertools
+import socket
+import ssl
+import sys
+import time
+import uuid
+
+import eventlet
+import greenlet
+import kombu
+import kombu.connection
+import kombu.entity
+import kombu.messaging
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common.rpc import amqp as rpc_amqp
+from quantum.openstack.common.rpc import common as rpc_common
+
+kombu_opts = [
+ cfg.StrOpt('kombu_ssl_version',
+ default='',
+ help='SSL version to use (valid only if SSL enabled)'),
+ cfg.StrOpt('kombu_ssl_keyfile',
+ default='',
+ help='SSL key file (valid only if SSL enabled)'),
+ cfg.StrOpt('kombu_ssl_certfile',
+ default='',
+ help='SSL cert file (valid only if SSL enabled)'),
+ cfg.StrOpt('kombu_ssl_ca_certs',
+ default='',
+ help=('SSL certification authority file '
+ '(valid only if SSL enabled)')),
+ cfg.StrOpt('rabbit_host',
+ default='localhost',
+ help='the RabbitMQ host'),
+ cfg.IntOpt('rabbit_port',
+ default=5672,
+ help='the RabbitMQ port'),
+ cfg.BoolOpt('rabbit_use_ssl',
+ default=False,
+ help='connect over SSL for RabbitMQ'),
+ cfg.StrOpt('rabbit_userid',
+ default='guest',
+ help='the RabbitMQ userid'),
+ cfg.StrOpt('rabbit_password',
+ default='guest',
+ help='the RabbitMQ password'),
+ cfg.StrOpt('rabbit_virtual_host',
+ default='/',
+ help='the RabbitMQ virtual host'),
+ cfg.IntOpt('rabbit_retry_interval',
+ default=1,
+ help='how frequently to retry connecting with RabbitMQ'),
+ cfg.IntOpt('rabbit_retry_backoff',
+ default=2,
+               help='how long to back off between retries when connecting '
+ 'to RabbitMQ'),
+ cfg.IntOpt('rabbit_max_retries',
+ default=0,
+               help='maximum number of retries when connecting to RabbitMQ '
+ '(the default of 0 implies an infinite retry count)'),
+ cfg.BoolOpt('rabbit_durable_queues',
+ default=False,
+ help='use durable queues in RabbitMQ'),
+
+]
+
+cfg.CONF.register_opts(kombu_opts)
+
+LOG = rpc_common.LOG
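Once register_opts() has run, the options above are readable as attributes of the global config object; deployments override them in the service configuration file. Defaults shown below.

    # cfg.CONF.rabbit_host            -> 'localhost'
    # cfg.CONF.rabbit_port            -> 5672
    # cfg.CONF.rabbit_durable_queues  -> False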
+
+
+class ConsumerBase(object):
+ """Consumer base class."""
+
+ def __init__(self, channel, callback, tag, **kwargs):
+ """Declare a queue on an amqp channel.
+
+ 'channel' is the amqp channel to use
+ 'callback' is the callback to call when messages are received
+ 'tag' is a unique ID for the consumer on the channel
+
+ queue name, exchange name, and other kombu options are
+ passed in here as a dictionary.
+ """
+ self.callback = callback
+ self.tag = str(tag)
+ self.kwargs = kwargs
+ self.queue = None
+ self.reconnect(channel)
+
+ def reconnect(self, channel):
+ """Re-declare the queue after a rabbit reconnect"""
+ self.channel = channel
+ self.kwargs['channel'] = channel
+ self.queue = kombu.entity.Queue(**self.kwargs)
+ self.queue.declare()
+
+ def consume(self, *args, **kwargs):
+ """Actually declare the consumer on the amqp channel. This will
+ start the flow of messages from the queue. Using the
+ Connection.iterconsume() iterator will process the messages,
+ calling the appropriate callback.
+
+ If a callback is specified in kwargs, use that. Otherwise,
+ use the callback passed during __init__()
+
+        If kwargs['nowait'] is False (the default), this call waits for
+        the broker to confirm the consumer before returning.
+
+ Messages will automatically be acked if the callback doesn't
+ raise an exception
+ """
+
+ options = {'consumer_tag': self.tag}
+ options['nowait'] = kwargs.get('nowait', False)
+ callback = kwargs.get('callback', self.callback)
+ if not callback:
+ raise ValueError("No callback defined")
+
+ def _callback(raw_message):
+ message = self.channel.message_to_python(raw_message)
+ try:
+ callback(message.payload)
+ message.ack()
+ except Exception:
+ LOG.exception(_("Failed to process message... skipping it."))
+
+ self.queue.consume(*args, callback=_callback, **options)
+
+ def cancel(self):
+ """Cancel the consuming from the queue, if it has started"""
+ try:
+ self.queue.cancel(self.tag)
+ except KeyError, e:
+ # NOTE(comstud): Kludge to get around a amqplib bug
+ if str(e) != "u'%s'" % self.tag:
+ raise
+ self.queue = None
+
+
+class DirectConsumer(ConsumerBase):
+ """Queue/consumer class for 'direct'"""
+
+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
+ """Init a 'direct' queue.
+
+ 'channel' is the amqp channel to use
+ 'msg_id' is the msg_id to listen on
+ 'callback' is the callback to call when messages are received
+ 'tag' is a unique ID for the consumer on the channel
+
+ Other kombu options may be passed
+ """
+ # Default options
+ options = {'durable': False,
+ 'auto_delete': True,
+ 'exclusive': True}
+ options.update(kwargs)
+ exchange = kombu.entity.Exchange(name=msg_id,
+ type='direct',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(DirectConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=msg_id,
+ exchange=exchange,
+ routing_key=msg_id,
+ **options)
+
+
+class TopicConsumer(ConsumerBase):
+ """Consumer class for 'topic'"""
+
+ def __init__(self, conf, channel, topic, callback, tag, name=None,
+ **kwargs):
+ """Init a 'topic' queue.
+
+ :param channel: the amqp channel to use
+ :param topic: the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param tag: a unique ID for the consumer on the channel
+ :param name: optional queue name, defaults to topic
+ :paramtype name: str
+
+ Other kombu options may be passed as keyword arguments
+ """
+ # Default options
+ options = {'durable': conf.rabbit_durable_queues,
+ 'auto_delete': False,
+ 'exclusive': False}
+ options.update(kwargs)
+ exchange = kombu.entity.Exchange(name=conf.control_exchange,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(TopicConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=name or topic,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
+
+
+class FanoutConsumer(ConsumerBase):
+ """Consumer class for 'fanout'"""
+
+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
+ """Init a 'fanout' queue.
+
+ 'channel' is the amqp channel to use
+ 'topic' is the topic to listen on
+ 'callback' is the callback to call when messages are received
+ 'tag' is a unique ID for the consumer on the channel
+
+ Other kombu options may be passed
+ """
+ unique = uuid.uuid4().hex
+ exchange_name = '%s_fanout' % topic
+ queue_name = '%s_fanout_%s' % (topic, unique)
+
+ # Default options
+ options = {'durable': False,
+ 'auto_delete': True,
+ 'exclusive': True}
+ options.update(kwargs)
+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(FanoutConsumer, self).__init__(channel, callback, tag,
+ name=queue_name,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
+
+
+class Publisher(object):
+ """Base Publisher class"""
+
+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
+ """Init the Publisher class with the exchange_name, routing_key,
+ and other options
+ """
+ self.exchange_name = exchange_name
+ self.routing_key = routing_key
+ self.kwargs = kwargs
+ self.reconnect(channel)
+
+ def reconnect(self, channel):
+ """Re-establish the Producer after a rabbit reconnection"""
+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
+ **self.kwargs)
+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
+ channel=channel,
+ routing_key=self.routing_key)
+
+ def send(self, msg):
+ """Send a message"""
+ self.producer.publish(msg)
+
+
+class DirectPublisher(Publisher):
+ """Publisher class for 'direct'"""
+ def __init__(self, conf, channel, msg_id, **kwargs):
+ """init a 'direct' publisher.
+
+ Kombu options may be passed as keyword args to override defaults
+ """
+
+ options = {'durable': False,
+ 'auto_delete': True,
+ 'exclusive': True}
+ options.update(kwargs)
+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+ type='direct', **options)
+
+
+class TopicPublisher(Publisher):
+ """Publisher class for 'topic'"""
+ def __init__(self, conf, channel, topic, **kwargs):
+ """init a 'topic' publisher.
+
+ Kombu options may be passed as keyword args to override defaults
+ """
+ options = {'durable': conf.rabbit_durable_queues,
+ 'auto_delete': False,
+ 'exclusive': False}
+ options.update(kwargs)
+ super(TopicPublisher, self).__init__(channel, conf.control_exchange,
+ topic, type='topic', **options)
+
+
+class FanoutPublisher(Publisher):
+ """Publisher class for 'fanout'"""
+ def __init__(self, conf, channel, topic, **kwargs):
+ """init a 'fanout' publisher.
+
+ Kombu options may be passed as keyword args to override defaults
+ """
+ options = {'durable': False,
+ 'auto_delete': True,
+ 'exclusive': True}
+ options.update(kwargs)
+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+ None, type='fanout', **options)
+
+
+class NotifyPublisher(TopicPublisher):
+ """Publisher class for 'notify'"""
+
+ def __init__(self, conf, channel, topic, **kwargs):
+ self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
+
+ def reconnect(self, channel):
+ super(NotifyPublisher, self).reconnect(channel)
+
+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
+ # we do this to ensure that messages don't get dropped if the
+ # consumer is started after we do
+ queue = kombu.entity.Queue(channel=channel,
+ exchange=self.exchange,
+ durable=self.durable,
+ name=self.routing_key,
+ routing_key=self.routing_key)
+ queue.declare()
+
+
+class Connection(object):
+ """Connection object."""
+
+ pool = None
+
+ def __init__(self, conf, server_params=None):
+ self.consumers = []
+ self.consumer_thread = None
+ self.conf = conf
+ self.max_retries = self.conf.rabbit_max_retries
+ # Try forever?
+ if self.max_retries <= 0:
+ self.max_retries = None
+ self.interval_start = self.conf.rabbit_retry_interval
+ self.interval_stepping = self.conf.rabbit_retry_backoff
+ # max retry-interval = 30 seconds
+ self.interval_max = 30
+ self.memory_transport = False
+
+ if server_params is None:
+ server_params = {}
+
+ # Keys to translate from server_params to kombu params
+ server_params_to_kombu_params = {'username': 'userid'}
+
+ params = {}
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
+
+ params.setdefault('hostname', self.conf.rabbit_host)
+ params.setdefault('port', self.conf.rabbit_port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+ self.params = params
+
+ if self.conf.fake_rabbit:
+ self.params['transport'] = 'memory'
+ self.memory_transport = True
+ else:
+ self.memory_transport = False
+
+ if self.conf.rabbit_use_ssl:
+ self.params['ssl'] = self._fetch_ssl_params()
+
+ self.connection = None
+ self.reconnect()
+
+ def _fetch_ssl_params(self):
+ """Fetch the SSL parameters that should be used for the
+ connection (if any)"""
+ ssl_params = dict()
+
+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
+ if self.conf.kombu_ssl_version:
+ ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+ if self.conf.kombu_ssl_keyfile:
+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
+ if self.conf.kombu_ssl_certfile:
+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
+ if self.conf.kombu_ssl_ca_certs:
+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
+ # We might want to allow variations in the
+ # future with this?
+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
+
+ if not ssl_params:
+ # Just have the default behavior
+ return True
+ else:
+ # Return the extended behavior
+ return ssl_params
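+ # Illustrative sketch (not part of the original patch): with a
+ # hypothetical config of kombu_ssl_keyfile=/etc/pki/client.key,
+ # kombu_ssl_certfile=/etc/pki/client.crt and
+ # kombu_ssl_ca_certs=/etc/pki/ca.crt, this returns roughly
+ # {'keyfile': '/etc/pki/client.key',
+ # 'certfile': '/etc/pki/client.crt',
+ # 'ca_certs': '/etc/pki/ca.crt',
+ # 'cert_reqs': ssl.CERT_REQUIRED}
+ # while with none of the kombu_ssl_* options set it returns True,
+ # meaning "use SSL with the library defaults".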
+
+ def _connect(self):
+ """Connect to rabbit. Re-establish any queues that may have
+ been declared before if we are reconnecting. Exceptions should
+ be handled by the caller.
+ """
+ if self.connection:
+ LOG.info(_("Reconnecting to AMQP server on "
+ "%(hostname)s:%(port)d") % self.params)
+ try:
+ self.connection.close()
+ except self.connection_errors:
+ pass
+ # Setting this in case the next statement fails, though
+ # it shouldn't be doing any network operations, yet.
+ self.connection = None
+ self.connection = kombu.connection.BrokerConnection(**self.params)
+ self.connection_errors = self.connection.connection_errors
+ if self.memory_transport:
+ # Kludge to speed up tests.
+ self.connection.transport.polling_interval = 0.0
+ self.consumer_num = itertools.count(1)
+ self.connection.connect()
+ self.channel = self.connection.channel()
+ # work around 'memory' transport bug in 1.1.3
+ if self.memory_transport:
+ self.channel._new_queue('ae.undeliver')
+ for consumer in self.consumers:
+ consumer.reconnect(self.channel)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
+ self.params)
+
+ def reconnect(self):
+ """Handles reconnecting and re-establishing queues.
+ Will retry up to self.max_retries number of times.
+ self.max_retries = 0 means to retry forever.
+ Sleep between tries, starting at self.interval_start
+ seconds, backing off self.interval_stepping number of seconds
+ each attempt.
+ """
+
+ attempt = 0
+ while True:
+ attempt += 1
+ try:
+ self._connect()
+ return
+ except (self.connection_errors, IOError), e:
+ pass
+ except Exception, e:
+ # NOTE(comstud): Unfortunately it's possible for amqplib
+ # to return an error not covered by its transport
+ # connection_errors in the case of a timeout waiting for
+ # a protocol response. (See paste link in LP888621)
+ # So, we check all exceptions for 'timeout' in them
+ # and try to reconnect in this case.
+ if 'timeout' not in str(e):
+ raise
+
+ log_info = {}
+ log_info['err_str'] = str(e)
+ log_info['max_retries'] = self.max_retries
+ log_info.update(self.params)
+
+ if self.max_retries and attempt == self.max_retries:
+ LOG.exception(_('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
+ # NOTE(comstud): Copied from original code. There's
+ # really no better recourse because if this was a queue we
+ # need to consume on, we have no way to consume anymore.
+ sys.exit(1)
+
+ if attempt == 1:
+ sleep_time = self.interval_start or 1
+ elif attempt > 1:
+ sleep_time += self.interval_stepping
+ if self.interval_max:
+ sleep_time = min(sleep_time, self.interval_max)
+
+ log_info['sleep_time'] = sleep_time
+ LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
+ ' unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
+ time.sleep(sleep_time)
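+ # Worked example (illustrative, not part of the original patch):
+ # with rabbit_retry_interval=1 and rabbit_retry_backoff=2 the sleep
+ # times between attempts are 1, 3, 5, 7, ... seconds, capped at
+ # self.interval_max (30 seconds), until max_retries is exhausted
+ # (the process exits) or forever when rabbit_max_retries <= 0.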
+
+ def ensure(self, error_callback, method, *args, **kwargs):
+ while True:
+ try:
+ return method(*args, **kwargs)
+ except (self.connection_errors, socket.timeout, IOError), e:
+ pass
+ except Exception, e:
+ # NOTE(comstud): Unfortunately it's possible for amqplib
+ # to return an error not covered by its transport
+ # connection_errors in the case of a timeout waiting for
+ # a protocol response. (See paste link in LP888621)
+ # So, we check all exceptions for 'timeout' in them
+ # and try to reconnect in this case.
+ if 'timeout' not in str(e):
+ raise
+ if error_callback:
+ error_callback(e)
+ self.reconnect()
+
+ def get_channel(self):
+ """Convenience call for bin/clear_rabbit_queues"""
+ return self.channel
+
+ def close(self):
+ """Close/release this connection"""
+ self.cancel_consumer_thread()
+ self.connection.release()
+ self.connection = None
+
+ def reset(self):
+ """Reset a connection so it can be used again"""
+ self.cancel_consumer_thread()
+ self.channel.close()
+ self.channel = self.connection.channel()
+ # work around 'memory' transport bug in 1.1.3
+ if self.memory_transport:
+ self.channel._new_queue('ae.undeliver')
+ self.consumers = []
+
+ def declare_consumer(self, consumer_cls, topic, callback):
+ """Create a Consumer using the class that was passed in and
+ add it to our list of consumers
+ """
+
+ def _connect_error(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+ "%(err_str)s") % log_info)
+
+ def _declare_consumer():
+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
+ self.consumer_num.next())
+ self.consumers.append(consumer)
+ return consumer
+
+ return self.ensure(_connect_error, _declare_consumer)
+
+ def iterconsume(self, limit=None, timeout=None):
+ """Return an iterator that will consume from all queues/consumers"""
+
+ info = {'do_consume': True}
+
+ def _error_callback(exc):
+ if isinstance(exc, socket.timeout):
+ LOG.exception(_('Timed out waiting for RPC response: %s') %
+ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+ str(exc))
+ info['do_consume'] = True
+
+ def _consume():
+ if info['do_consume']:
+ queues_head = self.consumers[:-1]
+ queues_tail = self.consumers[-1]
+ for queue in queues_head:
+ queue.consume(nowait=True)
+ queues_tail.consume(nowait=False)
+ info['do_consume'] = False
+ return self.connection.drain_events(timeout=timeout)
+
+ for iteration in itertools.count(0):
+ if limit and iteration >= limit:
+ raise StopIteration
+ yield self.ensure(_error_callback, _consume)
+
+ def cancel_consumer_thread(self):
+ """Cancel a consumer thread"""
+ if self.consumer_thread is not None:
+ self.consumer_thread.kill()
+ try:
+ self.consumer_thread.wait()
+ except greenlet.GreenletExit:
+ pass
+ self.consumer_thread = None
+
+ def publisher_send(self, cls, topic, msg, **kwargs):
+ """Send to a publisher based on the publisher class"""
+
+ def _error_callback(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.exception(_("Failed to publish message to topic "
+ "'%(topic)s': %(err_str)s") % log_info)
+
+ def _publish():
+ publisher = cls(self.conf, self.channel, topic, **kwargs)
+ publisher.send(msg)
+
+ self.ensure(_error_callback, _publish)
+
+ def declare_direct_consumer(self, topic, callback):
+ """Create a 'direct' queue.
+ In nova's use, this is generally a msg_id queue used for
+ responses for call/multicall
+ """
+ self.declare_consumer(DirectConsumer, topic, callback)
+
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ """Create a 'topic' consumer."""
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
+
+ def declare_fanout_consumer(self, topic, callback):
+ """Create a 'fanout' consumer"""
+ self.declare_consumer(FanoutConsumer, topic, callback)
+
+ def direct_send(self, msg_id, msg):
+ """Send a 'direct' message"""
+ self.publisher_send(DirectPublisher, msg_id, msg)
+
+ def topic_send(self, topic, msg):
+ """Send a 'topic' message"""
+ self.publisher_send(TopicPublisher, topic, msg)
+
+ def fanout_send(self, topic, msg):
+ """Send a 'fanout' message"""
+ self.publisher_send(FanoutPublisher, topic, msg)
+
+ def notify_send(self, topic, msg, **kwargs):
+ """Send a notify message on a topic"""
+ self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+
+ def consume(self, limit=None):
+ """Consume from all queues/consumers"""
+ it = self.iterconsume(limit=limit)
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+
+ def consume_in_thread(self):
+ """Consume from all queues/consumers in a greenthread"""
+ def _consumer_thread():
+ try:
+ self.consume()
+ except greenlet.GreenletExit:
+ return
+ if self.consumer_thread is None:
+ self.consumer_thread = eventlet.spawn(_consumer_thread)
+ return self.consumer_thread
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ """Create a consumer that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
+ if fanout:
+ self.declare_fanout_consumer(topic, proxy_cb)
+ else:
+ self.declare_topic_consumer(topic, proxy_cb)
+
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
+
+
+def create_connection(conf, new=True):
+ """Create a connection"""
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+ """Make a call that returns multiple times."""
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def call(conf, context, topic, msg, timeout=None):
+ """Sends a message on a topic and wait for a response."""
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast(conf, context, topic, msg):
+ """Sends a message on a topic without waiting for a response."""
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast(conf, context, topic, msg):
+ """Sends a message on a fanout exchange without waiting for a response."""
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast_to_server(conf, context, server_params, topic, msg):
+ """Sends a message on a topic to a specific server."""
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
+ """Sends a message on a fanout exchange to a specific server."""
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def notify(conf, context, topic, msg):
+ """Sends a notification event on a topic."""
+ return rpc_amqp.notify(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cleanup():
+ return rpc_amqp.cleanup(Connection.pool)
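+
+# Illustrative usage sketch (not part of the original patch). Callers
+# normally go through the top-level rpc API rather than this driver
+# directly; assuming a conf object with the rabbit_* options registered,
+# and hypothetical names ('mytopic', proxy_obj, 'do_thing'), the flow
+# looks roughly like:
+#
+#   conn = create_connection(conf)
+#   conn.create_consumer('mytopic', proxy_obj, fanout=False)
+#   conn.consume_in_thread()
+#   ...
+#   result = call(conf, context, 'mytopic',
+#                 {'method': 'do_thing', 'args': {'x': 1}})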
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+# Copyright 2011 - 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import itertools
+import logging
+import time
+import uuid
+
+import eventlet
+import greenlet
+import qpid.messaging
+import qpid.messaging.exceptions
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common.rpc import amqp as rpc_amqp
+from quantum.openstack.common.rpc import common as rpc_common
+
+LOG = logging.getLogger(__name__)
+
+qpid_opts = [
+ cfg.StrOpt('qpid_hostname',
+ default='localhost',
+ help='Qpid broker hostname'),
+ cfg.StrOpt('qpid_port',
+ default='5672',
+ help='Qpid broker port'),
+ cfg.StrOpt('qpid_username',
+ default='',
+ help='Username for qpid connection'),
+ cfg.StrOpt('qpid_password',
+ default='',
+ help='Password for qpid connection'),
+ cfg.StrOpt('qpid_sasl_mechanisms',
+ default='',
+ help='Space separated list of SASL mechanisms to use for auth'),
+ cfg.BoolOpt('qpid_reconnect',
+ default=True,
+ help='Automatically reconnect'),
+ cfg.IntOpt('qpid_reconnect_timeout',
+ default=0,
+ help='Reconnection timeout in seconds'),
+ cfg.IntOpt('qpid_reconnect_limit',
+ default=0,
+ help='Max reconnections before giving up'),
+ cfg.IntOpt('qpid_reconnect_interval_min',
+ default=0,
+ help='Minimum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval_max',
+ default=0,
+ help='Maximum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval',
+ default=0,
+ help='Equivalent to setting max and min to the same value'),
+ cfg.IntOpt('qpid_heartbeat',
+ default=5,
+ help='Seconds between connection keepalive heartbeats'),
+ cfg.StrOpt('qpid_protocol',
+ default='tcp',
+ help="Transport to use, either 'tcp' or 'ssl'"),
+ cfg.BoolOpt('qpid_tcp_nodelay',
+ default=True,
+ help='Disable Nagle algorithm'),
+]
+
+cfg.CONF.register_opts(qpid_opts)
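+
+# Illustrative example (not part of the original patch): since the
+# options above are registered on cfg.CONF, an ini-style configuration
+# file would set them roughly as follows (all values hypothetical):
+#
+#   [DEFAULT]
+#   qpid_hostname = broker.example.com
+#   qpid_port = 5672
+#   qpid_username = quantum
+#   qpid_password = secret
+#   qpid_protocol = ssl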
+
+
+class ConsumerBase(object):
+ """Consumer base class."""
+
+ def __init__(self, session, callback, node_name, node_opts,
+ link_name, link_opts):
+ """Declare a queue on an amqp session.
+
+ 'session' is the amqp session to use
+ 'callback' is the callback to call when messages are received
+ 'node_name' is the first part of the Qpid address string, before ';'
+ 'node_opts' will be applied to the "x-declare" section of "node"
+ in the address string.
+ 'link_name' goes into the "name" field of the "link" in the address
+ string
+ 'link_opts' will be applied to the "x-declare" section of "link"
+ in the address string.
+ """
+ self.callback = callback
+ self.receiver = None
+ self.session = None
+
+ addr_opts = {
+ "create": "always",
+ "node": {
+ "type": "topic",
+ "x-declare": {
+ "durable": True,
+ "auto-delete": True,
+ },
+ },
+ "link": {
+ "name": link_name,
+ "durable": True,
+ "x-declare": {
+ "durable": False,
+ "auto-delete": True,
+ "exclusive": False,
+ },
+ },
+ }
+ addr_opts["node"]["x-declare"].update(node_opts)
+ addr_opts["link"]["x-declare"].update(link_opts)
+
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+
+ self.reconnect(session)
+
+ def reconnect(self, session):
+ """Re-declare the receiver after a qpid reconnect"""
+ self.session = session
+ self.receiver = session.receiver(self.address)
+ self.receiver.capacity = 1
+
+ def consume(self):
+ """Fetch the message and pass it to the callback object"""
+ message = self.receiver.fetch()
+ try:
+ self.callback(message.content)
+ except Exception:
+ LOG.exception(_("Failed to process message... skipping it."))
+ finally:
+ self.session.acknowledge(message)
+
+ def get_receiver(self):
+ return self.receiver
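+ # Illustrative example (not part of the original patch): for a
+ # TopicConsumer on topic 'mytopic' with a control_exchange of
+ # 'quantum' (hypothetical values), self.address ends up looking
+ # roughly like (JSON key order may vary):
+ # quantum/mytopic ; {"create": "always",
+ # "node": {"type": "topic",
+ # "x-declare": {"durable": true, "auto-delete": true}},
+ # "link": {"name": "mytopic", "durable": true,
+ # "x-declare": {"durable": false, "auto-delete": true,
+ # "exclusive": false}}}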
+
+
+class DirectConsumer(ConsumerBase):
+ """Queue/consumer class for 'direct'"""
+
+ def __init__(self, conf, session, msg_id, callback):
+ """Init a 'direct' queue.
+
+ 'session' is the amqp session to use
+ 'msg_id' is the msg_id to listen on
+ 'callback' is the callback to call when messages are received
+ """
+
+ super(DirectConsumer, self).__init__(session, callback,
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
+
+
+class TopicConsumer(ConsumerBase):
+ """Consumer class for 'topic'"""
+
+ def __init__(self, conf, session, topic, callback, name=None):
+ """Init a 'topic' queue.
+
+ :param session: the amqp session to use
+ :param topic: the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param name: optional queue name, defaults to topic
+ """
+
+ super(TopicConsumer, self).__init__(session, callback,
+ "%s/%s" % (conf.control_exchange,
+ topic),
+ {}, name or topic, {})
+
+
+class FanoutConsumer(ConsumerBase):
+ """Consumer class for 'fanout'"""
+
+ def __init__(self, conf, session, topic, callback):
+ """Init a 'fanout' queue.
+
+ 'session' is the amqp session to use
+ 'topic' is the topic to listen on
+ 'callback' is the callback to call when messages are received
+ """
+
+ super(FanoutConsumer, self).__init__(
+ session, callback,
+ "%s_fanout" % topic,
+ {"durable": False, "type": "fanout"},
+ "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+ {"exclusive": True})
+
+
+class Publisher(object):
+ """Base Publisher class"""
+
+ def __init__(self, session, node_name, node_opts=None):
+ """Init the Publisher class with the session, node name,
+ and node options
+ """
+ self.sender = None
+ self.session = session
+
+ addr_opts = {
+ "create": "always",
+ "node": {
+ "type": "topic",
+ "x-declare": {
+ "durable": False,
+ # auto-delete isn't implemented for exchanges in qpid,
+ # but put in here anyway
+ "auto-delete": True,
+ },
+ },
+ }
+ if node_opts:
+ addr_opts["node"]["x-declare"].update(node_opts)
+
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+
+ self.reconnect(session)
+
+ def reconnect(self, session):
+ """Re-establish the Sender after a reconnection"""
+ self.sender = session.sender(self.address)
+
+ def send(self, msg):
+ """Send a message"""
+ self.sender.send(msg)
+
+
+class DirectPublisher(Publisher):
+ """Publisher class for 'direct'"""
+ def __init__(self, conf, session, msg_id):
+ """Init a 'direct' publisher."""
+ super(DirectPublisher, self).__init__(session, msg_id,
+ {"type": "Direct"})
+
+
+class TopicPublisher(Publisher):
+ """Publisher class for 'topic'"""
+ def __init__(self, conf, session, topic):
+ """init a 'topic' publisher.
+ """
+ super(TopicPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic))
+
+
+class FanoutPublisher(Publisher):
+ """Publisher class for 'fanout'"""
+ def __init__(self, conf, session, topic):
+ """init a 'fanout' publisher.
+ """
+ super(FanoutPublisher, self).__init__(
+ session,
+ "%s_fanout" % topic, {"type": "fanout"})
+
+
+class NotifyPublisher(Publisher):
+ """Publisher class for notifications"""
+ def __init__(self, conf, session, topic):
+ """init a 'topic' publisher.
+ """
+ super(NotifyPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic),
+ {"durable": True})
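+
+# Illustrative summary (not part of the original patch), assuming a
+# control_exchange of 'quantum' and a topic of 'mytopic' (hypothetical
+# values): DirectPublisher sends to the node "<msg_id>",
+# TopicPublisher and NotifyPublisher send to "quantum/mytopic" (the
+# latter declaring the node durable), and FanoutPublisher sends to
+# "mytopic_fanout".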
+
+
+class Connection(object):
+ """Connection object."""
+
+ pool = None
+
+ def __init__(self, conf, server_params=None):
+ self.session = None
+ self.consumers = {}
+ self.consumer_thread = None
+ self.conf = conf
+
+ if server_params is None:
+ server_params = {}
+
+ default_params = dict(hostname=self.conf.qpid_hostname,
+ port=self.conf.qpid_port,
+ username=self.conf.qpid_username,
+ password=self.conf.qpid_password)
+
+ params = server_params
+ for key in default_params.keys():
+ params.setdefault(key, default_params[key])
+
+ self.broker = params['hostname'] + ":" + str(params['port'])
+ # Create the connection - this does not open the connection
+ self.connection = qpid.messaging.Connection(self.broker)
+
+ # Check if flags are set and if so set them for the connection
+ # before we call open
+ self.connection.username = params['username']
+ self.connection.password = params['password']
+ self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
+ self.connection.reconnect = self.conf.qpid_reconnect
+ if self.conf.qpid_reconnect_timeout:
+ self.connection.reconnect_timeout = (
+ self.conf.qpid_reconnect_timeout)
+ if self.conf.qpid_reconnect_limit:
+ self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
+ if self.conf.qpid_reconnect_interval_max:
+ self.connection.reconnect_interval_max = (
+ self.conf.qpid_reconnect_interval_max)
+ if self.conf.qpid_reconnect_interval_min:
+ self.connection.reconnect_interval_min = (
+ self.conf.qpid_reconnect_interval_min)
+ if self.conf.qpid_reconnect_interval:
+ self.connection.reconnect_interval = (
+ self.conf.qpid_reconnect_interval)
+ self.connection.heartbeat = self.conf.qpid_heartbeat
+ self.connection.protocol = self.conf.qpid_protocol
+ self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
+
+ # Open is part of reconnect -
+ # NOTE(WGH) not sure we need this with the reconnect flags
+ self.reconnect()
+
+ def _register_consumer(self, consumer):
+ self.consumers[str(consumer.get_receiver())] = consumer
+
+ def _lookup_consumer(self, receiver):
+ return self.consumers[str(receiver)]
+
+ def reconnect(self):
+ """Handles reconnecting and re-establishing sessions and queues"""
+ if self.connection.opened():
+ try:
+ self.connection.close()
+ except qpid.messaging.exceptions.ConnectionError:
+ pass
+
+ while True:
+ try:
+ self.connection.open()
+ except qpid.messaging.exceptions.ConnectionError, e:
+ LOG.error(_('Unable to connect to AMQP server: %s'), e)
+ time.sleep(self.conf.qpid_reconnect_interval or 1)
+ else:
+ break
+
+ LOG.info(_('Connected to AMQP server on %s'), self.broker)
+
+ self.session = self.connection.session()
+
+ for consumer in self.consumers.itervalues():
+ consumer.reconnect(self.session)
+
+ if self.consumers:
+ LOG.debug(_("Re-established AMQP queues"))
+
+ def ensure(self, error_callback, method, *args, **kwargs):
+ while True:
+ try:
+ return method(*args, **kwargs)
+ except (qpid.messaging.exceptions.Empty,
+ qpid.messaging.exceptions.ConnectionError), e:
+ if error_callback:
+ error_callback(e)
+ self.reconnect()
+
+ def close(self):
+ """Close/release this connection"""
+ self.cancel_consumer_thread()
+ self.connection.close()
+ self.connection = None
+
+ def reset(self):
+ """Reset a connection so it can be used again"""
+ self.cancel_consumer_thread()
+ self.session.close()
+ self.session = self.connection.session()
+ self.consumers = {}
+
+ def declare_consumer(self, consumer_cls, topic, callback):
+ """Create a Consumer using the class that was passed in and
+ add it to our list of consumers
+ """
+ def _connect_error(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+ "%(err_str)s") % log_info)
+
+ def _declare_consumer():
+ consumer = consumer_cls(self.conf, self.session, topic, callback)
+ self._register_consumer(consumer)
+ return consumer
+
+ return self.ensure(_connect_error, _declare_consumer)
+
+ def iterconsume(self, limit=None, timeout=None):
+ """Return an iterator that will consume from all queues/consumers"""
+
+ def _error_callback(exc):
+ if isinstance(exc, qpid.messaging.exceptions.Empty):
+ LOG.exception(_('Timed out waiting for RPC response: %s') %
+ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+ str(exc))
+
+ def _consume():
+ nxt_receiver = self.session.next_receiver(timeout=timeout)
+ try:
+ self._lookup_consumer(nxt_receiver).consume()
+ except Exception:
+ LOG.exception(_("Error processing message. Skipping it."))
+
+ for iteration in itertools.count(0):
+ if limit and iteration >= limit:
+ raise StopIteration
+ yield self.ensure(_error_callback, _consume)
+
+ def cancel_consumer_thread(self):
+ """Cancel a consumer thread"""
+ if self.consumer_thread is not None:
+ self.consumer_thread.kill()
+ try:
+ self.consumer_thread.wait()
+ except greenlet.GreenletExit:
+ pass
+ self.consumer_thread = None
+
+ def publisher_send(self, cls, topic, msg):
+ """Send to a publisher based on the publisher class"""
+
+ def _connect_error(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.exception(_("Failed to publish message to topic "
+ "'%(topic)s': %(err_str)s") % log_info)
+
+ def _publisher_send():
+ publisher = cls(self.conf, self.session, topic)
+ publisher.send(msg)
+
+ return self.ensure(_connect_error, _publisher_send)
+
+ def declare_direct_consumer(self, topic, callback):
+ """Create a 'direct' queue.
+ In nova's use, this is generally a msg_id queue used for
+ responses for call/multicall
+ """
+ self.declare_consumer(DirectConsumer, topic, callback)
+
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ """Create a 'topic' consumer."""
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
+
+ def declare_fanout_consumer(self, topic, callback):
+ """Create a 'fanout' consumer"""
+ self.declare_consumer(FanoutConsumer, topic, callback)
+
+ def direct_send(self, msg_id, msg):
+ """Send a 'direct' message"""
+ self.publisher_send(DirectPublisher, msg_id, msg)
+
+ def topic_send(self, topic, msg):
+ """Send a 'topic' message"""
+ self.publisher_send(TopicPublisher, topic, msg)
+
+ def fanout_send(self, topic, msg):
+ """Send a 'fanout' message"""
+ self.publisher_send(FanoutPublisher, topic, msg)
+
+ def notify_send(self, topic, msg, **kwargs):
+ """Send a notify message on a topic"""
+ self.publisher_send(NotifyPublisher, topic, msg)
+
+ def consume(self, limit=None):
+ """Consume from all queues/consumers"""
+ it = self.iterconsume(limit=limit)
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+
+ def consume_in_thread(self):
+ """Consume from all queues/consumers in a greenthread"""
+ def _consumer_thread():
+ try:
+ self.consume()
+ except greenlet.GreenletExit:
+ return
+ if self.consumer_thread is None:
+ self.consumer_thread = eventlet.spawn(_consumer_thread)
+ return self.consumer_thread
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ """Create a consumer that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
+ if fanout:
+ consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
+ else:
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
+
+ self._register_consumer(consumer)
+
+ return consumer
+
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+ name=pool_name)
+
+ self._register_consumer(consumer)
+
+ return consumer
+
+
+def create_connection(conf, new=True):
+ """Create a connection"""
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+ """Make a call that returns multiple times."""
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def call(conf, context, topic, msg, timeout=None):
+ """Sends a message on a topic and wait for a response."""
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast(conf, context, topic, msg):
+ """Sends a message on a topic without waiting for a response."""
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast(conf, context, topic, msg):
+ """Sends a message on a fanout exchange without waiting for a response."""
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast_to_server(conf, context, server_params, topic, msg):
+ """Sends a message on a topic to a specific server."""
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
+ """Sends a message on a fanout exchange to a specific server."""
+ return rpc_amqp.fanout_cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def notify(conf, context, topic, msg):
+ """Sends a notification event on a topic."""
+ return rpc_amqp.notify(conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cleanup():
+ return rpc_amqp.cleanup(Connection.pool)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import pprint
+import string
+import sys
+import types
+import uuid
+
+import eventlet
+from eventlet.green import zmq
+import greenlet
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common.rpc import common as rpc_common
+
+
+# Aliased here for convenience; these are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = rpc_common.LOG
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ # The module.Class to use for matchmaking.
+ cfg.StrOpt('rpc_zmq_matchmaker',
+ default='quantum.openstack.common.rpc.matchmaker.'
+ 'MatchMakerLocalhost', help='MatchMaker driver'),
+
+ # The following port is unassigned by IANA as of 2012-05-21
+ cfg.IntOpt('rpc_zmq_port', default=9501,
+ help='ZeroMQ receiver listening port'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ help='Number of ZeroMQ contexts, defaults to 1'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ help='Directory for holding IPC sockets'),
+]
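+
+# Illustrative example (not part of the original patch): in an
+# ini-style configuration file these options would be set roughly as
+# follows (all values hypothetical):
+#
+#   [DEFAULT]
+#   rpc_zmq_bind_address = *
+#   rpc_zmq_port = 9501
+#   rpc_zmq_ipc_dir = /var/run/openstack
+#   rpc_zmq_matchmaker = quantum.openstack.common.rpc.matchmaker.MatchMakerRing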
+
+
+# These globals are defined in register_opts(conf),
+# a mandatory initialization call
+FLAGS = None
+ZMQ_CTX = None # ZeroMQ Context, must be global.
+matchmaker = None # memoized matchmaker object
+
+
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
+class ZmqSocket(object):
+ """
+ A tiny wrapper around ZeroMQ to simplify the send/recv protocol
+ and connection management.
+
+ Can be used as a Context (supports the 'with' statement).
+ """
+
+ def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+ self.sock = ZMQ_CTX.socket(zmq_type)
+ self.addr = addr
+ self.type = zmq_type
+ self.subscriptions = []
+
+ # Track socket capabilities so we can fail cleanly when sending
+ # or receiving on the wrong socket type.
+ self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+ self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+ self.can_sub = zmq_type in (zmq.SUB, )
+
+ # Support list, str, & None for subscribe arg (cast to list)
+ do_sub = {
+ list: subscribe,
+ str: [subscribe],
+ type(None): []
+ }[type(subscribe)]
+
+ for f in do_sub:
+ self.subscribe(f)
+
+ LOG.debug(_("Connecting to %(addr)s with %(type)s"
+ "\n-> Subscribed to %(subscribe)s"
+ "\n-> bind: %(bind)s"),
+ {'addr': addr, 'type': self.socket_s(),
+ 'subscribe': subscribe, 'bind': bind})
+
+ try:
+ if bind:
+ self.sock.bind(addr)
+ else:
+ self.sock.connect(addr)
+ except Exception:
+ raise RPCException(_("Could not open socket."))
+
+ def socket_s(self):
+ """Get socket type as string."""
+ t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+ 'DEALER')
+ return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+ def subscribe(self, msg_filter):
+ """Subscribe."""
+ if not self.can_sub:
+ raise RPCException("Cannot subscribe on this socket.")
+ LOG.debug(_("Subscribing to %s"), msg_filter)
+
+ try:
+ self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+ except Exception:
+ return
+
+ self.subscriptions.append(msg_filter)
+
+ def unsubscribe(self, msg_filter):
+ """Unsubscribe."""
+ if msg_filter not in self.subscriptions:
+ return
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+ self.subscriptions.remove(msg_filter)
+
+ def close(self):
+ if self.sock is None or self.sock.closed:
+ return
+
+ # We must unsubscribe, or we'll leak descriptors.
+ if len(self.subscriptions) > 0:
+ for f in self.subscriptions:
+ try:
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+ except Exception:
+ pass
+ self.subscriptions = []
+
+ # Linger -1 prevents lost/dropped messages
+ try:
+ self.sock.close(linger=-1)
+ except Exception:
+ pass
+ self.sock = None
+
+ def recv(self):
+ if not self.can_recv:
+ raise RPCException(_("You cannot recv on this socket."))
+ return self.sock.recv_multipart()
+
+ def send(self, data):
+ if not self.can_send:
+ raise RPCException(_("You cannot send on this socket."))
+ self.sock.send_multipart(data)
+
+
+class ZmqClient(object):
+ """Client for ZMQ sockets."""
+
+ def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+ def cast(self, msg_id, topic, data):
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
+
+ def close(self):
+ self.outq.close()
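+
+# Illustrative note (not part of the original patch): a call such as
+# ZmqClient('tcp://127.0.0.1:9501').cast('msg-1', 'mytopic', payload)
+# (address and names hypothetical) puts a four-part multipart message
+# on the wire, ['msg-1', 'mytopic', 'cast', <JSON-serialized payload>],
+# which is what the reactors below unpack as
+# (msg_id, topic, style, in_msg).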
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call."""
+ def __init__(self, **kwargs):
+ self.replies = []
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['replies'] = self.replies
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ return
+ self.replies.append(reply)
+
+ @classmethod
+ def marshal(cls, ctx):
+ ctx_data = ctx.to_dict()
+ return _serialize(ctx_data)
+
+ @classmethod
+ def unmarshal(cls, data):
+ return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+ """Used by ConsumerBase as a private context for '-' prefixed (internal) methods."""
+
+ def __init__(self, proxy):
+ self.proxy = proxy
+ self.msg_waiter = None
+
+ def _get_response(self, ctx, proxy, topic, data):
+ """Process a curried message and cast the result to topic."""
+ LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+
+ try:
+ result = proxy.dispatch(
+ ctx, data['version'], data['method'], **data['args'])
+ return ConsumerBase.normalize_reply(result, ctx.replies)
+ except greenlet.GreenletExit:
+ # ignore these since they are just from shutdowns
+ pass
+ except Exception:
+ return {'exc':
+ rpc_common.serialize_remote_exception(sys.exc_info())}
+
+ def reply(self, ctx, proxy,
+ msg_id=None, context=None, topic=None, msg=None):
+ """Reply to a casted call."""
+ # Our real method is curried into msg['args']
+
+ child_ctx = RpcContext.unmarshal(msg[0])
+ response = ConsumerBase.normalize_reply(
+ self._get_response(child_ctx, proxy, topic, msg[1]),
+ ctx.replies)
+
+ LOG.debug(_("Sending reply"))
+ cast(FLAGS, ctx, topic, {
+ 'method': '-process_reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'response': response
+ }
+ })
+
+
+class ConsumerBase(object):
+ """Base Consumer."""
+
+ def __init__(self):
+ self.private_ctx = InternalContext(None)
+
+ @classmethod
+ def normalize_reply(cls, result, replies):
+ #TODO(ewindisch): re-evaluate and document this method.
+ if isinstance(result, types.GeneratorType):
+ return list(result)
+ elif replies:
+ return replies
+ else:
+ return [result]
+
+ def process(self, style, target, proxy, ctx, data):
+ # Methods whose names start with '-' are processed internally
+ # ('-' is not a valid character in a public method name).
+ method = data['method']
+
+ # Internal method
+ # uses internal context for safety.
+ if data['method'][0] == '-':
+ # For reply / process_reply
+ method = method[1:]
+ if method == 'reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
+ return
+
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+ proxy.dispatch(ctx, data['version'],
+ data['method'], **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+ """
+ A consumer class implementing a
+ centralized casting broker (PULL-PUSH)
+ for RoundRobin requests.
+ """
+
+ def __init__(self, conf):
+ super(ZmqBaseReactor, self).__init__()
+
+ self.conf = conf
+ self.mapping = {}
+ self.proxies = {}
+ self.threads = []
+ self.sockets = []
+ self.subscribe = {}
+
+ self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+ def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
+ zmq_type_out=None, in_bind=True, out_bind=True,
+ subscribe=None):
+
+ LOG.info(_("Registering reactor"))
+
+ if zmq_type_in not in (zmq.PULL, zmq.SUB):
+ raise RPCException("Bad input socktype")
+
+ # Items push in.
+ inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+ subscribe=subscribe)
+
+ self.proxies[inq] = proxy
+ self.sockets.append(inq)
+
+ LOG.info(_("In reactor registered"))
+
+ if not out_addr:
+ return
+
+ if zmq_type_out not in (zmq.PUSH, zmq.PUB):
+ raise RPCException("Bad output socktype")
+
+ # Items push out.
+ outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
+
+ self.mapping[inq] = outq
+ self.mapping[outq] = inq
+ self.sockets.append(outq)
+
+ LOG.info(_("Out reactor registered"))
+
+ def consume_in_thread(self):
+ def _consume(sock):
+ LOG.info(_("Consuming socket"))
+ while True:
+ self.consume(sock)
+
+ for k in self.proxies.keys():
+ self.threads.append(
+ self.pool.spawn(_consume, k)
+ )
+
+ def wait(self):
+ for t in self.threads:
+ t.wait()
+
+ def close(self):
+ for s in self.sockets:
+ s.close()
+
+ for t in self.threads:
+ t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ topic-based proxy, forwarding to
+ IPC sockets.
+ """
+
+ def __init__(self, conf):
+ super(ZmqProxy, self).__init__(conf)
+
+ self.topic_proxy = {}
+ ipc_dir = conf.rpc_zmq_ipc_dir
+
+ self.topic_proxy['zmq_replies'] = \
+ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
+ zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['zmq_replies'])
+
+ def consume(self, sock):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ msg_id, topic, style, in_msg = data
+ topic = topic.split('.', 1)[0]
+
+ LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+ # Handle zmq_replies magic
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ elif topic.startswith('zmq_replies'):
+ sock_type = zmq.PUB
+ inside = _deserialize(in_msg)
+ msg_id = inside[-1]['args']['msg_id']
+ response = inside[-1]['args']['response']
+ LOG.debug(_("->response->%s"), response)
+ data = [str(msg_id), _serialize(response)]
+ else:
+ sock_type = zmq.PUSH
+
+ if topic not in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
+ self.topic_proxy[topic].send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+
+
+class ZmqReactor(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ consumer for messages. Can also be
+ used as a 1:1 proxy
+ """
+
+ def __init__(self, conf):
+ super(ZmqReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+ if sock in self.mapping:
+ LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
+ 'data': data})
+ self.mapping[sock].send(data)
+ return
+
+ msg_id, topic, style, in_msg = data
+
+ ctx, request = _deserialize(in_msg)
+ ctx = RpcContext.unmarshal(ctx)
+
+ proxy = self.proxies[sock]
+
+ self.pool.spawn_n(self.process, style, topic,
+ proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+ """Manages connections and threads."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.reactor = ZmqReactor(conf)
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ LOG.info(_("Create Consumer for topic (%(topic)s)") %
+ {'topic': topic})
+
+ # Subscription scenarios
+ if fanout:
+ subscribe = ('', fanout)[type(fanout) == str]
+ sock_type = zmq.SUB
+ topic = 'fanout~' + topic
+ else:
+ sock_type = zmq.PULL
+ subscribe = None
+
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (self.conf.rpc_zmq_ipc_dir, topic)
+
+ LOG.debug(_("Consumer is a zmq.%s"),
+ ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+ self.reactor.register(proxy, inaddr, sock_type,
+ subscribe=subscribe, in_bind=False)
+
+ def close(self):
+ self.reactor.close()
+
+ def wait(self):
+ self.reactor.wait()
+
+ def consume_in_thread(self):
+ self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, msg_id, topic, msg, timeout=None):
+ timeout_cast = timeout or FLAGS.rpc_cast_timeout
+ payload = [RpcContext.marshal(context), msg]
+
+ with Timeout(timeout_cast, exception=rpc_common.Timeout):
+ try:
+ conn = ZmqClient(addr)
+
+ # assumes cast can't return an exception
+ conn.cast(msg_id, topic, payload)
+ except zmq.ZMQError:
+ raise RPCException("Cast failed. ZMQ Socket Exception")
+ finally:
+ if 'conn' in vars():
+ conn.close()
+
+
+def _call(addr, context, msg_id, topic, msg, timeout=None):
+ # 'timeout' is how long we wait for a response
+ timeout = timeout or FLAGS.rpc_response_timeout
+
+ # The msg_id is used to track replies.
+ msg_id = str(uuid.uuid4().hex)
+
+ # Replies always come into the reply service.
+ # We require that FLAGS.host is a FQDN, IP, or resolvable hostname.
+ reply_topic = "zmq_replies.%s" % FLAGS.host
+
+ LOG.debug(_("Creating payload"))
+ # Curry the original request into a reply method.
+ mcontext = RpcContext.marshal(context)
+ payload = {
+ 'method': '-reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'context': mcontext,
+ 'topic': reply_topic,
+ 'msg': [mcontext, msg]
+ }
+ }
+
+ LOG.debug(_("Creating queue socket for reply waiter"))
+
+ # Messages arriving async.
+ # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+ with Timeout(timeout, exception=rpc_common.Timeout):
+ try:
+ msg_waiter = ZmqSocket(
+ "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+ zmq.SUB, subscribe=msg_id, bind=False
+ )
+
+ LOG.debug(_("Sending cast"))
+ _cast(addr, context, msg_id, topic, payload)
+
+ LOG.debug(_("Cast sent; Waiting reply"))
+ # Blocks until receives reply
+ msg = msg_waiter.recv()
+ LOG.debug(_("Received message: %s"), msg)
+ LOG.debug(_("Unpacking response"))
+ responses = _deserialize(msg[-1])
+ # ZMQError trumps the Timeout error.
+ except zmq.ZMQError:
+ raise RPCException("ZMQ Socket Error")
+ finally:
+ if 'msg_waiter' in vars():
+ msg_waiter.close()
+
+ # It seems we don't need to do all of the following,
+ # but perhaps it would be useful for multicall?
+ # One effect of this is that we're checking all
+ # responses for Exceptions.
+ for resp in responses:
+ if isinstance(resp, types.DictType) and 'exc' in resp:
+ raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+
+ return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None):
+ """
+ Wraps the sending of messages: dispatches to the matchmaker and
+ sends the message to all relevant hosts.
+ """
+ conf = FLAGS
+ LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+ queues = matchmaker.queues(topic)
+ LOG.debug(_("Sending message(s) to: %s"), queues)
+
+ # Don't stack if we have no matchmaker results
+ if len(queues) == 0:
+ LOG.warn(_("No matchmaker results. Not casting."))
+ # While not strictly a timeout, callers know how to handle
+ # this exception and a timeout isn't too big a lie.
+ raise rpc_common.Timeout("No match from matchmaker.")
+
+ # This supports brokerless fanout (addresses > 1)
+ for queue in queues:
+ (_topic, ip_addr) = queue
+ _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+ if method.__name__ == '_cast':
+ eventlet.spawn_n(method, _addr, context,
+ _topic, _topic, msg, timeout)
+ return
+ return method(_addr, context, _topic, _topic, msg, timeout)
+
+
+def create_connection(conf, new=True):
+ return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+ """Multiple calls."""
+ register_opts(conf)
+ return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+ """Send a message, expect a response."""
+ register_opts(conf)
+ data = _multi_send(_call, *args, **kwargs)
+ return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+ """Send a message expecting no reply."""
+ register_opts(conf)
+ _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+ """Send a message to all listening and expect no reply."""
+ register_opts(conf)
+ # NOTE(ewindisch): fanout~ is used because it avoids splitting on .
+ # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, **kwargs):
+ """
+ Send notification event.
+ Notifications are sent to topic-priority.
+ This differs from the AMQP drivers which send to topic.priority.
+ """
+ register_opts(conf)
+ # NOTE(ewindisch): dot-priority in rpc notifier does not
+ # work with our assumptions.
+ topic = topic.replace('.', '-')
+ cast(conf, context, topic, msg, **kwargs)
+
+
+def cleanup():
+ """Clean up resources in use by implementation."""
+ global ZMQ_CTX
+ global matchmaker
+ matchmaker = None
+ ZMQ_CTX.destroy()
+ ZMQ_CTX = None
+
+
+def register_opts(conf):
+ """Registration of options for this driver."""
+ #NOTE(ewindisch): ZMQ_CTX and matchmaker
+ # are initialized here as this is as good
+ # an initialization method as any.
+
+ # We memoize through these globals
+ global ZMQ_CTX
+ global matchmaker
+ global FLAGS
+
+ if not FLAGS:
+ conf.register_opts(zmq_opts)
+ FLAGS = conf
+ # Don't re-set, if this method is called twice.
+ if not ZMQ_CTX:
+ ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ if not matchmaker:
+ # rpc_zmq_matchmaker should be set to a 'module.Class'
+ mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_module = '.'.join(mm_path[:-1])
+ mm_class = mm_path[-1]
+
+ # Only initialize a class.
+ if mm_path[-1][0] not in string.ascii_uppercase:
+ LOG.error(_("Matchmaker could not be loaded.\n"
+ "rpc_zmq_matchmaker is not a class."))
+ raise RPCException(_("Error loading Matchmaker."))
+
+ mm_impl = importutils.import_module(mm_module)
+ mm_constructor = getattr(mm_impl, mm_class)
+ matchmaker = mm_constructor()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import itertools
+import json
+import logging
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+
+
+matchmaker_opts = [
+ # Matchmaker ring file
+ cfg.StrOpt('matchmaker_ringfile',
+ default='/etc/nova/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+ """Signifies that a match could not be found."""
+ message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+ """
+ Implements lookups.
+ Subclass this to support hashtables, dns, etc.
+ """
+ def __init__(self):
+ pass
+
+ def run(self, key):
+ raise NotImplementedError()
+
+
+class Binding(object):
+ """
+ A binding on which to perform a lookup.
+ """
+ def __init__(self):
+ pass
+
+ def test(self, key):
+ raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+ """Match Maker Base Class."""
+
+ def __init__(self):
+ # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+ self.bindings = []
+
+ def add_binding(self, binding, rule, last=True):
+ self.bindings.append((binding, rule, False, last))
+
+ #NOTE(ewindisch): kept the following method in case we implement the
+ # underlying support.
+ #def add_negate_binding(self, binding, rule, last=True):
+ # self.bindings.append((binding, rule, True, last))
+
+ def queues(self, key):
+ workers = []
+
+ # bit is for negate bindings - if we choose to implement it.
+ # last stops processing rules if this matches.
+ for (binding, exchange, bit, last) in self.bindings:
+ if binding.test(key):
+ workers.extend(exchange.run(key))
+
+ # Support last.
+ if last:
+ return workers
+ return workers
+
+
+class DirectBinding(Binding):
+ """
+ Specifies a host in the key via a '.' character.
+ Although dots are used in the key, the behavior here is
+ that it maps directly to a host, thus direct.
+ """
+ def test(self, key):
+ if '.' in key:
+ return True
+ return False
+
+
+class TopicBinding(Binding):
+ """
+ Matches a 'bare' key, one without dots.
+ AMQP generally considers topic exchanges to be those *with* dots,
+ but we deviate here in terminology: a key without dots behaves
+ like a topic exchange, whereas a key with dots behaves like a
+ direct exchange.
+ """
+ def test(self, key):
+ if '.' not in key:
+ return True
+ return False
+
+
+class FanoutBinding(Binding):
+ """Match on fanout keys, where the key starts with the 'fanout~' string."""
+ def test(self, key):
+ if key.startswith('fanout~'):
+ return True
+ return False
+
+
+class StubExchange(Exchange):
+ """Exchange that does nothing."""
+ def run(self, key):
+ return [(key, None)]
+
+
+class RingExchange(Exchange):
+ """
+ Match Maker where hosts are loaded from a static file containing
+ a hashmap (JSON formatted).
+
+ __init__ takes an optional ring dictionary argument, otherwise
+ it loads the ringfile from CONF.matchmaker_ringfile.
+ """
+ def __init__(self, ring=None):
+ super(RingExchange, self).__init__()
+
+ if ring:
+ self.ring = ring
+ else:
+ fh = open(CONF.matchmaker_ringfile, 'r')
+ self.ring = json.load(fh)
+ fh.close()
+
+ self.ring0 = {}
+ for k in self.ring.keys():
+ self.ring0[k] = itertools.cycle(self.ring[k])
+
+ def _ring_has(self, key):
+ if key in self.ring0:
+ return True
+ return False
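+
+# Illustrative example (not part of the original patch): the ring file
+# is a JSON map of bare topic names to candidate hosts, e.g.
+# (hypothetical contents):
+#
+#   {"compute": ["host1", "host2"],
+#    "network": ["host3"]}
+#
+# RoundRobinRingExchange below cycles through the hosts for a topic,
+# while FanoutRingExchange returns all of them for 'fanout~' keys.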
+
+
+class RoundRobinRingExchange(RingExchange):
+ """A Topic Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(RoundRobinRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ if not self._ring_has(key):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
+ )
+ return []
+ host = next(self.ring0[key])
+ return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(FanoutRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "fanout~", strip it for lookup.
+ nkey = key.split('fanout~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class LocalhostExchange(Exchange):
+ """Exchange where all direct topics are local."""
+ def __init__(self):
+ super(LocalhostExchange, self).__init__()
+
+ def run(self, key):
+ return [(key.split('.')[0] + '.localhost', 'localhost')]
+
+
+class DirectExchange(Exchange):
+ """
+ Exchange where topic keys are split on the first '.', sending to the
+ host named in the second half, e.g. "compute.host" sends a message to
+ "compute" running on "host".
+ """
+ def __init__(self):
+ super(DirectExchange, self).__init__()
+
+ def run(self, key):
+ b, e = key.split('.', 1)
+ return [(b, e)]
+
+
+class MatchMakerRing(MatchMakerBase):
+ """
+ Match Maker where hosts are loaded from a static hashmap.
+ """
+ def __init__(self, ring=None):
+ super(MatchMakerRing, self).__init__()
+ self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
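+
+ # NOTE(editor): minimal usage sketch for MatchMakerRing, illustration only and
+ # not part of the original patch; assumes a two-host ring for 'compute':
+ #
+ #     ring = {'compute': ['host1', 'host2']}
+ #     mm = MatchMakerRing(ring)
+ #     mm.queues('compute')          # -> [('compute.host1', 'host1')]
+ #     mm.queues('compute.host2')    # -> [('compute', 'host2')]
+ #     mm.queues('fanout~compute')   # -> [('fanout~compute.host1', 'host1'),
+ #                                   #     ('fanout~compute.host2', 'host2')]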
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+ """
+ Match Maker where all bare topics resolve to localhost.
+ Useful for testing.
+ """
+ def __init__(self):
+ super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange())
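+
+ # NOTE(editor): illustration only, not part of the original patch. With
+ # MatchMakerLocalhost every topic resolves to the local host, e.g.:
+ #
+ #     mm = MatchMakerLocalhost()
+ #     mm.queues('compute')   # -> [('compute.localhost', 'localhost')]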
+
+
+class MatchMakerStub(MatchMakerBase):
+ """
+ Match Maker where topics are untouched.
+ Useful for testing, or for AMQP/brokered queues.
+ Will not work where knowledge of hosts is required (e.g. zeromq).
+ """
+ def __init__(self):
+ super(MatchMakerStub, self).__init__()
+
+ self.add_binding(FanoutBinding(), StubExchange())
+ self.add_binding(DirectBinding(), StubExchange())
+ self.add_binding(TopicBinding(), StubExchange())
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+ rpc/dispatcher.py
+"""
+
+
+from quantum.openstack.common import rpc
+
+
+class RpcProxy(object):
+ """A helper class for rpc clients.
+
+ This class is a wrapper around the RPC client API. It allows you to
+ specify the topic and API version in a single place. This is intended to
+ be used as a base class for a class that implements the client side of an
+ rpc API.
+ """
+
+ def __init__(self, topic, default_version):
+ """Initialize an RpcProxy.
+
+ :param topic: The topic to use for all messages.
+ :param default_version: The default API version to request in all
+ outgoing messages. This can be overridden on a per-message
+ basis.
+ """
+ self.topic = topic
+ self.default_version = default_version
+ super(RpcProxy, self).__init__()
+
+ def _set_version(self, msg, vers):
+ """Helper method to set the version in a message.
+
+ :param msg: The message having a version added to it.
+ :param vers: The version number to add to the message.
+ """
+ msg['version'] = vers if vers else self.default_version
+
+ def _get_topic(self, topic):
+ """Return the topic to use for a message."""
+ return topic if topic else self.topic
+
+ @staticmethod
+ def make_msg(method, **kwargs):
+ """Build a message dict from a method name and its keyword arguments."""
+ return {'method': method, 'args': kwargs}
+
+ def call(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.call() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: The return value from the remote method.
+ """
+ self._set_version(msg, version)
+ return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+ def multicall(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.multicall() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: An iterator that lets you process each of the returned values
+ from the remote method as they arrive.
+ """
+ self._set_version(msg, version)
+ return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+ def cast(self, context, msg, topic=None, version=None):
+ """rpc.cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast() does not wait on any return value from the
+ remote method.
+ """
+ self._set_version(msg, version)
+ rpc.cast(context, self._get_topic(topic), msg)
+
+ def fanout_cast(self, context, msg, version=None):
+ """rpc.fanout_cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast() does not wait on any return value
+ from the remote method.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast(context, self.topic, msg)
+
+ def cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+ def fanout_cast_to_server(self, context, server_params, msg, version=None):
+ """rpc.fanout_cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
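+
+
+ # NOTE(editor): minimal sketch of the subclassing pattern this class is meant
+ # for; illustration only, not part of the original patch, and all names below
+ # are hypothetical:
+ #
+ #     class ExampleAPI(RpcProxy):
+ #         BASE_RPC_API_VERSION = '1.0'
+ #
+ #         def __init__(self):
+ #             super(ExampleAPI, self).__init__(
+ #                 topic='example_topic',
+ #                 default_version=self.BASE_RPC_API_VERSION)
+ #
+ #         def get_widget(self, context, widget_id):
+ #             return self.call(context,
+ #                              self.make_msg('get_widget',
+ #                                            widget_id=widget_id))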