from heat.openstack.common import cfg
from heat.openstack.common import log as logging
-from heat import rpc
-
LOG = logging.getLogger('heat.api')
if __name__ == '__main__':
cfg.CONF(project='heat', prog='heat-api')
config.setup_logging()
config.register_api_opts()
- rpc.configure()
app = config.load_paste_app()
from heat.common import utils
from heat.db import api as db_api
-from heat import rpc
-
logger = logging.getLogger('heat.engine')
if __name__ == '__main__':
config.setup_logging()
config.register_engine_opts()
db_api.configure()
- rpc.configure()
#utils.monkey_patch()
server = service.Service.create(binary='heat-engine',
gettext.install('heat', unicode=1)
-from heat import rpc
+from heat.openstack.common import rpc
from heat.common import config
from heat.common import wsgi
from heat.common import context
config.setup_logging()
config.register_metadata_opts()
- rpc.configure()
app = config.load_paste_app()
from heat.common import config
from heat.common import context
from heat import utils
-from heat import rpc
-import heat.rpc.common as rpc_common
import heat.engine.api as engine_api
+from heat.openstack.common import rpc
+import heat.openstack.common.rpc.common as rpc_common
from heat.openstack.common import log as logging
logger = logging.getLogger('heat.api.v1.stacks')
from heat import version
from heat.common import wsgi
from heat.openstack.common import cfg
+from heat.openstack.common import rpc
DEFAULT_PORT = 8000
bind_opts = [cfg.IntOpt('bind_port', default=8000),
cfg.StrOpt('bind_host', default='127.0.0.1')]
-rpc_opts = [
- cfg.StrOpt('rpc_backend',
- default='heat.rpc.impl_qpid',
- help="The messaging module to use, defaults to kombu."),
- cfg.IntOpt('rpc_thread_pool_size',
- default=1024,
- help='Size of RPC thread pool'),
- cfg.IntOpt('rpc_conn_pool_size',
- default=30,
- help='Size of RPC connection pool'),
- cfg.IntOpt('rpc_response_timeout',
- default=60,
- help='Seconds to wait for a response from call or multicall'),
- cfg.StrOpt('qpid_hostname',
- default='localhost',
- help='Qpid broker hostname'),
- cfg.StrOpt('qpid_port',
- default='5672',
- help='Qpid broker port'),
- cfg.StrOpt('qpid_username',
- default='',
- help='Username for qpid connection'),
- cfg.StrOpt('qpid_password',
- default='',
- help='Password for qpid connection'),
- cfg.StrOpt('qpid_sasl_mechanisms',
- default='',
- help='Space separated list of SASL mechanisms to use for auth'),
- cfg.BoolOpt('qpid_reconnect',
- default=True,
- help='Automatically reconnect'),
- cfg.IntOpt('qpid_reconnect_timeout',
- default=0,
- help='Reconnection timeout in seconds'),
- cfg.IntOpt('qpid_reconnect_limit',
- default=0,
- help='Max reconnections before giving up'),
- cfg.IntOpt('qpid_reconnect_interval_min',
- default=0,
- help='Minimum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval_max',
- default=0,
- help='Maximum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval',
- default=0,
- help='Equivalent to setting max and min to the same value'),
- cfg.IntOpt('qpid_heartbeat',
- default=5,
- help='Seconds between connection keepalive heartbeats'),
- cfg.StrOpt('qpid_protocol',
- default='tcp',
- help="Transport to use, either 'tcp' or 'ssl'"),
- cfg.BoolOpt('qpid_tcp_nodelay',
- default=True,
- help='Disable Nagle algorithm'),
- cfg.StrOpt('rabbit_host',
- default='localhost',
- help='the RabbitMQ host'),
- cfg.IntOpt('rabbit_port',
- default=5672,
- help='the RabbitMQ port'),
- cfg.BoolOpt('rabbit_use_ssl',
- default=False,
- help='connect over SSL for RabbitMQ'),
- cfg.StrOpt('rabbit_userid',
- default='guest',
- help='the RabbitMQ userid'),
- cfg.StrOpt('rabbit_password',
- default='guest',
- help='the RabbitMQ password'),
- cfg.StrOpt('rabbit_virtual_host',
- default='/',
- help='the RabbitMQ virtual host'),
- cfg.IntOpt('rabbit_retry_interval',
- default=1,
- help='how frequently to retry connecting with RabbitMQ'),
- cfg.IntOpt('rabbit_retry_backoff',
- default=2,
- help='how long to backoff for between retries when connecting '
- 'to RabbitMQ'),
- cfg.IntOpt('rabbit_max_retries',
- default=0,
- help='maximum retries with trying to connect to RabbitMQ '
- '(the default of 0 implies an infinite retry count)'),
- cfg.StrOpt('control_exchange',
- default='heat-engine',
- help='the main RabbitMQ exchange to connect to'),
- cfg.BoolOpt('rabbit_durable_queues',
- default=False,
- help='use durable queues in RabbitMQ'),
- cfg.BoolOpt('fake_rabbit',
- default=False,
- help='If passed, use a fake RabbitMQ provider'),
-]
-
service_opts = [
cfg.IntOpt('report_interval',
default=10,
def register_metadata_opts():
cfg.CONF.register_opts(service_opts)
cfg.CONF.register_opts(bind_opts)
- cfg.CONF.register_opts(rpc_opts)
def register_api_opts():
cfg.CONF.register_opts(bind_opts)
- cfg.CONF.register_opts(rpc_opts)
def register_engine_opts():
cfg.CONF.register_opts(engine_opts)
cfg.CONF.register_opts(db_opts)
cfg.CONF.register_opts(service_opts)
- cfg.CONF.register_opts(rpc_opts)
def setup_logging():
from heat import version
+from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
from heat.openstack.common import log as logging
from heat.openstack.common import cfg
self.host = host
super(Manager, self).__init__(db_driver)
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return rpc_dispatcher.RpcDispatcher([self])
+
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
from heat.common import wsgi
from heat.common import context
-from heat import rpc
+from heat.openstack.common import rpc
def json_response(http_status, data):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Exception related utilities.
+"""
+
+import contextlib
+import logging
+import sys
+import traceback
+
+
+@contextlib.contextmanager
+def save_and_reraise_exception():
+ """Save current exception, run some code and then re-raise.
+
+ In some cases the exception context can be cleared, resulting in None
+ being attempted to be reraised after an exception handler is run. This
+ can happen when eventlet switches greenthreads or when running an
+ exception handler, code raises and catches an exception. In both
+ cases the exception context will be cleared.
+
+ To work around this, we save the exception state, run handler code, and
+ then re-raise the original exception. If another exception occurs, the
+ saved exception is logged and the new exception is reraised.
+ """
+ type_, value, tb = sys.exc_info()
+ try:
+ yield
+ except Exception:
+ logging.error('Original exception being dropped: %s' %
+ (traceback.format_exception(type_, value, tb)))
+ raise
+ raise type_, value, tb
# License for the specific language governing permissions and limitations
# under the License.
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+ rpc.dispatcher
+ rpc.proxy
+"""
+
from heat.openstack.common import cfg
from heat.openstack.common import importutils
-from heat.openstack.common import log as logging
-LOG = logging.getLogger(__name__)
+rpc_opts = [
+ cfg.StrOpt('rpc_backend',
+ default='%s.impl_kombu' % __package__,
+ help="The messaging module to use, defaults to kombu."),
+ cfg.IntOpt('rpc_thread_pool_size',
+ default=64,
+ help='Size of RPC thread pool'),
+ cfg.IntOpt('rpc_conn_pool_size',
+ default=30,
+ help='Size of RPC connection pool'),
+ cfg.IntOpt('rpc_response_timeout',
+ default=60,
+ help='Seconds to wait for a response from call or multicall'),
+ cfg.IntOpt('rpc_cast_timeout',
+ default=30,
+ help='Seconds to wait before a cast expires (TTL). '
+ 'Only supported by impl_zmq.'),
+ cfg.ListOpt('allowed_rpc_exception_modules',
+ default=['heat.openstack.common.exception',
+ 'nova.exception',
+ ],
+ help='Modules of exceptions that are permitted to be recreated'
+ 'upon receiving exception data from an rpc call.'),
+ cfg.StrOpt('control_exchange',
+ default='nova',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.BoolOpt('fake_rabbit',
+ default=False,
+ help='If passed, use a fake RabbitMQ provider'),
+]
+
+cfg.CONF.register_opts(rpc_opts)
def create_connection(new=True):
implementation is free to return an existing connection from a
pool.
- :returns: An instance of nova.rpc.common.Connection
+ :returns: An instance of openstack.common.rpc.common.Connection
"""
- return _get_impl().create_connection(new=new)
+ return _get_impl().create_connection(cfg.CONF, new=new)
def call(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
:returns: A dict from the remote method.
- :raises: nova.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
- return _get_impl().call(context, topic, msg, timeout)
+ return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
- return _get_impl().cast(context, topic, msg)
+ return _get_impl().cast(cfg.CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=True.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
- return _get_impl().fanout_cast(context, topic, msg)
+ return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
returned and X is the Nth value that was returned by the remote
method.
- :raises: nova.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
- return _get_impl().multicall(context, topic, msg, timeout)
+ return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg):
:returns: None
"""
- return _get_impl().notify(context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg)
def cleanup():
:returns: None
"""
- return _get_impl().cast_to_server(context, server_params, topic, msg)
+ return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ msg)
def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast_to_server(context, server_params, topic,
- msg)
+ return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ topic, msg)
-_RPCIMPL = None
+def queue_get_for(context, topic, host):
+ """Get a queue name for a given topic + host.
+ This function only works if this naming convention is followed on the
+ consumer side, as well. For example, in nova, every instance of the
+ nova-foo service calls create_consumer() for two topics:
-def configure():
- """Delay import of rpc_backend until FLAGS are loaded."""
- LOG.debug(_("Configuring RPC %s") % cfg.CONF.rpc_backend)
+ foo
+ foo.<host>
- global _RPCIMPL
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ Messages sent to the 'foo' topic are distributed to exactly one instance of
+ the nova-foo service. The services are chosen in a round-robin fashion.
+ Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
+ <host>.
+ """
+ return '%s.%s' % (topic, host)
+
+
+_RPCIMPL = None
def _get_impl():
- """Delay import of rpc_backend until FLAGS are loaded."""
+ """Delay import of rpc_backend until configuration is loaded."""
global _RPCIMPL
if _RPCIMPL is None:
- LOG.error(_("RPC not configured."))
-
+ try:
+ _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ except ImportError:
+ # For backwards compatibility with older nova config.
+ impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
+ _RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
# under the License.
"""
-Shared code between AMQP based nova.rpc implementations.
+Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
"""
import inspect
+import logging
import sys
-import traceback
import uuid
-from heat.openstack.common import log as logging
-
from eventlet import greenpool
from eventlet import pools
+from eventlet import semaphore
-from heat.common import context
-from heat.common import exception
-from heat.common import config
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _
from heat.openstack.common import local
-from heat.openstack.common import cfg
+from heat.openstack.common.rpc import common as rpc_common
-import heat.rpc.common as rpc_common
LOG = logging.getLogger(__name__)
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
- def __init__(self, *args, **kwargs):
- self.connection_cls = kwargs.pop("connection_cls", None)
- kwargs.setdefault("max_size", cfg.CONF.rpc_conn_pool_size)
+ def __init__(self, conf, connection_cls, *args, **kwargs):
+ self.connection_cls = connection_cls
+ self.conf = conf
+ kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
- return self.connection_cls()
+ return self.connection_cls(self.conf)
def empty(self):
while self.free_items:
- item = self.get()
- try:
- item.close()
- except Exception:
- pass
+ self.get().close()
+
+
+_pool_create_sem = semaphore.Semaphore()
+
+
+def get_connection_pool(conf, connection_cls):
+ with _pool_create_sem:
+ # Make sure only one thread tries to create the connection pool.
+ if not connection_cls.pool:
+ connection_cls.pool = Pool(conf, connection_cls)
+ return connection_cls.pool
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the caller of
- create_connection(). This is a essentially a wrapper around
- Connection that supports 'with' and can return a new Connection or
- one from a pool. It will also catch when an instance of this class
- is to be deleted so that we can return Connections to the pool on
- exceptions and so forth without making the caller be responsible for
- catching all exceptions and making sure to return a connection to
- the pool.
+ create_connection(). This is essentially a wrapper around
+ Connection that supports 'with'. It can also return a new
+ Connection, or one from a pool. The function will also catch
+ when an instance of this class is to be deleted. With that
+ we can return Connections to the pool on exceptions and so
+ forth without making the caller be responsible for catching
+ them. If possible the function makes sure to return a
+ connection to the pool.
"""
- def __init__(self, connection_pool, pooled=True, server_params=None):
+ def __init__(self, conf, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool"""
self.connection = None
+ self.conf = conf
self.connection_pool = connection_pool
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls(
- server_params=server_params)
+ conf,
+ server_params=server_params)
self.pooled = pooled
def __enter__(self):
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
+ def create_worker(self, topic, proxy, pool_name):
+ self.connection.create_worker(topic, proxy, pool_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
if self.connection:
return getattr(self.connection, key)
else:
- raise exception.InvalidRPCConnectionReuse()
+ raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
+def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+ ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
- with ConnectionContext(connection_pool) as conn:
+ with ConnectionContext(conf, connection_pool) as conn:
if failure:
- message = str(failure[1])
- tb = ''.join(traceback.format_exception(*failure))
- LOG.error(_("Returning exception %s to caller"), message)
- LOG.error(tb)
- failure = (failure[0].__name__, str(failure[1]), tb)
+ failure = rpc_common.serialize_remote_exception(failure)
try:
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
-class RpcContext(context.RequestContext):
+class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
- super(RpcContext, self).__init__(*args, **kwargs)
+ self.conf = kwargs.pop('conf')
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['conf'] = self.conf
+ values['msg_id'] = self.msg_id
+ return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
if self.msg_id:
- msg_reply(self.msg_id, connection_pool, reply, failure,
+ msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending)
if ending:
self.msg_id = None
-def unpack_context(msg):
+def unpack_context(conf, msg):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
- LOG.debug(_('unpacked context: %s'), ctx.to_dict())
+ rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
return ctx
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
- def __init__(self, proxy, connection_pool):
+ def __init__(self, conf, proxy, connection_pool):
self.proxy = proxy
- self.pool = greenpool.GreenPool(cfg.CONF.rpc_thread_pool_size)
+ self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
+ self.conf = conf
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
- ctxt = unpack_context(message_data)
+ ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
+ version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method, args)
- @exception.wrap_exception()
- def _process_data(self, ctxt, method, args):
- """Thread that magically looks for a method on the proxy
- object and calls it.
+ def _process_data(self, ctxt, version, method, args):
+ """Process a message in a new thread.
+
+ If the proxy object we have has a dispatch method
+ (see rpc.dispatcher.RpcDispatcher), pass it the version,
+ method, and args and let it dispatch as appropriate. If not, use
+ the old behavior of magically calling the specified method on the
+ proxy we have here.
"""
ctxt.update_store()
try:
- node_func = getattr(self.proxy, str(method))
- node_args = dict((str(k), v) for k, v in args.iteritems())
- # NOTE(vish): magic is fun!
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(ctxt, version, method, **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
- return
class MulticallWaiter(object):
- def __init__(self, connection, timeout):
+ def __init__(self, conf, connection, timeout):
self._connection = connection
- self._iterator = connection.iterconsume(
- timeout=timeout or
- cfg.CONF.rpc_response_timeout)
+ self._iterator = connection.iterconsume(timeout=timeout or
+ conf.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
+ self._conf = conf
def done(self):
if self._done:
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
- self._result = rpc_common.RemoteError(*data['failure'])
+ failure = data['failure']
+ self._result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+
elif data.get('ending', False):
self._got_ending = True
else:
if self._done:
raise StopIteration
while True:
- self._iterator.next()
+ try:
+ self._iterator.next()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
if self._got_ending:
self.done()
raise StopIteration
yield result
-def create_connection(new, connection_pool):
+def create_connection(conf, new, connection_pool):
"""Create a connection"""
- return ConnectionContext(connection_pool, pooled=not new)
+ return ConnectionContext(conf, connection_pool, pooled=not new)
-def multicall(context, topic, msg, timeout, connection_pool):
+def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext(connection_pool)
- wait_msg = MulticallWaiter(conn, timeout)
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
-def call(context, topic, msg, timeout, connection_pool):
+def call(conf, context, topic, msg, timeout, connection_pool):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg, timeout, connection_pool)
+ rv = multicall(conf, context, topic, msg, timeout, connection_pool)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return rv[-1]
-def cast(context, topic, msg, connection_pool):
+def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
- with ConnectionContext(connection_pool) as conn:
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg)
-def fanout_cast(context, topic, msg, connection_pool):
+def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
- with ConnectionContext(connection_pool) as conn:
+ with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg)
-def cast_to_server(context, server_params, topic, msg, connection_pool):
+def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
- with ConnectionContext(connection_pool, pooled=False,
- server_params=server_params) as conn:
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
conn.topic_send(topic, msg)
-def fanout_cast_to_server(context, server_params, topic, msg,
- connection_pool):
+def fanout_cast_to_server(conf, context, server_params, topic, msg,
+ connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
- with ConnectionContext(connection_pool, pooled=False,
- server_params=server_params) as conn:
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
conn.fanout_send(topic, msg)
-def notify(context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
- LOG.debug(_('Sending notification on %s...'), topic)
+ event_type = msg.get('event_type')
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
pack_context(msg, context)
- with ConnectionContext(connection_pool) as conn:
+ with ConnectionContext(conf, connection_pool) as conn:
conn.notify_send(topic, msg)
def cleanup(connection_pool):
- connection_pool.empty()
+ if connection_pool:
+ connection_pool.empty()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import logging
+import sys
+import traceback
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
+from heat.openstack.common import jsonutils
+from heat.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
+
+
+class RPCException(Exception):
+ message = _("An unknown RPC related exception occurred.")
+
+ def __init__(self, message=None, **kwargs):
+ self.kwargs = kwargs
+
+ if not message:
+ try:
+ message = self.message % kwargs
+
+ except Exception as e:
+ # kwargs doesn't match a variable in the message
+ # log the issue and the kwargs
+ LOG.exception(_('Exception in string format operation'))
+ for name, value in kwargs.iteritems():
+ LOG.error("%s: %s" % (name, value))
+ # at least get the core message out if something happened
+ message = self.message
+
+ super(RPCException, self).__init__(message)
+
+
+class RemoteError(RPCException):
+ """Signifies that a remote class has raised an exception.
+
+ Contains a string representation of the type of the original exception,
+ the value of the original exception, and the traceback. These are
+ sent to the parent as a joined string so printing the exception
+ contains all of the relevant info.
+
+ """
+ message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+
+ def __init__(self, exc_type=None, value=None, traceback=None):
+ self.exc_type = exc_type
+ self.value = value
+ self.traceback = traceback
+ super(RemoteError, self).__init__(exc_type=exc_type,
+ value=value,
+ traceback=traceback)
+
+
+class Timeout(RPCException):
+ """Signifies that a timeout has occurred.
+
+ This exception is raised if the rpc_response_timeout is reached while
+ waiting for a response from the remote side.
+ """
+ message = _("Timeout while waiting on RPC response.")
+
+
+class InvalidRPCConnectionReuse(RPCException):
+ message = _("Invalid reuse of an RPC connection.")
+
+
+class UnsupportedRpcVersion(RPCException):
+ message = _("Specified RPC version, %(version)s, not supported by "
+ "this endpoint.")
+
+
+class Connection(object):
+ """A connection, returned by rpc.create_connection().
+
+ This class represents a connection to the message bus used for rpc.
+ An instance of this class should never be created by users of the rpc API.
+ Use rpc.create_connection() instead.
+ """
+ def close(self):
+ """Close the connection.
+
+ This method must be called when the connection will no longer be used.
+ It will ensure that any resources associated with the connection, such
+ as a network connection, and cleaned up.
+ """
+ raise NotImplementedError()
+
+ def create_consumer(self, conf, topic, proxy, fanout=False):
+ """Create a consumer on this connection.
+
+ A consumer is associated with a message queue on the backend message
+ bus. The consumer will read messages from the queue, unpack them, and
+ dispatch them to the proxy object. The contents of the message pulled
+ off of the queue will determine which method gets called on the proxy
+ object.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic. For example, all instances of nova-compute consume
+ from a queue called "compute". In that case, the
+ messages will get distributed amongst the consumers in a
+ round-robin fashion if fanout=False. If fanout=True,
+ every consumer associated with this topic will get a
+ copy of every message.
+ :param proxy: The object that will handle all incoming messages.
+ :param fanout: Whether or not this is a fanout topic. See the
+ documentation for the topic parameter for some
+ additional comments on this.
+ """
+ raise NotImplementedError()
+
+ def create_worker(self, conf, topic, proxy, pool_name):
+ """Create a worker on this connection.
+
+ A worker is like a regular consumer of messages directed to a
+ topic, except that it is part of a set of such consumers (the
+ "pool") which may run in parallel. Every pool of workers will
+ receive a given message, but only one worker in the pool will
+ be asked to process it. Load is distributed across the members
+ of the pool in round-robin fashion.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic.
+ :param proxy: The object that will handle all incoming messages.
+ :param pool_name: String containing the name of the pool of workers
+ """
+ raise NotImplementedError()
+
+ def consume_in_thread(self):
+ """Spawn a thread to handle incoming messages.
+
+ Spawn a thread that will be responsible for handling all incoming
+ messages for consumers that were set up on this connection.
+
+ Message dispatching inside of this is expected to be implemented in a
+ non-blocking manner. An example implementation would be having this
+ thread pull messages in for all of the consumers, but utilize a thread
+ pool for dispatching the messages to the proxy objects.
+ """
+ raise NotImplementedError()
+
+
+def _safe_log(log_func, msg, msg_data):
+ """Sanitizes the msg_data field before logging."""
+ SANITIZE = {'set_admin_password': ('new_pass',),
+ 'run_instance': ('admin_password',), }
+
+ has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
+ has_context_token = '_context_auth_token' in msg_data
+ has_token = 'auth_token' in msg_data
+
+ if not any([has_method, has_context_token, has_token]):
+ return log_func(msg, msg_data)
+
+ msg_data = copy.deepcopy(msg_data)
+
+ if has_method:
+ method = msg_data['method']
+ if method in SANITIZE:
+ args_to_sanitize = SANITIZE[method]
+ for arg in args_to_sanitize:
+ try:
+ msg_data['args'][arg] = "<SANITIZED>"
+ except KeyError:
+ pass
+
+ if has_context_token:
+ msg_data['_context_auth_token'] = '<SANITIZED>'
+
+ if has_token:
+ msg_data['auth_token'] = '<SANITIZED>'
+
+ return log_func(msg, msg_data)
+
+
+def serialize_remote_exception(failure_info):
+ """Prepares exception data to be sent over rpc.
+
+ Failure_info should be a sys.exc_info() tuple.
+
+ """
+ tb = traceback.format_exception(*failure_info)
+ failure = failure_info[1]
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
+
+ kwargs = {}
+ if hasattr(failure, 'kwargs'):
+ kwargs = failure.kwargs
+
+ data = {
+ 'class': str(failure.__class__.__name__),
+ 'module': str(failure.__class__.__module__),
+ 'message': unicode(failure),
+ 'tb': tb,
+ 'args': failure.args,
+ 'kwargs': kwargs
+ }
+
+ json_data = jsonutils.dumps(data)
+
+ return json_data
+
+
+def deserialize_remote_exception(conf, data):
+ failure = jsonutils.loads(str(data))
+
+ trace = failure.get('tb', [])
+ message = failure.get('message', "") + "\n" + "\n".join(trace)
+ name = failure.get('class')
+ module = failure.get('module')
+
+ # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
+ # order to prevent arbitrary code execution.
+ if not module in conf.allowed_rpc_exception_modules:
+ return RemoteError(name, failure.get('message'), trace)
+
+ try:
+ mod = importutils.import_module(module)
+ klass = getattr(mod, name)
+ if not issubclass(klass, Exception):
+ raise TypeError("Can only deserialize Exceptions")
+
+ failure = klass(**failure.get('kwargs', {}))
+ except (AttributeError, TypeError, ImportError):
+ return RemoteError(name, failure.get('message'), trace)
+
+ ex_type = type(failure)
+ str_override = lambda self: message
+ new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+ {'__str__': str_override, '__unicode__': str_override})
+ try:
+ # NOTE(ameade): Dynamically create a new exception type and swap it in
+ # as the new type for the exception. This only works on user defined
+ # Exceptions and not core python exceptions. This is important because
+ # we cannot necessarily change an exception message so we must override
+ # the __str__ method.
+ failure.__class__ = new_ex_type
+ except TypeError as e:
+ # NOTE(ameade): If a core exception then just add the traceback to the
+ # first exception argument.
+ failure.args = (message,) + failure.args[1:]
+ return failure
+
+
+class CommonRpcContext(object):
+ def __init__(self, **kwargs):
+ self.values = kwargs
+
+ def __getattr__(self, key):
+ try:
+ return self.values[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def to_dict(self):
+ return copy.deepcopy(self.values)
+
+ @classmethod
+ def from_dict(cls, values):
+ return cls(**values)
+
+ def deepcopy(self):
+ return self.from_dict(self.to_dict())
+
+ def update_store(self):
+ local.store.context = self
+
+ def elevated(self, read_deleted=None, overwrite=False):
+ """Return a version of this context with admin flag set."""
+ # TODO(russellb) This method is a bit of a nova-ism. It makes
+ # some assumptions about the data in the request context sent
+ # across rpc, while the rest of this class does not. We could get
+ # rid of this if we changed the nova code that uses this to
+ # convert the RpcContext back to its native RequestContext doing
+ # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
+
+ context = self.deepcopy()
+ context.values['is_admin'] = True
+
+ context.values.setdefault('roles', [])
+
+ if 'admin' not in context.values['roles']:
+ context.values['roles'].append('admin')
+
+ if read_deleted is not None:
+ context.values['read_deleted'] = read_deleted
+
+ return context
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them. RPC API
+version numbers are in the form:
+
+ Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+ A = X
+
+ B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API. A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time. However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+
+
+EXAMPLES:
+
+Nova was the first project to use versioned rpc APIs. Consider the compute rpc
+API as an example. The client side is in nova/compute/rpcapi.py and the server
+side is in nova/compute/manager.py.
+
+
+Example 1) Adding a new method.
+
+Adding a new method is a backwards compatible change. It should be added to
+nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
+X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
+have a specific version specified to indicate the minimum API version that must
+be implemented for the method to be supported. For example:
+
+ def get_host_uptime(self, ctxt, host):
+ topic = _compute_topic(self.topic, ctxt, host, None)
+ return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
+ version='1.1')
+
+In this case, version '1.1' is the first version that supported the
+get_host_uptime() method.
+
+
+Example 2) Adding a new parameter.
+
+Adding a new parameter to an rpc method can be made backwards compatible. The
+RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
+The implementation of the method must not expect the parameter to be present.
+
+ def some_remote_method(self, arg1, arg2, newarg=None):
+ # The code needs to deal with newarg=None for cases
+ # where an older client sends a message without it.
+ pass
+
+On the client side, the same changes should be made as in example 1. The
+minimum version that supports the new parameter should be specified.
+"""
+
+from heat.openstack.common.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+ """Dispatch rpc messages according to the requested API version.
+
+ This class can be used as the top level 'manager' for a service. It
+ contains a list of underlying managers that have an API_VERSION attribute.
+ """
+
+ def __init__(self, callbacks):
+ """Initialize the rpc dispatcher.
+
+ :param callbacks: List of proxy objects that are an instance
+ of a class with rpc methods exposed. Each proxy
+ object should have an RPC_API_VERSION attribute.
+ """
+ self.callbacks = callbacks
+ super(RpcDispatcher, self).__init__()
+
+ @staticmethod
+ def _is_compatible(mversion, version):
+ """Determine whether versions are compatible.
+
+ :param mversion: The API version implemented by a callback.
+ :param version: The API version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ mversion_parts = mversion.split('.')
+ if int(version_parts[0]) != int(mversion_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(mversion_parts[1]): # Minor
+ return False
+ return True
+
+ def dispatch(self, ctxt, version, method, **kwargs):
+ """Dispatch a message based on a requested version.
+
+ :param ctxt: The request context
+ :param version: The requested API version from the incoming message
+ :param method: The method requested to be called by the incoming
+ message.
+ :param kwargs: A dict of keyword arguments to be passed to the method.
+
+ :returns: Whatever is returned by the underlying method that gets
+ called.
+ """
+ if not version:
+ version = '1.0'
+
+ had_compatible = False
+ for proxyobj in self.callbacks:
+ if hasattr(proxyobj, 'RPC_API_VERSION'):
+ rpc_api_version = proxyobj.RPC_API_VERSION
+ else:
+ rpc_api_version = '1.0'
+ is_compatible = self._is_compatible(rpc_api_version, version)
+ had_compatible = had_compatible or is_compatible
+ if not hasattr(proxyobj, method):
+ continue
+ if is_compatible:
+ return getattr(proxyobj, method)(ctxt, **kwargs)
+
+ if had_compatible:
+ raise AttributeError("No such RPC function '%s'" % method)
+ else:
+ raise rpc_common.UnsupportedRpcVersion(version=version)
"""
import inspect
-import json
-import signal
-import sys
import time
-import traceback
import eventlet
-from heat.common import context
-from heat.common import config
-from heat.rpc import common as rpc_common
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
-FLAGS = config.FLAGS
-
-class RpcContext(context.RequestContext):
- def __init__(self, *args, **kwargs):
- super(RpcContext, self).__init__(*args, **kwargs)
+class RpcContext(rpc_common.CommonRpcContext):
+ def __init__(self, **kwargs):
+ super(RpcContext, self).__init__(**kwargs)
self._response = []
self._done = False
+ def deepcopy(self):
+ values = self.to_dict()
+ new_inst = self.__class__(**values)
+ new_inst._response = self._response
+ new_inst._done = self._done
+ return new_inst
+
def reply(self, reply=None, failure=None, ending=False):
if ending:
self._done = True
self.topic = topic
self.proxy = proxy
- def call(self, context, method, args, timeout):
- node_func = getattr(self.proxy, method)
- node_args = dict((str(k), v) for k, v in args.iteritems())
+ def call(self, context, version, method, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(context, version, method, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
else:
res.append(rval)
done.send(res)
- except Exception:
- exc_info = sys.exc_info()
- done.send_exception(
- rpc_common.RemoteError(exc_info[0].__name__,
- str(exc_info[1]),
- ''.join(traceback.format_exception(*exc_info))))
+ except Exception as e:
+ done.send_exception(e)
thread = eventlet.greenthread.spawn(_inner)
pass
-def create_connection(new=True):
+def create_connection(conf, new=True):
"""Create a connection"""
return Connection()
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- json.dumps(msg)
+ jsonutils.dumps(msg)
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
check_serialize(msg)
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, method, args, timeout)
+ return consumer.call(context, version, method, args, timeout)
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg, timeout)
+ rv = multicall(conf, context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return rv[-1]
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
try:
- call(context, topic, msg)
- except rpc_common.RemoteError:
+ call(conf, context, topic, msg)
+ except Exception:
pass
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
check_serialize(msg)
pass
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
"""Cast to all consumers of a topic"""
check_serialize(msg)
method = msg.get('method')
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, method, args, None)
- except rpc_common.RemoteError:
+ consumer.call(context, version, method, args, None)
+ except Exception:
pass
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
import socket
import ssl
import eventlet
import greenlet
import kombu
+import kombu.connection
import kombu.entity
import kombu.messaging
-import kombu.connection
-from heat.common import config
from heat.openstack.common import cfg
-from heat.rpc import amqp as rpc_amqp
-from heat.rpc import common as rpc_common
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common.rpc import amqp as rpc_amqp
+from heat.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
- '(valid only if SSL enabled)')),
- ]
+ '(valid only if SSL enabled)')),
+ cfg.StrOpt('rabbit_host',
+ default='localhost',
+ help='the RabbitMQ host'),
+ cfg.IntOpt('rabbit_port',
+ default=5672,
+ help='the RabbitMQ port'),
+ cfg.BoolOpt('rabbit_use_ssl',
+ default=False,
+ help='connect over SSL for RabbitMQ'),
+ cfg.StrOpt('rabbit_userid',
+ default='guest',
+ help='the RabbitMQ userid'),
+ cfg.StrOpt('rabbit_password',
+ default='guest',
+ help='the RabbitMQ password'),
+ cfg.StrOpt('rabbit_virtual_host',
+ default='/',
+ help='the RabbitMQ virtual host'),
+ cfg.IntOpt('rabbit_retry_interval',
+ default=1,
+ help='how frequently to retry connecting with RabbitMQ'),
+ cfg.IntOpt('rabbit_retry_backoff',
+ default=2,
+ help='how long to backoff for between retries when connecting '
+ 'to RabbitMQ'),
+ cfg.IntOpt('rabbit_max_retries',
+ default=0,
+ help='maximum retries with trying to connect to RabbitMQ '
+ '(the default of 0 implies an infinite retry count)'),
+ cfg.BoolOpt('rabbit_durable_queues',
+ default=False,
+ help='use durable queues in RabbitMQ'),
+
+]
+
+cfg.CONF.register_opts(kombu_opts)
-FLAGS = config.FLAGS
-FLAGS.register_opts(kombu_opts)
LOG = rpc_common.LOG
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
- def __init__(self, channel, msg_id, callback, tag, **kwargs):
+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
'channel' is the amqp channel to use
"""
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=msg_id,
- type='direct',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(DirectConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=msg_id,
- exchange=exchange,
- routing_key=msg_id,
- **options)
+ exchange = kombu.entity.Exchange(name=msg_id,
+ type='direct',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(DirectConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=msg_id,
+ exchange=exchange,
+ routing_key=msg_id,
+ **options)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, channel, topic, callback, tag, **kwargs):
+ def __init__(self, conf, channel, topic, callback, tag, name=None,
+ **kwargs):
"""Init a 'topic' queue.
- 'channel' is the amqp channel to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
+ :param channel: the amqp channel to use
+ :param topic: the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param tag: a unique ID for the consumer on the channel
+ :param name: optional queue name, defaults to topic
+ :paramtype name: str
- Other kombu options may be passed
+ Other kombu options may be passed as keyword arguments
"""
# Default options
- options = {'durable': FLAGS.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ options = {'durable': conf.rabbit_durable_queues,
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=FLAGS.control_exchange,
- type='topic',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(TopicConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=topic,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=conf.control_exchange,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(TopicConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=name or topic,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
- def __init__(self, channel, topic, callback, tag, **kwargs):
+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
'channel' is the amqp channel to use
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=exchange_name,
- type='fanout',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(FanoutConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=queue_name,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(FanoutConsumer, self).__init__(channel, callback, tag,
+ name=queue_name,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class Publisher(object):
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
- **self.kwargs)
+ **self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
- channel=channel, routing_key=self.routing_key)
+ channel=channel,
+ routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
- def __init__(self, channel, msg_id, **kwargs):
+ def __init__(self, conf, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(DirectPublisher, self).__init__(channel,
- msg_id,
- msg_id,
- type='direct',
- **options)
+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+ type='direct', **options)
class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
- def __init__(self, channel, topic, **kwargs):
+ def __init__(self, conf, channel, topic, **kwargs):
"""init a 'topic' publisher.
Kombu options may be passed as keyword args to override defaults
"""
- options = {'durable': FLAGS.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ options = {'durable': conf.rabbit_durable_queues,
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- super(TopicPublisher, self).__init__(channel,
- FLAGS.control_exchange,
- topic,
- type='topic',
- **options)
+ super(TopicPublisher, self).__init__(channel, conf.control_exchange,
+ topic, type='topic', **options)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
- def __init__(self, channel, topic, **kwargs):
+ def __init__(self, conf, channel, topic, **kwargs):
"""init a 'fanout' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(FanoutPublisher, self).__init__(channel,
- '%s_fanout' % topic,
- None,
- type='fanout',
- **options)
+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+ None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'"""
- def __init__(self, *args, **kwargs):
- self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
- super(NotifyPublisher, self).__init__(*args, **kwargs)
+ def __init__(self, conf, channel, topic, **kwargs):
+ self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
super(NotifyPublisher, self).reconnect(channel)
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
- exchange=self.exchange,
- durable=self.durable,
- name=self.routing_key,
- routing_key=self.routing_key)
+ exchange=self.exchange,
+ durable=self.durable,
+ name=self.routing_key,
+ routing_key=self.routing_key)
queue.declare()
class Connection(object):
"""Connection object."""
- def __init__(self, server_params=None):
+ pool = None
+
+ def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
- self.max_retries = FLAGS.rabbit_max_retries
+ self.conf = conf
+ self.max_retries = self.conf.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
- self.interval_start = FLAGS.rabbit_retry_interval
- self.interval_stepping = FLAGS.rabbit_retry_backoff
+ self.interval_start = self.conf.rabbit_retry_interval
+ self.interval_stepping = self.conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
- params.setdefault('hostname', FLAGS.rabbit_host)
- params.setdefault('port', FLAGS.rabbit_port)
- params.setdefault('userid', FLAGS.rabbit_userid)
- params.setdefault('password', FLAGS.rabbit_password)
- params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
+ params.setdefault('hostname', self.conf.rabbit_host)
+ params.setdefault('port', self.conf.rabbit_port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
self.params = params
- if FLAGS.fake_rabbit:
+ if self.conf.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
else:
self.memory_transport = False
- if FLAGS.rabbit_use_ssl:
+ if self.conf.rabbit_use_ssl:
self.params['ssl'] = self._fetch_ssl_params()
self.connection = None
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
- if FLAGS.kombu_ssl_version:
- ssl_params['ssl_version'] = FLAGS.kombu_ssl_version
- if FLAGS.kombu_ssl_keyfile:
- ssl_params['keyfile'] = FLAGS.kombu_ssl_keyfile
- if FLAGS.kombu_ssl_certfile:
- ssl_params['certfile'] = FLAGS.kombu_ssl_certfile
- if FLAGS.kombu_ssl_ca_certs:
- ssl_params['ca_certs'] = FLAGS.kombu_ssl_ca_certs
+ if self.conf.kombu_ssl_version:
+ ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+ if self.conf.kombu_ssl_keyfile:
+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
+ if self.conf.kombu_ssl_certfile:
+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
+ if self.conf.kombu_ssl_ca_certs:
+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection_errors:
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(
- **self.params)
+ self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on '
- '%(hostname)s:%(port)d') % self.params)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
+ self.params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
try:
self._connect()
return
- except self.connection_errors, e:
+ except (self.connection_errors, IOError), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ ' unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
- except (self.connection_errors, socket.timeout), e:
+ except (self.connection_errors, socket.timeout, IOError), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
- consumer = consumer_cls(self.channel, topic, callback,
- self.consumer_num.next())
+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
+ self.consumer_num.next())
self.consumers.append(consumer)
return consumer
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
info['do_consume'] = True
def _consume():
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publish():
- publisher = cls(self.channel, topic, **kwargs)
+ publisher = cls(self.conf, self.channel, topic, **kwargs)
publisher.send(msg)
self.ensure(_error_callback, _publish)
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
if fanout:
- self.declare_fanout_consumer(topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ self.declare_fanout_consumer(topic, proxy_cb)
else:
- self.declare_topic_consumer(topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
-
+ self.declare_topic_consumer(topic, proxy_cb)
-Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
-def create_connection(new=True):
+def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new, Connection.pool)
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg, Connection.pool)
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def cast_to_server(context, server_params, topic, msg):
+def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(context, server_params, topic, msg,
- Connection.pool)
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def fanout_cast_to_server(context, server_params, topic, msg):
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.cast_to_server(context, server_params, topic, msg,
- Connection.pool)
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg, Connection.pool)
+ return rpc_amqp.notify(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
+import logging
import time
import uuid
-import json
import eventlet
import greenlet
import qpid.messaging
import qpid.messaging.exceptions
-from heat.common import config
from heat.openstack.common import cfg
-from heat.rpc import amqp as rpc_amqp
-from heat.rpc import common as rpc_common
-
-from heat.openstack.common import log as logging
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import amqp as rpc_amqp
+from heat.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
+qpid_opts = [
+ cfg.StrOpt('qpid_hostname',
+ default='localhost',
+ help='Qpid broker hostname'),
+ cfg.StrOpt('qpid_port',
+ default='5672',
+ help='Qpid broker port'),
+ cfg.StrOpt('qpid_username',
+ default='',
+ help='Username for qpid connection'),
+ cfg.StrOpt('qpid_password',
+ default='',
+ help='Password for qpid connection'),
+ cfg.StrOpt('qpid_sasl_mechanisms',
+ default='',
+ help='Space separated list of SASL mechanisms to use for auth'),
+ cfg.BoolOpt('qpid_reconnect',
+ default=True,
+ help='Automatically reconnect'),
+ cfg.IntOpt('qpid_reconnect_timeout',
+ default=0,
+ help='Reconnection timeout in seconds'),
+ cfg.IntOpt('qpid_reconnect_limit',
+ default=0,
+ help='Max reconnections before giving up'),
+ cfg.IntOpt('qpid_reconnect_interval_min',
+ default=0,
+ help='Minimum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval_max',
+ default=0,
+ help='Maximum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval',
+ default=0,
+ help='Equivalent to setting max and min to the same value'),
+ cfg.IntOpt('qpid_heartbeat',
+ default=5,
+ help='Seconds between connection keepalive heartbeats'),
+ cfg.StrOpt('qpid_protocol',
+ default='tcp',
+ help="Transport to use, either 'tcp' or 'ssl'"),
+ cfg.BoolOpt('qpid_tcp_nodelay',
+ default=True,
+ help='Disable Nagle algorithm'),
+]
+
+cfg.CONF.register_opts(qpid_opts)
+
class ConsumerBase(object):
"""Consumer base class."""
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
- self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
- self.callback(message.content)
+ try:
+ self.callback(message.content)
+ except Exception:
+ LOG.exception(_("Failed to process message... skipping it."))
+ finally:
+ self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
- def __init__(self, session, msg_id, callback):
+ def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
'session' is the amqp session to use
"""
super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, session, topic, callback):
+ def __init__(self, conf, session, topic, callback, name=None):
"""Init a 'topic' queue.
- 'session' is the amqp session to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
+ :param session: the amqp session to use
+ :param topic: is the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param name: optional queue name, defaults to topic
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (cfg.CONF.control_exchange, topic), {},
- topic, {})
+ "%s/%s" % (conf.control_exchange,
+ topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
- def __init__(self, session, topic, callback):
+ def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
'session' is the amqp session to use
'callback' is the callback to call when messages are received
"""
- super(FanoutConsumer, self).__init__(session, callback,
- "%s_fanout" % topic,
- {"durable": False, "type": "fanout"},
- "%s_fanout_%s" % (topic, uuid.uuid4().hex),
- {"exclusive": True})
+ super(FanoutConsumer, self).__init__(
+ session, callback,
+ "%s_fanout" % topic,
+ {"durable": False, "type": "fanout"},
+ "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+ {"exclusive": True})
class Publisher(object):
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
- self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
- def __init__(self, session, msg_id):
+ def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})
class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
- def __init__(self, session, topic):
+ def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(session,
- "%s/%s" % (cfg.CONF.control_exchange, topic))
+ super(TopicPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
- def __init__(self, session, topic):
+ def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
- super(FanoutPublisher, self).__init__(session,
- "%s_fanout" % topic, {"type": "fanout"})
+ super(FanoutPublisher, self).__init__(
+ session,
+ "%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
"""Publisher class for notifications"""
- def __init__(self, session, topic):
+ def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (cfg.CONF.control_exchange, topic),
- {"durable": True})
+ super(NotifyPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic),
+ {"durable": True})
class Connection(object):
"""Connection object."""
- def __init__(self, server_params=None):
+ pool = None
+
+ def __init__(self, conf, server_params=None):
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.conf = conf
if server_params is None:
server_params = {}
- default_params = dict(hostname=cfg.CONF.qpid_hostname,
- port=cfg.CONF.qpid_port,
- username=cfg.CONF.qpid_username,
- password=cfg.CONF.qpid_password)
+ default_params = dict(hostname=self.conf.qpid_hostname,
+ port=self.conf.qpid_port,
+ username=self.conf.qpid_username,
+ password=self.conf.qpid_password)
params = server_params
for key in default_params.keys():
# before we call open
self.connection.username = params['username']
self.connection.password = params['password']
- self.connection.sasl_mechanisms = cfg.CONF.qpid_sasl_mechanisms
- self.connection.reconnect = cfg.CONF.qpid_reconnect
- if cfg.CONF.qpid_reconnect_timeout:
- self.connection.reconnect_timeout = \
- cfg.CONF.qpid_reconnect_timeout
- if cfg.CONF.qpid_reconnect_limit:
- self.connection.reconnect_limit = cfg.CONF.qpid_reconnect_limit
- if cfg.CONF.qpid_reconnect_interval_max:
+ self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
+ self.connection.reconnect = self.conf.qpid_reconnect
+ if self.conf.qpid_reconnect_timeout:
+ self.connection.reconnect_timeout = (
+ self.conf.qpid_reconnect_timeout)
+ if self.conf.qpid_reconnect_limit:
+ self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
+ if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
- cfg.CONF.qpid_reconnect_interval_max)
- if cfg.CONF.qpid_reconnect_interval_min:
+ self.conf.qpid_reconnect_interval_max)
+ if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
- cfg.CONF.qpid_reconnect_interval_min)
- if cfg.CONF.qpid_reconnect_interval:
- self.connection.reconnect_interval = \
- cfg.CONF.qpid_reconnect_interval
- self.connection.hearbeat = cfg.CONF.qpid_heartbeat
- self.connection.protocol = cfg.CONF.qpid_protocol
- self.connection.tcp_nodelay = cfg.CONF.qpid_tcp_nodelay
+ self.conf.qpid_reconnect_interval_min)
+ if self.conf.qpid_reconnect_interval:
+ self.connection.reconnect_interval = (
+ self.conf.qpid_reconnect_interval)
+ self.connection.hearbeat = self.conf.qpid_heartbeat
+ self.connection.protocol = self.conf.qpid_protocol
+ self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
try:
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
- LOG.error(_('Unable to connect to AMQP server: %s ') % e)
- time.sleep(cfg.CONF.qpid_reconnect_interval or 1)
+ LOG.error(_('Unable to connect to AMQP server: %s'), e)
+ time.sleep(self.conf.qpid_reconnect_interval or 1)
else:
break
- LOG.info(_('Connected to AMQP server on %s') % self.broker)
+ LOG.info(_('Connected to AMQP server on %s'), self.broker)
self.session = self.connection.session()
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
- consumer = consumer_cls(self.session, topic, callback)
+ consumer = consumer_cls(self.conf, self.session, topic, callback)
self._register_consumer(consumer)
return consumer
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
- publisher = cls(self.session, topic)
+ publisher = cls(self.conf, self.session, topic)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send)
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
if fanout:
- consumer = FanoutConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
else:
- consumer = TopicConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
+
self._register_consumer(consumer)
+
return consumer
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
+
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+ name=pool_name)
-Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+ self._register_consumer(consumer)
+
+ return consumer
-def create_connection(new=True):
+def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new, Connection.pool)
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg, Connection.pool)
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def cast_to_server(context, server_params, topic, msg):
+def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(context, server_params, topic, msg,
- Connection.pool)
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def fanout_cast_to_server(context, server_params, topic, msg):
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
- msg, Connection.pool)
+ return rpc_amqp.fanout_cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg, Connection.pool)
+ return rpc_amqp.notify(conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import pprint
+import socket
+import string
+import sys
+import types
+import uuid
+
+import eventlet
+from eventlet.green import zmq
+import greenlet
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import common as rpc_common
+
+
+# for convenience, are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = rpc_common.LOG
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ # The module.Class to use for matchmaking.
+ cfg.StrOpt(
+ 'rpc_zmq_matchmaker',
+ default=('heat.openstack.common.rpc.'
+ 'matchmaker.MatchMakerLocalhost'),
+ help='MatchMaker driver',
+ ),
+
+ # The following port is unassigned by IANA as of 2012-05-21
+ cfg.IntOpt('rpc_zmq_port', default=9501,
+ help='ZeroMQ receiver listening port'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ help='Number of ZeroMQ contexts, defaults to 1'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ help='Directory for holding IPC sockets'),
+ cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+ help='Name of this node. Must be a valid hostname, FQDN, or '
+ 'IP address')
+]
+
+
+# These globals are defined in register_opts(conf),
+# a mandatory initialization call
+FLAGS = None
+ZMQ_CTX = None # ZeroMQ Context, must be global.
+matchmaker = None # memoized matchmaker object
+
+
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
+class ZmqSocket(object):
+ """
+ A tiny wrapper around ZeroMQ to simplify the send/recv protocol
+ and connection management.
+
+ Can be used as a Context (supports the 'with' statement).
+ """
+
+ def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+ self.sock = ZMQ_CTX.socket(zmq_type)
+ self.addr = addr
+ self.type = zmq_type
+ self.subscriptions = []
+
+ # Support failures on sending/receiving on wrong socket type.
+ self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+ self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+ self.can_sub = zmq_type in (zmq.SUB, )
+
+ # Support list, str, & None for subscribe arg (cast to list)
+ do_sub = {
+ list: subscribe,
+ str: [subscribe],
+ type(None): []
+ }[type(subscribe)]
+
+ for f in do_sub:
+ self.subscribe(f)
+
+ str_data = {'addr': addr, 'type': self.socket_s(),
+ 'subscribe': subscribe, 'bind': bind}
+
+ LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
+ LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
+ LOG.debug(_("-> bind: %(bind)s"), str_data)
+
+ try:
+ if bind:
+ self.sock.bind(addr)
+ else:
+ self.sock.connect(addr)
+ except Exception:
+ raise RPCException(_("Could not open socket."))
+
+ def socket_s(self):
+ """Get socket type as string."""
+ t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+ 'DEALER')
+ return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+ def subscribe(self, msg_filter):
+ """Subscribe."""
+ if not self.can_sub:
+ raise RPCException("Cannot subscribe on this socket.")
+ LOG.debug(_("Subscribing to %s"), msg_filter)
+
+ try:
+ self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+ except Exception:
+ return
+
+ self.subscriptions.append(msg_filter)
+
+ def unsubscribe(self, msg_filter):
+ """Unsubscribe."""
+ if msg_filter not in self.subscriptions:
+ return
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+ self.subscriptions.remove(msg_filter)
+
+ def close(self):
+ if self.sock is None or self.sock.closed:
+ return
+
+ # We must unsubscribe, or we'll leak descriptors.
+ if len(self.subscriptions) > 0:
+ for f in self.subscriptions:
+ try:
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+ except Exception:
+ pass
+ self.subscriptions = []
+
+ # Linger -1 prevents lost/dropped messages
+ try:
+ self.sock.close(linger=-1)
+ except Exception:
+ pass
+ self.sock = None
+
+ def recv(self):
+ if not self.can_recv:
+ raise RPCException(_("You cannot recv on this socket."))
+ return self.sock.recv_multipart()
+
+ def send(self, data):
+ if not self.can_send:
+ raise RPCException(_("You cannot send on this socket."))
+ self.sock.send_multipart(data)
+
+
+class ZmqClient(object):
+ """Client for ZMQ sockets."""
+
+ def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+ def cast(self, msg_id, topic, data):
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
+
+ def close(self):
+ self.outq.close()
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call."""
+ def __init__(self, **kwargs):
+ self.replies = []
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['replies'] = self.replies
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ return
+ self.replies.append(reply)
+
+ @classmethod
+ def marshal(self, ctx):
+ ctx_data = ctx.to_dict()
+ return _serialize(ctx_data)
+
+ @classmethod
+ def unmarshal(self, data):
+ return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+ """Used by ConsumerBase as a private context for - methods."""
+
+ def __init__(self, proxy):
+ self.proxy = proxy
+ self.msg_waiter = None
+
+ def _get_response(self, ctx, proxy, topic, data):
+ """Process a curried message and cast the result to topic."""
+ LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+
+ try:
+ result = proxy.dispatch(
+ ctx, data['version'], data['method'], **data['args'])
+ return ConsumerBase.normalize_reply(result, ctx.replies)
+ except greenlet.GreenletExit:
+ # ignore these since they are just from shutdowns
+ pass
+ except Exception:
+ return {'exc':
+ rpc_common.serialize_remote_exception(sys.exc_info())}
+
+ def reply(self, ctx, proxy,
+ msg_id=None, context=None, topic=None, msg=None):
+ """Reply to a casted call."""
+ # Our real method is curried into msg['args']
+
+ child_ctx = RpcContext.unmarshal(msg[0])
+ response = ConsumerBase.normalize_reply(
+ self._get_response(child_ctx, proxy, topic, msg[1]),
+ ctx.replies)
+
+ LOG.debug(_("Sending reply"))
+ cast(FLAGS, ctx, topic, {
+ 'method': '-process_reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'response': response
+ }
+ })
+
+
+class ConsumerBase(object):
+ """Base Consumer."""
+
+ def __init__(self):
+ self.private_ctx = InternalContext(None)
+
+ @classmethod
+ def normalize_reply(self, result, replies):
+ #TODO(ewindisch): re-evaluate and document this method.
+ if isinstance(result, types.GeneratorType):
+ return list(result)
+ elif replies:
+ return replies
+ else:
+ return [result]
+
+ def process(self, style, target, proxy, ctx, data):
+ # Method starting with - are
+ # processed internally. (non-valid method name)
+ method = data['method']
+
+ # Internal method
+ # uses internal context for safety.
+ if data['method'][0] == '-':
+ # For reply / process_reply
+ method = method[1:]
+ if method == 'reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
+ return
+
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+ proxy.dispatch(ctx, data['version'],
+ data['method'], **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+ """
+ A consumer class implementing a
+ centralized casting broker (PULL-PUSH)
+ for RoundRobin requests.
+ """
+
+ def __init__(self, conf):
+ super(ZmqBaseReactor, self).__init__()
+
+ self.conf = conf
+ self.mapping = {}
+ self.proxies = {}
+ self.threads = []
+ self.sockets = []
+ self.subscribe = {}
+
+ self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+ def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
+ zmq_type_out=None, in_bind=True, out_bind=True,
+ subscribe=None):
+
+ LOG.info(_("Registering reactor"))
+
+ if zmq_type_in not in (zmq.PULL, zmq.SUB):
+ raise RPCException("Bad input socktype")
+
+ # Items push in.
+ inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+ subscribe=subscribe)
+
+ self.proxies[inq] = proxy
+ self.sockets.append(inq)
+
+ LOG.info(_("In reactor registered"))
+
+ if not out_addr:
+ return
+
+ if zmq_type_out not in (zmq.PUSH, zmq.PUB):
+ raise RPCException("Bad output socktype")
+
+ # Items push out.
+ outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
+
+ self.mapping[inq] = outq
+ self.mapping[outq] = inq
+ self.sockets.append(outq)
+
+ LOG.info(_("Out reactor registered"))
+
+ def consume_in_thread(self):
+ def _consume(sock):
+ LOG.info(_("Consuming socket"))
+ while True:
+ self.consume(sock)
+
+ for k in self.proxies.keys():
+ self.threads.append(
+ self.pool.spawn(_consume, k)
+ )
+
+ def wait(self):
+ for t in self.threads:
+ t.wait()
+
+ def close(self):
+ for s in self.sockets:
+ s.close()
+
+ for t in self.threads:
+ t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ topic-based proxy, forwarding to
+ IPC sockets.
+ """
+
+ def __init__(self, conf):
+ super(ZmqProxy, self).__init__(conf)
+
+ self.topic_proxy = {}
+ ipc_dir = conf.rpc_zmq_ipc_dir
+
+ self.topic_proxy['zmq_replies'] = \
+ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
+ zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['zmq_replies'])
+
+ def consume(self, sock):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ msg_id, topic, style, in_msg = data
+ topic = topic.split('.', 1)[0]
+
+ LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+ # Handle zmq_replies magic
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ elif topic.startswith('zmq_replies'):
+ sock_type = zmq.PUB
+ inside = _deserialize(in_msg)
+ msg_id = inside[-1]['args']['msg_id']
+ response = inside[-1]['args']['response']
+ LOG.debug(_("->response->%s"), response)
+ data = [str(msg_id), _serialize(response)]
+ else:
+ sock_type = zmq.PUSH
+
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
+ self.topic_proxy[topic].send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+
+
+class ZmqReactor(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ consumer for messages. Can also be
+ used as a 1:1 proxy
+ """
+
+ def __init__(self, conf):
+ super(ZmqReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+ if sock in self.mapping:
+ LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
+ 'data': data})
+ self.mapping[sock].send(data)
+ return
+
+ msg_id, topic, style, in_msg = data
+
+ ctx, request = _deserialize(in_msg)
+ ctx = RpcContext.unmarshal(ctx)
+
+ proxy = self.proxies[sock]
+
+ self.pool.spawn_n(self.process, style, topic,
+ proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+ """Manages connections and threads."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.reactor = ZmqReactor(conf)
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ LOG.info(_("Create Consumer for topic (%(topic)s)") %
+ {'topic': topic})
+
+ # Subscription scenarios
+ if fanout:
+ subscribe = ('', fanout)[type(fanout) == str]
+ sock_type = zmq.SUB
+ topic = 'fanout~' + topic
+ else:
+ sock_type = zmq.PULL
+ subscribe = None
+
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (self.conf.rpc_zmq_ipc_dir, topic)
+
+ LOG.debug(_("Consumer is a zmq.%s"),
+ ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+ self.reactor.register(proxy, inaddr, sock_type,
+ subscribe=subscribe, in_bind=False)
+
+ def close(self):
+ self.reactor.close()
+
+ def wait(self):
+ self.reactor.wait()
+
+ def consume_in_thread(self):
+ self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, msg_id, topic, msg, timeout=None):
+ timeout_cast = timeout or FLAGS.rpc_cast_timeout
+ payload = [RpcContext.marshal(context), msg]
+
+ with Timeout(timeout_cast, exception=rpc_common.Timeout):
+ try:
+ conn = ZmqClient(addr)
+
+ # assumes cast can't return an exception
+ conn.cast(msg_id, topic, payload)
+ except zmq.ZMQError:
+ raise RPCException("Cast failed. ZMQ Socket Exception")
+ finally:
+ if 'conn' in vars():
+ conn.close()
+
+
+def _call(addr, context, msg_id, topic, msg, timeout=None):
+ # timeout_response is how long we wait for a response
+ timeout = timeout or FLAGS.rpc_response_timeout
+
+ # The msg_id is used to track replies.
+ msg_id = str(uuid.uuid4().hex)
+
+ # Replies always come into the reply service.
+ reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+
+ LOG.debug(_("Creating payload"))
+ # Curry the original request into a reply method.
+ mcontext = RpcContext.marshal(context)
+ payload = {
+ 'method': '-reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'context': mcontext,
+ 'topic': reply_topic,
+ 'msg': [mcontext, msg]
+ }
+ }
+
+ LOG.debug(_("Creating queue socket for reply waiter"))
+
+ # Messages arriving async.
+ # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+ with Timeout(timeout, exception=rpc_common.Timeout):
+ try:
+ msg_waiter = ZmqSocket(
+ "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+ zmq.SUB, subscribe=msg_id, bind=False
+ )
+
+ LOG.debug(_("Sending cast"))
+ _cast(addr, context, msg_id, topic, payload)
+
+ LOG.debug(_("Cast sent; Waiting reply"))
+ # Blocks until receives reply
+ msg = msg_waiter.recv()
+ LOG.debug(_("Received message: %s"), msg)
+ LOG.debug(_("Unpacking response"))
+ responses = _deserialize(msg[-1])
+ # ZMQError trumps the Timeout error.
+ except zmq.ZMQError:
+ raise RPCException("ZMQ Socket Error")
+ finally:
+ if 'msg_waiter' in vars():
+ msg_waiter.close()
+
+ # It seems we don't need to do all of the following,
+ # but perhaps it would be useful for multicall?
+ # One effect of this is that we're checking all
+ # responses for Exceptions.
+ for resp in responses:
+ if isinstance(resp, types.DictType) and 'exc' in resp:
+ raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+
+ return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None):
+ """
+ Wraps the sending of messages,
+ dispatches to the matchmaker and sends
+ message to all relevant hosts.
+ """
+ conf = FLAGS
+ LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+ queues = matchmaker.queues(topic)
+ LOG.debug(_("Sending message(s) to: %s"), queues)
+
+ # Don't stack if we have no matchmaker results
+ if len(queues) == 0:
+ LOG.warn(_("No matchmaker results. Not casting."))
+ # While not strictly a timeout, callers know how to handle
+ # this exception and a timeout isn't too big a lie.
+ raise rpc_common.Timeout, "No match from matchmaker."
+
+ # This supports brokerless fanout (addresses > 1)
+ for queue in queues:
+ (_topic, ip_addr) = queue
+ _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+ if method.__name__ == '_cast':
+ eventlet.spawn_n(method, _addr, context,
+ _topic, _topic, msg, timeout)
+ return
+ return method(_addr, context, _topic, _topic, msg, timeout)
+
+
+def create_connection(conf, new=True):
+ return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+ """Multiple calls."""
+ register_opts(conf)
+ return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+ """Send a message, expect a response."""
+ register_opts(conf)
+ data = _multi_send(_call, *args, **kwargs)
+ return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+ """Send a message expecting no reply."""
+ register_opts(conf)
+ _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+ """Send a message to all listening and expect no reply."""
+ register_opts(conf)
+ # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
+ # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, **kwargs):
+ """
+ Send notification event.
+ Notifications are sent to topic-priority.
+ This differs from the AMQP drivers which send to topic.priority.
+ """
+ register_opts(conf)
+ # NOTE(ewindisch): dot-priority in rpc notifier does not
+ # work with our assumptions.
+ topic.replace('.', '-')
+ cast(conf, context, topic, msg, **kwargs)
+
+
+def cleanup():
+ """Clean up resources in use by implementation."""
+ global ZMQ_CTX
+ global matchmaker
+ matchmaker = None
+ ZMQ_CTX.destroy()
+ ZMQ_CTX = None
+
+
+def register_opts(conf):
+ """Registration of options for this driver."""
+ #NOTE(ewindisch): ZMQ_CTX and matchmaker
+ # are initialized here as this is as good
+ # an initialization method as any.
+
+ # We memoize through these globals
+ global ZMQ_CTX
+ global matchmaker
+ global FLAGS
+
+ if not FLAGS:
+ conf.register_opts(zmq_opts)
+ FLAGS = conf
+ # Don't re-set, if this method is called twice.
+ if not ZMQ_CTX:
+ ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ if not matchmaker:
+ # rpc_zmq_matchmaker should be set to a 'module.Class'
+ mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_module = '.'.join(mm_path[:-1])
+ mm_class = mm_path[-1]
+
+ # Only initialize a class.
+ if mm_path[-1][0] not in string.ascii_uppercase:
+ LOG.error(_("Matchmaker could not be loaded.\n"
+ "rpc_zmq_matchmaker is not a class."))
+ raise RPCException(_("Error loading Matchmaker."))
+
+ mm_impl = importutils.import_module(mm_module)
+ mm_constructor = getattr(mm_impl, mm_class)
+ matchmaker = mm_constructor()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should except a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import itertools
+import json
+import logging
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+
+
+matchmaker_opts = [
+ # Matchmaker ring file
+ cfg.StrOpt('matchmaker_ringfile',
+ default='/etc/nova/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+ """Signified a match could not be found."""
+ message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+ """
+ Implements lookups.
+ Subclass this to support hashtables, dns, etc.
+ """
+ def __init__(self):
+ pass
+
+ def run(self, key):
+ raise NotImplementedError()
+
+
+class Binding(object):
+ """
+ A binding on which to perform a lookup.
+ """
+ def __init__(self):
+ pass
+
+ def test(self, key):
+ raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+ """Match Maker Base Class."""
+
+ def __init__(self):
+ # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+ self.bindings = []
+
+ def add_binding(self, binding, rule, last=True):
+ self.bindings.append((binding, rule, False, last))
+
+ #NOTE(ewindisch): kept the following method in case we implement the
+ # underlying support.
+ #def add_negate_binding(self, binding, rule, last=True):
+ # self.bindings.append((binding, rule, True, last))
+
+ def queues(self, key):
+ workers = []
+
+ # bit is for negate bindings - if we choose to implement it.
+ # last stops processing rules if this matches.
+ for (binding, exchange, bit, last) in self.bindings:
+ if binding.test(key):
+ workers.extend(exchange.run(key))
+
+ # Support last.
+ if last:
+ return workers
+ return workers
+
+
+class DirectBinding(Binding):
+ """
+ Specifies a host in the key via a '.' character
+ Although dots are used in the key, the behavior here is
+ that it maps directly to a host, thus direct.
+ """
+ def test(self, key):
+ if '.' in key:
+ return True
+ return False
+
+
+class TopicBinding(Binding):
+ """
+ Where a 'bare' key without dots.
+ AMQP generally considers topic exchanges to be those *with* dots,
+ but we deviate here in terminology as the behavior here matches
+ that of a topic exchange (whereas where there are dots, behavior
+ matches that of a direct exchange.
+ """
+ def test(self, key):
+ if '.' not in key:
+ return True
+ return False
+
+
+class FanoutBinding(Binding):
+ """Match on fanout keys, where key starts with 'fanout.' string."""
+ def test(self, key):
+ if key.startswith('fanout~'):
+ return True
+ return False
+
+
+class StubExchange(Exchange):
+ """Exchange that does nothing."""
+ def run(self, key):
+ return [(key, None)]
+
+
+class RingExchange(Exchange):
+ """
+ Match Maker where hosts are loaded from a static file containing
+ a hashmap (JSON formatted).
+
+ __init__ takes optional ring dictionary argument, otherwise
+ loads the ringfile from CONF.mathcmaker_ringfile.
+ """
+ def __init__(self, ring=None):
+ super(RingExchange, self).__init__()
+
+ if ring:
+ self.ring = ring
+ else:
+ fh = open(CONF.matchmaker_ringfile, 'r')
+ self.ring = json.load(fh)
+ fh.close()
+
+ self.ring0 = {}
+ for k in self.ring.keys():
+ self.ring0[k] = itertools.cycle(self.ring[k])
+
+ def _ring_has(self, key):
+ if key in self.ring0:
+ return True
+ return False
+
+
+class RoundRobinRingExchange(RingExchange):
+ """A Topic Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(RoundRobinRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ if not self._ring_has(key):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
+ )
+ return []
+ host = next(self.ring0[key])
+ return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(FanoutRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "fanout~", strip it for lookup.
+ nkey = key.split('fanout~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class LocalhostExchange(Exchange):
+ """Exchange where all direct topics are local."""
+ def __init__(self):
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+ return [(key.split('.')[0] + '.localhost', 'localhost')]
+
+
+class DirectExchange(Exchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def __init__(self):
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+ b, e = key.split('.', 1)
+ return [(b, e)]
+
+
+class MatchMakerRing(MatchMakerBase):
+ """
+ Match Maker where hosts are loaded from a static hashmap.
+ """
+ def __init__(self, ring=None):
+ super(MatchMakerRing, self).__init__()
+ self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+ """
+ Match Maker where all bare topics resolve to localhost.
+ Useful for testing.
+ """
+ def __init__(self):
+ super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(DirectBinding(), DirectExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange())
+
+
+class MatchMakerStub(MatchMakerBase):
+ """
+ Match Maker where topics are untouched.
+ Useful for testing, or for AMQP/brokered queues.
+ Will not work where knowledge of hosts is known (i.e. zeromq)
+ """
+ def __init__(self):
+ super(MatchMakerLocalhost, self).__init__()
+
+ self.add_binding(FanoutBinding(), StubExchange())
+ self.add_binding(DirectBinding(), StubExchange())
+ self.add_binding(TopicBinding(), StubExchange())
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+ rpc/dispatcher.py
+"""
+
+
+from heat.openstack.common import rpc
+
+
+class RpcProxy(object):
+ """A helper class for rpc clients.
+
+ This class is a wrapper around the RPC client API. It allows you to
+ specify the topic and API version in a single place. This is intended to
+ be used as a base class for a class that implements the client side of an
+ rpc API.
+ """
+
+ def __init__(self, topic, default_version):
+ """Initialize an RpcProxy.
+
+ :param topic: The topic to use for all messages.
+ :param default_version: The default API version to request in all
+ outgoing messages. This can be overridden on a per-message
+ basis.
+ """
+ self.topic = topic
+ self.default_version = default_version
+ super(RpcProxy, self).__init__()
+
+ def _set_version(self, msg, vers):
+ """Helper method to set the version in a message.
+
+ :param msg: The message having a version added to it.
+ :param vers: The version number to add to the message.
+ """
+ msg['version'] = vers if vers else self.default_version
+
+ def _get_topic(self, topic):
+ """Return the topic to use for a message."""
+ return topic if topic else self.topic
+
+ @staticmethod
+ def make_msg(method, **kwargs):
+ return {'method': method, 'args': kwargs}
+
+ def call(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.call() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: The return value from the remote method.
+ """
+ self._set_version(msg, version)
+ return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+ def multicall(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.multicall() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: An iterator that lets you process each of the returned values
+ from the remote method as they arrive.
+ """
+ self._set_version(msg, version)
+ return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+ def cast(self, context, msg, topic=None, version=None):
+ """rpc.cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast() does not wait on any return value from the
+ remote method.
+ """
+ self._set_version(msg, version)
+ rpc.cast(context, self._get_topic(topic), msg)
+
+ def fanout_cast(self, context, msg, version=None):
+ """rpc.fanout_cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast() does not wait on any return value
+ from the remote method.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast(context, self.topic, msg)
+
+ def cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+ def fanout_cast_to_server(self, context, server_params, msg, version=None):
+ """rpc.fanout_cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-
-from heat.openstack.common import cfg
-from heat.openstack.common import exception
-from heat.openstack.common import log as logging
-from heat.common import config
-
-LOG = logging.getLogger(__name__)
-
-
-class RemoteError(exception.OpenstackException):
- """Signifies that a remote class has raised an exception.
-
- Contains a string representation of the type of the original exception,
- the value of the original exception, and the traceback. These are
- sent to the parent as a joined string so printing the exception
- contains all of the relevant info.
-
- """
- message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
-
- def __init__(self, exc_type=None, value=None, traceback=None):
- self.exc_type = exc_type
- self.value = value
- self.traceback = traceback
- super(RemoteError, self).__init__(exc_type=exc_type,
- value=value,
- traceback=traceback)
-
-
-class Timeout(exception.OpenstackException):
- """Signifies that a timeout has occurred.
-
- This exception is raised if the rpc_response_timeout is reached while
- waiting for a response from the remote side.
- """
- message = _("Timeout while waiting on RPC response.")
-
-
-class Connection(object):
- """A connection, returned by rpc.create_connection().
-
- This class represents a connection to the message bus used for rpc.
- An instance of this class should never be created by users of the rpc API.
- Use rpc.create_connection() instead.
- """
- def close(self):
- """Close the connection.
-
- This method must be called when the connection will no longer be used.
- It will ensure that any resources associated with the connection, such
- as a network connection, and cleaned up.
- """
- raise NotImplementedError()
-
- def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer on this connection.
-
- A consumer is associated with a message queue on the backend message
- bus. The consumer will read messages from the queue, unpack them, and
- dispatch them to the proxy object. The contents of the message pulled
- off of the queue will determine which method gets called on the proxy
- object.
-
- :param topic: This is a name associated with what to consume from.
- Multiple instances of a service may consume from the same
- topic. For example, all instances of nova-compute consume
- from a queue called "compute". In that case, the
- messages will get distributed amongst the consumers in a
- round-robin fashion if fanout=False. If fanout=True,
- every consumer associated with this topic will get a
- copy of every message.
- :param proxy: The object that will handle all incoming messages.
- :param fanout: Whether or not this is a fanout topic. See the
- documentation for the topic parameter for some
- additional comments on this.
- """
- raise NotImplementedError()
-
- def consume_in_thread(self):
- """Spawn a thread to handle incoming messages.
-
- Spawn a thread that will be responsible for handling all incoming
- messages for consumers that were set up on this connection.
-
- Message dispatching inside of this is expected to be implemented in a
- non-blocking manner. An example implementation would be having this
- thread pull messages in for all of the consumers, but utilize a thread
- pool for dispatching the messages to the proxy objects.
- """
- raise NotImplementedError()
-
-
-def _safe_log(log_func, msg, msg_data):
- """Sanitizes the msg_data field before logging."""
- SANITIZE = {
- 'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',),
- }
- method = msg_data['method']
- if method in SANITIZE:
- msg_data = copy.deepcopy(msg_data)
- args_to_sanitize = SANITIZE[method]
- for arg in args_to_sanitize:
- try:
- msg_data['args'][arg] = "<SANITIZED>"
- except KeyError:
- pass
-
- return log_func(msg, msg_data)
import eventlet
import greenlet
+from heat.openstack.common import rpc
from heat.openstack.common import cfg
from heat.openstack.common import importutils
from heat.openstack.common import log as logging
-from heat import rpc
from heat.common import utils as heat_utils
from heat.common import exception
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
+ rpc_dispatcher = self.manager.create_rpc_dispatcher()
+
# Share this same connection for these Consumers
- self.conn.create_consumer(self.topic, self, fanout=False)
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, self, fanout=False)
+ self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
- self.conn.create_consumer(self.topic, self, fanout=True)
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier
+modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils
# The base module to hold the copy of openstack.common
base=heat