]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Import openstack.common.rpc
authorSteven Dake <sdake@redhat.com>
Tue, 17 Jul 2012 15:27:19 +0000 (08:27 -0700)
committerSteven Dake <sdake@redhat.com>
Tue, 17 Jul 2012 15:27:19 +0000 (08:27 -0700)
Use openstack.common routines.  One of the parameters changed to
create_consumer().

Thanks to Russell Bryant for assistance with sorting out that problem.

Change-Id: I4badc7ca22298cd0aafc57a2335b3d6801289be8
Signed-off-by: Steven Dake <sdake@redhat.com>
21 files changed:
bin/heat-api
bin/heat-engine
bin/heat-metadata
heat/api/v1/stacks.py
heat/common/config.py
heat/manager.py
heat/metadata/api/v1/metadata.py
heat/openstack/common/excutils.py [new file with mode: 0644]
heat/openstack/common/rpc/__init__.py [moved from heat/rpc/__init__.py with 58% similarity]
heat/openstack/common/rpc/amqp.py [moved from heat/rpc/amqp.py with 64% similarity]
heat/openstack/common/rpc/common.py [new file with mode: 0644]
heat/openstack/common/rpc/dispatcher.py [new file with mode: 0644]
heat/openstack/common/rpc/impl_fake.py [moved from heat/rpc/impl_fake.py with 74% similarity]
heat/openstack/common/rpc/impl_kombu.py [moved from heat/rpc/impl_kombu.py with 63% similarity]
heat/openstack/common/rpc/impl_qpid.py [moved from heat/rpc/impl_qpid.py with 60% similarity]
heat/openstack/common/rpc/impl_zmq.py [new file with mode: 0644]
heat/openstack/common/rpc/matchmaker.py [new file with mode: 0644]
heat/openstack/common/rpc/proxy.py [new file with mode: 0644]
heat/rpc/common.py [deleted file]
heat/service.py
openstack-common.conf

index 803191a8a31093cc6ff9db59bf45792db8f4e58f..e252e6042c02bb17011ec229902f96af922ad6e6 100755 (executable)
@@ -40,8 +40,6 @@ from paste import httpserver
 from heat.openstack.common import cfg
 from heat.openstack.common import log as logging
 
-from heat import rpc
-
 LOG = logging.getLogger('heat.api')
 
 if __name__ == '__main__':
@@ -49,7 +47,6 @@ if __name__ == '__main__':
         cfg.CONF(project='heat', prog='heat-api')
         config.setup_logging()
         config.register_api_opts()
-        rpc.configure()
 
         app = config.load_paste_app()
 
index 98d7a2fd177c4b43c5d1e6a7d7cf757d45138202..22f4ae9a4fc48f99f2da8aaa1bf32ecb812134e0 100755 (executable)
@@ -45,8 +45,6 @@ from heat.common import config
 from heat.common import utils
 from heat.db import api as db_api
 
-from heat import rpc
-
 logger = logging.getLogger('heat.engine')
 
 if __name__ == '__main__':
@@ -57,7 +55,6 @@ if __name__ == '__main__':
     config.setup_logging()
     config.register_engine_opts()
     db_api.configure()
-    rpc.configure()
 
     #utils.monkey_patch()
     server = service.Service.create(binary='heat-engine',
index addbd6b4ec594ec457a034840946e6501a722445..2a927e230c37f9dfd27658c6dfcd3ee3a5de1045 100755 (executable)
@@ -33,7 +33,7 @@ if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')):
 
 gettext.install('heat', unicode=1)
 
-from heat import rpc
+from heat.openstack.common import rpc
 from heat.common import config
 from heat.common import wsgi
 from heat.common import context
@@ -69,7 +69,6 @@ if __name__ == '__main__':
 
         config.setup_logging()
         config.register_metadata_opts()
-        rpc.configure()
 
         app = config.load_paste_app()
 
index 84c0a60b932afb36f7f7056cf00d556f10365496..4e133ecfab280ac6dd6e21c8d6fd9da368b35e55 100644 (file)
@@ -29,10 +29,10 @@ from heat.common import wsgi
 from heat.common import config
 from heat.common import context
 from heat import utils
-from heat import rpc
-import heat.rpc.common as rpc_common
 import heat.engine.api as engine_api
 
+from heat.openstack.common import rpc
+import heat.openstack.common.rpc.common as rpc_common
 from heat.openstack.common import log as logging
 
 logger = logging.getLogger('heat.api.v1.stacks')
index e260130504bde14488a7c6bd15cbdec6380ae23d..c5c5a0a6ca45728aa89d54986904c27daf025098 100644 (file)
@@ -29,6 +29,7 @@ import sys
 from heat import version
 from heat.common import wsgi
 from heat.openstack.common import cfg
+from heat.openstack.common import rpc
 
 DEFAULT_PORT = 8000
 
@@ -42,101 +43,6 @@ paste_deploy_opts = [
 bind_opts = [cfg.IntOpt('bind_port', default=8000),
              cfg.StrOpt('bind_host', default='127.0.0.1')]
 
-rpc_opts = [
-    cfg.StrOpt('rpc_backend',
-        default='heat.rpc.impl_qpid',
-        help="The messaging module to use, defaults to kombu."),
-    cfg.IntOpt('rpc_thread_pool_size',
-               default=1024,
-               help='Size of RPC thread pool'),
-    cfg.IntOpt('rpc_conn_pool_size',
-               default=30,
-               help='Size of RPC connection pool'),
-    cfg.IntOpt('rpc_response_timeout',
-               default=60,
-               help='Seconds to wait for a response from call or multicall'),
-    cfg.StrOpt('qpid_hostname',
-               default='localhost',
-               help='Qpid broker hostname'),
-    cfg.StrOpt('qpid_port',
-               default='5672',
-               help='Qpid broker port'),
-    cfg.StrOpt('qpid_username',
-               default='',
-               help='Username for qpid connection'),
-    cfg.StrOpt('qpid_password',
-               default='',
-               help='Password for qpid connection'),
-    cfg.StrOpt('qpid_sasl_mechanisms',
-               default='',
-               help='Space separated list of SASL mechanisms to use for auth'),
-    cfg.BoolOpt('qpid_reconnect',
-                default=True,
-                help='Automatically reconnect'),
-    cfg.IntOpt('qpid_reconnect_timeout',
-               default=0,
-               help='Reconnection timeout in seconds'),
-    cfg.IntOpt('qpid_reconnect_limit',
-               default=0,
-               help='Max reconnections before giving up'),
-    cfg.IntOpt('qpid_reconnect_interval_min',
-               default=0,
-               help='Minimum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval_max',
-               default=0,
-               help='Maximum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval',
-               default=0,
-               help='Equivalent to setting max and min to the same value'),
-    cfg.IntOpt('qpid_heartbeat',
-               default=5,
-               help='Seconds between connection keepalive heartbeats'),
-    cfg.StrOpt('qpid_protocol',
-               default='tcp',
-               help="Transport to use, either 'tcp' or 'ssl'"),
-    cfg.BoolOpt('qpid_tcp_nodelay',
-                default=True,
-                help='Disable Nagle algorithm'),
-    cfg.StrOpt('rabbit_host',
-               default='localhost',
-               help='the RabbitMQ host'),
-    cfg.IntOpt('rabbit_port',
-               default=5672,
-               help='the RabbitMQ port'),
-    cfg.BoolOpt('rabbit_use_ssl',
-                default=False,
-                help='connect over SSL for RabbitMQ'),
-    cfg.StrOpt('rabbit_userid',
-               default='guest',
-               help='the RabbitMQ userid'),
-    cfg.StrOpt('rabbit_password',
-               default='guest',
-               help='the RabbitMQ password'),
-    cfg.StrOpt('rabbit_virtual_host',
-               default='/',
-               help='the RabbitMQ virtual host'),
-    cfg.IntOpt('rabbit_retry_interval',
-               default=1,
-               help='how frequently to retry connecting with RabbitMQ'),
-    cfg.IntOpt('rabbit_retry_backoff',
-               default=2,
-               help='how long to backoff for between retries when connecting '
-                    'to RabbitMQ'),
-    cfg.IntOpt('rabbit_max_retries',
-               default=0,
-               help='maximum retries with trying to connect to RabbitMQ '
-                    '(the default of 0 implies an infinite retry count)'),
-    cfg.StrOpt('control_exchange',
-               default='heat-engine',
-               help='the main RabbitMQ exchange to connect to'),
-    cfg.BoolOpt('rabbit_durable_queues',
-                default=False,
-                help='use durable queues in RabbitMQ'),
-    cfg.BoolOpt('fake_rabbit',
-                default=False,
-                help='If passed, use a fake RabbitMQ provider'),
-]
-
 service_opts = [
 cfg.IntOpt('report_interval',
            default=10,
@@ -198,19 +104,16 @@ cfg.StrOpt('instance_driver',
 def register_metadata_opts():
     cfg.CONF.register_opts(service_opts)
     cfg.CONF.register_opts(bind_opts)
-    cfg.CONF.register_opts(rpc_opts)
 
 
 def register_api_opts():
     cfg.CONF.register_opts(bind_opts)
-    cfg.CONF.register_opts(rpc_opts)
 
 
 def register_engine_opts():
     cfg.CONF.register_opts(engine_opts)
     cfg.CONF.register_opts(db_opts)
     cfg.CONF.register_opts(service_opts)
-    cfg.CONF.register_opts(rpc_opts)
 
 
 def setup_logging():
index 050cc561acc030a571c56bcf65c3630c9f2eecc0..731353797d36fefea0035869e26c715a80c71eaa 100644 (file)
@@ -55,6 +55,7 @@ This module provides Manager, a base class for managers.
 
 from heat import version
 
+from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
 from heat.openstack.common import log as logging
 from heat.openstack.common import cfg
 
@@ -132,6 +133,14 @@ class Manager(object):
         self.host = host
         super(Manager, self).__init__(db_driver)
 
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return rpc_dispatcher.RpcDispatcher([self])
+
     def periodic_tasks(self, context, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
         for task_name, task in self._periodic_tasks:
index 57ff91459dd34a93345ae4da546277f9a70ba71e..5f913079d4404165674037da55f5da8294fed974 100644 (file)
@@ -19,7 +19,7 @@ from webob.exc import Response
 
 from heat.common import wsgi
 from heat.common import context
-from heat import rpc
+from heat.openstack.common import rpc
 
 
 def json_response(http_status, data):
diff --git a/heat/openstack/common/excutils.py b/heat/openstack/common/excutils.py
new file mode 100644 (file)
index 0000000..67c9fa9
--- /dev/null
@@ -0,0 +1,49 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# Copyright 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Exception related utilities.
+"""
+
+import contextlib
+import logging
+import sys
+import traceback
+
+
+@contextlib.contextmanager
+def save_and_reraise_exception():
+    """Save current exception, run some code and then re-raise.
+
+    In some cases the exception context can be cleared, resulting in None
+    being attempted to be reraised after an exception handler is run. This
+    can happen when eventlet switches greenthreads or when running an
+    exception handler, code raises and catches an exception. In both
+    cases the exception context will be cleared.
+
+    To work around this, we save the exception state, run handler code, and
+    then re-raise the original exception. If another exception occurs, the
+    saved exception is logged and the new exception is reraised.
+    """
+    type_, value, tb = sys.exc_info()
+    try:
+        yield
+    except Exception:
+        logging.error('Original exception being dropped: %s' %
+                      (traceback.format_exception(type_, value, tb)))
+        raise
+    raise type_, value, tb
similarity index 58%
rename from heat/rpc/__init__.py
rename to heat/openstack/common/rpc/__init__.py
index a1de541951c300957e0a05301361699edd026041..28a44c04dbcdbb7fe11783765cd3fabdd78366cc 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+    rpc.dispatcher
+    rpc.proxy
+"""
+
 from heat.openstack.common import cfg
 from heat.openstack.common import importutils
 
-from heat.openstack.common import log as logging
 
-LOG = logging.getLogger(__name__)
+rpc_opts = [
+    cfg.StrOpt('rpc_backend',
+               default='%s.impl_kombu' % __package__,
+               help="The messaging module to use, defaults to kombu."),
+    cfg.IntOpt('rpc_thread_pool_size',
+               default=64,
+               help='Size of RPC thread pool'),
+    cfg.IntOpt('rpc_conn_pool_size',
+               default=30,
+               help='Size of RPC connection pool'),
+    cfg.IntOpt('rpc_response_timeout',
+               default=60,
+               help='Seconds to wait for a response from call or multicall'),
+    cfg.IntOpt('rpc_cast_timeout',
+               default=30,
+               help='Seconds to wait before a cast expires (TTL). '
+                    'Only supported by impl_zmq.'),
+    cfg.ListOpt('allowed_rpc_exception_modules',
+                default=['heat.openstack.common.exception',
+                         'nova.exception',
+                         ],
+                help='Modules of exceptions that are permitted to be recreated'
+                     'upon receiving exception data from an rpc call.'),
+    cfg.StrOpt('control_exchange',
+               default='nova',
+               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+    cfg.BoolOpt('fake_rabbit',
+                default=False,
+                help='If passed, use a fake RabbitMQ provider'),
+]
+
+cfg.CONF.register_opts(rpc_opts)
 
 
 def create_connection(new=True):
@@ -36,9 +74,9 @@ def create_connection(new=True):
                 implementation is free to return an existing connection from a
                 pool.
 
-    :returns: An instance of nova.rpc.common.Connection
+    :returns: An instance of openstack.common.rpc.common.Connection
     """
-    return _get_impl().create_connection(new=new)
+    return _get_impl().create_connection(cfg.CONF, new=new)
 
 
 def call(context, topic, msg, timeout=None):
@@ -48,8 +86,9 @@ def call(context, topic, msg, timeout=None):
                     request.
     :param topic: The topic to send the rpc message to.  This correlates to the
                   topic argument of
-                  nova.rpc.common.Connection.create_consumer() and only applies
-                  when the consumer was created with fanout=False.
+                  openstack.common.rpc.common.Connection.create_consumer()
+                  and only applies when the consumer was created with
+                  fanout=False.
     :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                              "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
@@ -57,10 +96,10 @@ def call(context, topic, msg, timeout=None):
 
     :returns: A dict from the remote method.
 
-    :raises: nova.rpc.common.Timeout if a complete response is not received
-             before the timeout is reached.
+    :raises: openstack.common.rpc.common.Timeout if a complete response
+             is not received before the timeout is reached.
     """
-    return _get_impl().call(context, topic, msg, timeout)
+    return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
 
 
 def cast(context, topic, msg):
@@ -70,14 +109,15 @@ def cast(context, topic, msg):
                     request.
     :param topic: The topic to send the rpc message to.  This correlates to the
                   topic argument of
-                  nova.rpc.common.Connection.create_consumer() and only applies
-                  when the consumer was created with fanout=False.
+                  openstack.common.rpc.common.Connection.create_consumer()
+                  and only applies when the consumer was created with
+                  fanout=False.
     :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                              "args" : dict_of_kwargs }
 
     :returns: None
     """
-    return _get_impl().cast(context, topic, msg)
+    return _get_impl().cast(cfg.CONF, context, topic, msg)
 
 
 def fanout_cast(context, topic, msg):
@@ -90,14 +130,15 @@ def fanout_cast(context, topic, msg):
                     request.
     :param topic: The topic to send the rpc message to.  This correlates to the
                   topic argument of
-                  nova.rpc.common.Connection.create_consumer() and only applies
-                  when the consumer was created with fanout=True.
+                  openstack.common.rpc.common.Connection.create_consumer()
+                  and only applies when the consumer was created with
+                  fanout=True.
     :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                              "args" : dict_of_kwargs }
 
     :returns: None
     """
-    return _get_impl().fanout_cast(context, topic, msg)
+    return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
 
 
 def multicall(context, topic, msg, timeout=None):
@@ -111,8 +152,9 @@ def multicall(context, topic, msg, timeout=None):
                     request.
     :param topic: The topic to send the rpc message to.  This correlates to the
                   topic argument of
-                  nova.rpc.common.Connection.create_consumer() and only applies
-                  when the consumer was created with fanout=False.
+                  openstack.common.rpc.common.Connection.create_consumer()
+                  and only applies when the consumer was created with
+                  fanout=False.
     :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                              "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
@@ -123,10 +165,10 @@ def multicall(context, topic, msg, timeout=None):
               returned and X is the Nth value that was returned by the remote
               method.
 
-    :raises: nova.rpc.common.Timeout if a complete response is not received
-             before the timeout is reached.
+    :raises: openstack.common.rpc.common.Timeout if a complete response
+             is not received before the timeout is reached.
     """
-    return _get_impl().multicall(context, topic, msg, timeout)
+    return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
 
 
 def notify(context, topic, msg):
@@ -139,7 +181,7 @@ def notify(context, topic, msg):
 
     :returns: None
     """
-    return _get_impl().notify(context, topic, msg)
+    return _get_impl().notify(cfg.CONF, context, topic, msg)
 
 
 def cleanup():
@@ -167,7 +209,8 @@ def cast_to_server(context, server_params, topic, msg):
 
     :returns: None
     """
-    return _get_impl().cast_to_server(context, server_params, topic, msg)
+    return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+                                      msg)
 
 
 def fanout_cast_to_server(context, server_params, topic, msg):
@@ -182,25 +225,40 @@ def fanout_cast_to_server(context, server_params, topic, msg):
 
     :returns: None
     """
-    return _get_impl().fanout_cast_to_server(context, server_params, topic,
-            msg)
+    return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+                                             topic, msg)
 
 
-_RPCIMPL = None
+def queue_get_for(context, topic, host):
+    """Get a queue name for a given topic + host.
 
+    This function only works if this naming convention is followed on the
+    consumer side, as well.  For example, in nova, every instance of the
+    nova-foo service calls create_consumer() for two topics:
 
-def configure():
-    """Delay import of rpc_backend until FLAGS are loaded."""
-    LOG.debug(_("Configuring RPC %s") % cfg.CONF.rpc_backend)
+        foo
+        foo.<host>
 
-    global _RPCIMPL
-    _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+    Messages sent to the 'foo' topic are distributed to exactly one instance of
+    the nova-foo service.  The services are chosen in a round-robin fashion.
+    Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
+    <host>.
+    """
+    return '%s.%s' % (topic, host)
+
+
+_RPCIMPL = None
 
 
 def _get_impl():
-    """Delay import of rpc_backend until FLAGS are loaded."""
+    """Delay import of rpc_backend until configuration is loaded."""
     global _RPCIMPL
     if _RPCIMPL is None:
-        LOG.error(_("RPC not configured."))
-
+        try:
+            _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+        except ImportError:
+            # For backwards compatibility with older nova config.
+            impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+                                                'nova.openstack.common.rpc')
+            _RPCIMPL = importutils.import_module(impl)
     return _RPCIMPL
similarity index 64%
rename from heat/rpc/amqp.py
rename to heat/openstack/common/rpc/amqp.py
index 781e6443e663e91d3b5042883d733e1e2e9b7b9f..25df5f99c43f079f2302b2cad9bb3de618ac4a06 100644 (file)
@@ -18,7 +18,7 @@
 #    under the License.
 
 """
-Shared code between AMQP based nova.rpc implementations.
+Shared code between AMQP based openstack.common.rpc implementations.
 
 The code in this module is shared between the rpc implemenations based on AMQP.
 Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
@@ -26,68 +26,76 @@ AMQP, but is deprecated and predates this code.
 """
 
 import inspect
+import logging
 import sys
-import traceback
 import uuid
 
-from heat.openstack.common import log as logging
-
 from eventlet import greenpool
 from eventlet import pools
+from eventlet import semaphore
 
-from heat.common import context
-from heat.common import exception
-from heat.common import config
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _
 from heat.openstack.common import local
-from heat.openstack.common import cfg
+from heat.openstack.common.rpc import common as rpc_common
 
-import heat.rpc.common as rpc_common
 
 LOG = logging.getLogger(__name__)
 
 
 class Pool(pools.Pool):
     """Class that implements a Pool of Connections."""
-    def __init__(self, *args, **kwargs):
-        self.connection_cls = kwargs.pop("connection_cls", None)
-        kwargs.setdefault("max_size", cfg.CONF.rpc_conn_pool_size)
+    def __init__(self, conf, connection_cls, *args, **kwargs):
+        self.connection_cls = connection_cls
+        self.conf = conf
+        kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
         kwargs.setdefault("order_as_stack", True)
         super(Pool, self).__init__(*args, **kwargs)
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
         LOG.debug('Pool creating new connection')
-        return self.connection_cls()
+        return self.connection_cls(self.conf)
 
     def empty(self):
         while self.free_items:
-            item = self.get()
-            try:
-                item.close()
-            except Exception:
-                pass
+            self.get().close()
+
+
+_pool_create_sem = semaphore.Semaphore()
+
+
+def get_connection_pool(conf, connection_cls):
+    with _pool_create_sem:
+        # Make sure only one thread tries to create the connection pool.
+        if not connection_cls.pool:
+            connection_cls.pool = Pool(conf, connection_cls)
+    return connection_cls.pool
 
 
 class ConnectionContext(rpc_common.Connection):
     """The class that is actually returned to the caller of
-    create_connection().  This is a essentially a wrapper around
-    Connection that supports 'with' and can return a new Connection or
-    one from a pool.  It will also catch when an instance of this class
-    is to be deleted so that we can return Connections to the pool on
-    exceptions and so forth without making the caller be responsible for
-    catching all exceptions and making sure to return a connection to
-    the pool.
+    create_connection().  This is essentially a wrapper around
+    Connection that supports 'with'.  It can also return a new
+    Connection, or one from a pool.  The function will also catch
+    when an instance of this class is to be deleted.  With that
+    we can return Connections to the pool on exceptions and so
+    forth without making the caller be responsible for catching
+    them.  If possible the function makes sure to return a
+    connection to the pool.
     """
 
-    def __init__(self, connection_pool, pooled=True, server_params=None):
+    def __init__(self, conf, connection_pool, pooled=True, server_params=None):
         """Create a new connection, or get one from the pool"""
         self.connection = None
+        self.conf = conf
         self.connection_pool = connection_pool
         if pooled:
             self.connection = connection_pool.get()
         else:
             self.connection = connection_pool.connection_cls(
-                    server_params=server_params)
+                conf,
+                server_params=server_params)
         self.pooled = pooled
 
     def __enter__(self):
@@ -126,6 +134,9 @@ class ConnectionContext(rpc_common.Connection):
     def create_consumer(self, topic, proxy, fanout=False):
         self.connection.create_consumer(topic, proxy, fanout)
 
+    def create_worker(self, topic, proxy, pool_name):
+        self.connection.create_worker(topic, proxy, pool_name)
+
     def consume_in_thread(self):
         self.connection.consume_in_thread()
 
@@ -134,50 +145,54 @@ class ConnectionContext(rpc_common.Connection):
         if self.connection:
             return getattr(self.connection, key)
         else:
-            raise exception.InvalidRPCConnectionReuse()
+            raise rpc_common.InvalidRPCConnectionReuse()
 
 
-def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
+def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+              ending=False):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
 
     """
-    with ConnectionContext(connection_pool) as conn:
+    with ConnectionContext(conf, connection_pool) as conn:
         if failure:
-            message = str(failure[1])
-            tb = ''.join(traceback.format_exception(*failure))
-            LOG.error(_("Returning exception %s to caller"), message)
-            LOG.error(tb)
-            failure = (failure[0].__name__, str(failure[1]), tb)
+            failure = rpc_common.serialize_remote_exception(failure)
 
         try:
             msg = {'result': reply, 'failure': failure}
         except TypeError:
             msg = {'result': dict((k, repr(v))
-                            for k, v in reply.__dict__.iteritems()),
-                    'failure': failure}
+                   for k, v in reply.__dict__.iteritems()),
+                   'failure': failure}
         if ending:
             msg['ending'] = True
         conn.direct_send(msg_id, msg)
 
 
-class RpcContext(context.RequestContext):
+class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call"""
-    def __init__(self, *args, **kwargs):
+    def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
-        super(RpcContext, self).__init__(*args, **kwargs)
+        self.conf = kwargs.pop('conf')
+        super(RpcContext, self).__init__(**kwargs)
+
+    def deepcopy(self):
+        values = self.to_dict()
+        values['conf'] = self.conf
+        values['msg_id'] = self.msg_id
+        return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
               connection_pool=None):
         if self.msg_id:
-            msg_reply(self.msg_id, connection_pool, reply, failure,
+            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
                       ending)
             if ending:
                 self.msg_id = None
 
 
-def unpack_context(msg):
+def unpack_context(conf, msg):
     """Unpack context from msg."""
     context_dict = {}
     for key in list(msg.keys()):
@@ -188,8 +203,9 @@ def unpack_context(msg):
             value = msg.pop(key)
             context_dict[key[9:]] = value
     context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
-    LOG.debug(_('unpacked context: %s'), ctx.to_dict())
+    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
     return ctx
 
 
@@ -210,10 +226,11 @@ def pack_context(msg, context):
 class ProxyCallback(object):
     """Calls methods on a proxy object based on method and args."""
 
-    def __init__(self, proxy, connection_pool):
+    def __init__(self, conf, proxy, connection_pool):
         self.proxy = proxy
-        self.pool = greenpool.GreenPool(cfg.CONF.rpc_thread_pool_size)
+        self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
         self.connection_pool = connection_pool
+        self.conf = conf
 
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
@@ -233,27 +250,29 @@ class ProxyCallback(object):
         if hasattr(local.store, 'context'):
             del local.store.context
         rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
-        ctxt = unpack_context(message_data)
+        ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
+        version = message_data.get('version', None)
         if not method:
             LOG.warn(_('no method for message: %s') % message_data)
             ctxt.reply(_('No method for message: %s') % message_data,
                        connection_pool=self.connection_pool)
             return
-        self.pool.spawn_n(self._process_data, ctxt, method, args)
+        self.pool.spawn_n(self._process_data, ctxt, version, method, args)
 
-    @exception.wrap_exception()
-    def _process_data(self, ctxt, method, args):
-        """Thread that magically looks for a method on the proxy
-        object and calls it.
+    def _process_data(self, ctxt, version, method, args):
+        """Process a message in a new thread.
+
+        If the proxy object we have has a dispatch method
+        (see rpc.dispatcher.RpcDispatcher), pass it the version,
+        method, and args and let it dispatch as appropriate.  If not, use
+        the old behavior of magically calling the specified method on the
+        proxy we have here.
         """
         ctxt.update_store()
         try:
-            node_func = getattr(self.proxy, str(method))
-            node_args = dict((str(k), v) for k, v in args.iteritems())
-            # NOTE(vish): magic is fun!
-            rval = node_func(context=ctxt, **node_args)
+            rval = self.proxy.dispatch(ctxt, version, method, **args)
             # Check if the result was a generator
             if inspect.isgenerator(rval):
                 for x in rval:
@@ -266,18 +285,17 @@ class ProxyCallback(object):
             LOG.exception('Exception during message handling')
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
-        return
 
 
 class MulticallWaiter(object):
-    def __init__(self, connection, timeout):
+    def __init__(self, conf, connection, timeout):
         self._connection = connection
-        self._iterator = connection.iterconsume(
-                                timeout=timeout or
-                                cfg.CONF.rpc_response_timeout)
+        self._iterator = connection.iterconsume(timeout=timeout or
+                                                conf.rpc_response_timeout)
         self._result = None
         self._done = False
         self._got_ending = False
+        self._conf = conf
 
     def done(self):
         if self._done:
@@ -290,7 +308,10 @@ class MulticallWaiter(object):
     def __call__(self, data):
         """The consume() callback will call this.  Store the result."""
         if data['failure']:
-            self._result = rpc_common.RemoteError(*data['failure'])
+            failure = data['failure']
+            self._result = rpc_common.deserialize_remote_exception(self._conf,
+                                                                   failure)
+
         elif data.get('ending', False):
             self._got_ending = True
         else:
@@ -301,7 +322,11 @@ class MulticallWaiter(object):
         if self._done:
             raise StopIteration
         while True:
-            self._iterator.next()
+            try:
+                self._iterator.next()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
             if self._got_ending:
                 self.done()
                 raise StopIteration
@@ -312,12 +337,12 @@ class MulticallWaiter(object):
             yield result
 
 
-def create_connection(new, connection_pool):
+def create_connection(conf, new, connection_pool):
     """Create a connection"""
-    return ConnectionContext(connection_pool, pooled=not new)
+    return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
-def multicall(context, topic, msg, timeout, connection_pool):
+def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
     # Can't use 'with' for multicall, as it returns an iterator
     # that will continue to use the connection.  When it's done,
@@ -329,16 +354,16 @@ def multicall(context, topic, msg, timeout, connection_pool):
     LOG.debug(_('MSG_ID is %s') % (msg_id))
     pack_context(msg, context)
 
-    conn = ConnectionContext(connection_pool)
-    wait_msg = MulticallWaiter(conn, timeout)
+    conn = ConnectionContext(conf, connection_pool)
+    wait_msg = MulticallWaiter(conf, conn, timeout)
     conn.declare_direct_consumer(msg_id, wait_msg)
     conn.topic_send(topic, msg)
     return wait_msg
 
 
-def call(context, topic, msg, timeout, connection_pool):
+def call(conf, context, topic, msg, timeout, connection_pool):
     """Sends a message on a topic and wait for a response."""
-    rv = multicall(context, topic, msg, timeout, connection_pool)
+    rv = multicall(conf, context, topic, msg, timeout, connection_pool)
     # NOTE(vish): return the last result from the multicall
     rv = list(rv)
     if not rv:
@@ -346,46 +371,48 @@ def call(context, topic, msg, timeout, connection_pool):
     return rv[-1]
 
 
-def cast(context, topic, msg, connection_pool):
+def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
     pack_context(msg, context)
-    with ConnectionContext(connection_pool) as conn:
+    with ConnectionContext(conf, connection_pool) as conn:
         conn.topic_send(topic, msg)
 
 
-def fanout_cast(context, topic, msg, connection_pool):
+def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
     LOG.debug(_('Making asynchronous fanout cast...'))
     pack_context(msg, context)
-    with ConnectionContext(connection_pool) as conn:
+    with ConnectionContext(conf, connection_pool) as conn:
         conn.fanout_send(topic, msg)
 
 
-def cast_to_server(context, server_params, topic, msg, connection_pool):
+def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     """Sends a message on a topic to a specific server."""
     pack_context(msg, context)
-    with ConnectionContext(connection_pool, pooled=False,
-            server_params=server_params) as conn:
+    with ConnectionContext(conf, connection_pool, pooled=False,
+                           server_params=server_params) as conn:
         conn.topic_send(topic, msg)
 
 
-def fanout_cast_to_server(context, server_params, topic, msg,
-        connection_pool):
+def fanout_cast_to_server(conf, context, server_params, topic, msg,
+                          connection_pool):
     """Sends a message on a fanout exchange to a specific server."""
     pack_context(msg, context)
-    with ConnectionContext(connection_pool, pooled=False,
-            server_params=server_params) as conn:
+    with ConnectionContext(conf, connection_pool, pooled=False,
+                           server_params=server_params) as conn:
         conn.fanout_send(topic, msg)
 
 
-def notify(context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool):
     """Sends a notification event on a topic."""
-    LOG.debug(_('Sending notification on %s...'), topic)
+    event_type = msg.get('event_type')
+    LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
     pack_context(msg, context)
-    with ConnectionContext(connection_pool) as conn:
+    with ConnectionContext(conf, connection_pool) as conn:
         conn.notify_send(topic, msg)
 
 
 def cleanup(connection_pool):
-    connection_pool.empty()
+    if connection_pool:
+        connection_pool.empty()
diff --git a/heat/openstack/common/rpc/common.py b/heat/openstack/common/rpc/common.py
new file mode 100644 (file)
index 0000000..e559c6f
--- /dev/null
@@ -0,0 +1,315 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+import logging
+import sys
+import traceback
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
+from heat.openstack.common import jsonutils
+from heat.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
+
+
+class RPCException(Exception):
+    message = _("An unknown RPC related exception occurred.")
+
+    def __init__(self, message=None, **kwargs):
+        self.kwargs = kwargs
+
+        if not message:
+            try:
+                message = self.message % kwargs
+
+            except Exception as e:
+                # kwargs doesn't match a variable in the message
+                # log the issue and the kwargs
+                LOG.exception(_('Exception in string format operation'))
+                for name, value in kwargs.iteritems():
+                    LOG.error("%s: %s" % (name, value))
+                # at least get the core message out if something happened
+                message = self.message
+
+        super(RPCException, self).__init__(message)
+
+
+class RemoteError(RPCException):
+    """Signifies that a remote class has raised an exception.
+
+    Contains a string representation of the type of the original exception,
+    the value of the original exception, and the traceback.  These are
+    sent to the parent as a joined string so printing the exception
+    contains all of the relevant info.
+
+    """
+    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+
+    def __init__(self, exc_type=None, value=None, traceback=None):
+        self.exc_type = exc_type
+        self.value = value
+        self.traceback = traceback
+        super(RemoteError, self).__init__(exc_type=exc_type,
+                                          value=value,
+                                          traceback=traceback)
+
+
+class Timeout(RPCException):
+    """Signifies that a timeout has occurred.
+
+    This exception is raised if the rpc_response_timeout is reached while
+    waiting for a response from the remote side.
+    """
+    message = _("Timeout while waiting on RPC response.")
+
+
+class InvalidRPCConnectionReuse(RPCException):
+    message = _("Invalid reuse of an RPC connection.")
+
+
+class UnsupportedRpcVersion(RPCException):
+    message = _("Specified RPC version, %(version)s, not supported by "
+                "this endpoint.")
+
+
+class Connection(object):
+    """A connection, returned by rpc.create_connection().
+
+    This class represents a connection to the message bus used for rpc.
+    An instance of this class should never be created by users of the rpc API.
+    Use rpc.create_connection() instead.
+    """
+    def close(self):
+        """Close the connection.
+
+        This method must be called when the connection will no longer be used.
+        It will ensure that any resources associated with the connection, such
+        as a network connection, and cleaned up.
+        """
+        raise NotImplementedError()
+
+    def create_consumer(self, conf, topic, proxy, fanout=False):
+        """Create a consumer on this connection.
+
+        A consumer is associated with a message queue on the backend message
+        bus.  The consumer will read messages from the queue, unpack them, and
+        dispatch them to the proxy object.  The contents of the message pulled
+        off of the queue will determine which method gets called on the proxy
+        object.
+
+        :param conf:  An openstack.common.cfg configuration object.
+        :param topic: This is a name associated with what to consume from.
+                      Multiple instances of a service may consume from the same
+                      topic. For example, all instances of nova-compute consume
+                      from a queue called "compute".  In that case, the
+                      messages will get distributed amongst the consumers in a
+                      round-robin fashion if fanout=False.  If fanout=True,
+                      every consumer associated with this topic will get a
+                      copy of every message.
+        :param proxy: The object that will handle all incoming messages.
+        :param fanout: Whether or not this is a fanout topic.  See the
+                       documentation for the topic parameter for some
+                       additional comments on this.
+        """
+        raise NotImplementedError()
+
+    def create_worker(self, conf, topic, proxy, pool_name):
+        """Create a worker on this connection.
+
+        A worker is like a regular consumer of messages directed to a
+        topic, except that it is part of a set of such consumers (the
+        "pool") which may run in parallel. Every pool of workers will
+        receive a given message, but only one worker in the pool will
+        be asked to process it. Load is distributed across the members
+        of the pool in round-robin fashion.
+
+        :param conf:  An openstack.common.cfg configuration object.
+        :param topic: This is a name associated with what to consume from.
+                      Multiple instances of a service may consume from the same
+                      topic.
+        :param proxy: The object that will handle all incoming messages.
+        :param pool_name: String containing the name of the pool of workers
+        """
+        raise NotImplementedError()
+
+    def consume_in_thread(self):
+        """Spawn a thread to handle incoming messages.
+
+        Spawn a thread that will be responsible for handling all incoming
+        messages for consumers that were set up on this connection.
+
+        Message dispatching inside of this is expected to be implemented in a
+        non-blocking manner.  An example implementation would be having this
+        thread pull messages in for all of the consumers, but utilize a thread
+        pool for dispatching the messages to the proxy objects.
+        """
+        raise NotImplementedError()
+
+
+def _safe_log(log_func, msg, msg_data):
+    """Sanitizes the msg_data field before logging."""
+    SANITIZE = {'set_admin_password': ('new_pass',),
+                'run_instance': ('admin_password',), }
+
+    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
+    has_context_token = '_context_auth_token' in msg_data
+    has_token = 'auth_token' in msg_data
+
+    if not any([has_method, has_context_token, has_token]):
+        return log_func(msg, msg_data)
+
+    msg_data = copy.deepcopy(msg_data)
+
+    if has_method:
+        method = msg_data['method']
+        if method in SANITIZE:
+            args_to_sanitize = SANITIZE[method]
+            for arg in args_to_sanitize:
+                try:
+                    msg_data['args'][arg] = "<SANITIZED>"
+                except KeyError:
+                    pass
+
+    if has_context_token:
+        msg_data['_context_auth_token'] = '<SANITIZED>'
+
+    if has_token:
+        msg_data['auth_token'] = '<SANITIZED>'
+
+    return log_func(msg, msg_data)
+
+
+def serialize_remote_exception(failure_info):
+    """Prepares exception data to be sent over rpc.
+
+    Failure_info should be a sys.exc_info() tuple.
+
+    """
+    tb = traceback.format_exception(*failure_info)
+    failure = failure_info[1]
+    LOG.error(_("Returning exception %s to caller"), unicode(failure))
+    LOG.error(tb)
+
+    kwargs = {}
+    if hasattr(failure, 'kwargs'):
+        kwargs = failure.kwargs
+
+    data = {
+        'class': str(failure.__class__.__name__),
+        'module': str(failure.__class__.__module__),
+        'message': unicode(failure),
+        'tb': tb,
+        'args': failure.args,
+        'kwargs': kwargs
+    }
+
+    json_data = jsonutils.dumps(data)
+
+    return json_data
+
+
+def deserialize_remote_exception(conf, data):
+    failure = jsonutils.loads(str(data))
+
+    trace = failure.get('tb', [])
+    message = failure.get('message', "") + "\n" + "\n".join(trace)
+    name = failure.get('class')
+    module = failure.get('module')
+
+    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
+    # order to prevent arbitrary code execution.
+    if not module in conf.allowed_rpc_exception_modules:
+        return RemoteError(name, failure.get('message'), trace)
+
+    try:
+        mod = importutils.import_module(module)
+        klass = getattr(mod, name)
+        if not issubclass(klass, Exception):
+            raise TypeError("Can only deserialize Exceptions")
+
+        failure = klass(**failure.get('kwargs', {}))
+    except (AttributeError, TypeError, ImportError):
+        return RemoteError(name, failure.get('message'), trace)
+
+    ex_type = type(failure)
+    str_override = lambda self: message
+    new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+                       {'__str__': str_override, '__unicode__': str_override})
+    try:
+        # NOTE(ameade): Dynamically create a new exception type and swap it in
+        # as the new type for the exception. This only works on user defined
+        # Exceptions and not core python exceptions. This is important because
+        # we cannot necessarily change an exception message so we must override
+        # the __str__ method.
+        failure.__class__ = new_ex_type
+    except TypeError as e:
+        # NOTE(ameade): If a core exception then just add the traceback to the
+        # first exception argument.
+        failure.args = (message,) + failure.args[1:]
+    return failure
+
+
+class CommonRpcContext(object):
+    def __init__(self, **kwargs):
+        self.values = kwargs
+
+    def __getattr__(self, key):
+        try:
+            return self.values[key]
+        except KeyError:
+            raise AttributeError(key)
+
+    def to_dict(self):
+        return copy.deepcopy(self.values)
+
+    @classmethod
+    def from_dict(cls, values):
+        return cls(**values)
+
+    def deepcopy(self):
+        return self.from_dict(self.to_dict())
+
+    def update_store(self):
+        local.store.context = self
+
+    def elevated(self, read_deleted=None, overwrite=False):
+        """Return a version of this context with admin flag set."""
+        # TODO(russellb) This method is a bit of a nova-ism.  It makes
+        # some assumptions about the data in the request context sent
+        # across rpc, while the rest of this class does not.  We could get
+        # rid of this if we changed the nova code that uses this to
+        # convert the RpcContext back to its native RequestContext doing
+        # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
+
+        context = self.deepcopy()
+        context.values['is_admin'] = True
+
+        context.values.setdefault('roles', [])
+
+        if 'admin' not in context.values['roles']:
+            context.values['roles'].append('admin')
+
+        if read_deleted is not None:
+            context.values['read_deleted'] = read_deleted
+
+        return context
diff --git a/heat/openstack/common/rpc/dispatcher.py b/heat/openstack/common/rpc/dispatcher.py
new file mode 100644 (file)
index 0000000..7d1a5a5
--- /dev/null
@@ -0,0 +1,150 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them.  RPC API
+version numbers are in the form:
+
+    Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+    A = X
+
+    B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API.  A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time.  However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+
+
+EXAMPLES:
+
+Nova was the first project to use versioned rpc APIs.  Consider the compute rpc
+API as an example.  The client side is in nova/compute/rpcapi.py and the server
+side is in nova/compute/manager.py.
+
+
+Example 1) Adding a new method.
+
+Adding a new method is a backwards compatible change.  It should be added to
+nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
+X.Y+1.  On the client side, the new method in nova/compute/rpcapi.py should
+have a specific version specified to indicate the minimum API version that must
+be implemented for the method to be supported.  For example:
+
+    def get_host_uptime(self, ctxt, host):
+        topic = _compute_topic(self.topic, ctxt, host, None)
+        return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
+                version='1.1')
+
+In this case, version '1.1' is the first version that supported the
+get_host_uptime() method.
+
+
+Example 2) Adding a new parameter.
+
+Adding a new parameter to an rpc method can be made backwards compatible.  The
+RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
+The implementation of the method must not expect the parameter to be present.
+
+    def some_remote_method(self, arg1, arg2, newarg=None):
+        # The code needs to deal with newarg=None for cases
+        # where an older client sends a message without it.
+        pass
+
+On the client side, the same changes should be made as in example 1.  The
+minimum version that supports the new parameter should be specified.
+"""
+
+from heat.openstack.common.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+    """Dispatch rpc messages according to the requested API version.
+
+    This class can be used as the top level 'manager' for a service.  It
+    contains a list of underlying managers that have an API_VERSION attribute.
+    """
+
+    def __init__(self, callbacks):
+        """Initialize the rpc dispatcher.
+
+        :param callbacks: List of proxy objects that are an instance
+                          of a class with rpc methods exposed.  Each proxy
+                          object should have an RPC_API_VERSION attribute.
+        """
+        self.callbacks = callbacks
+        super(RpcDispatcher, self).__init__()
+
+    @staticmethod
+    def _is_compatible(mversion, version):
+        """Determine whether versions are compatible.
+
+        :param mversion: The API version implemented by a callback.
+        :param version: The API version requested by an incoming message.
+        """
+        version_parts = version.split('.')
+        mversion_parts = mversion.split('.')
+        if int(version_parts[0]) != int(mversion_parts[0]):  # Major
+            return False
+        if int(version_parts[1]) > int(mversion_parts[1]):  # Minor
+            return False
+        return True
+
+    def dispatch(self, ctxt, version, method, **kwargs):
+        """Dispatch a message based on a requested version.
+
+        :param ctxt: The request context
+        :param version: The requested API version from the incoming message
+        :param method: The method requested to be called by the incoming
+                       message.
+        :param kwargs: A dict of keyword arguments to be passed to the method.
+
+        :returns: Whatever is returned by the underlying method that gets
+                  called.
+        """
+        if not version:
+            version = '1.0'
+
+        had_compatible = False
+        for proxyobj in self.callbacks:
+            if hasattr(proxyobj, 'RPC_API_VERSION'):
+                rpc_api_version = proxyobj.RPC_API_VERSION
+            else:
+                rpc_api_version = '1.0'
+            is_compatible = self._is_compatible(rpc_api_version, version)
+            had_compatible = had_compatible or is_compatible
+            if not hasattr(proxyobj, method):
+                continue
+            if is_compatible:
+                return getattr(proxyobj, method)(ctxt, **kwargs)
+
+        if had_compatible:
+            raise AttributeError("No such RPC function '%s'" % method)
+        else:
+            raise rpc_common.UnsupportedRpcVersion(version=version)
similarity index 74%
rename from heat/rpc/impl_fake.py
rename to heat/openstack/common/rpc/impl_fake.py
index 995f47e111cd01f296dc35f490f2d2504b82dc70..c2935960cd8cb33cb9e25e786da2ba65d008a0f6 100644 (file)
@@ -18,29 +18,29 @@ queues.  Casts will block, but this is very useful for tests.
 """
 
 import inspect
-import json
-import signal
-import sys
 import time
-import traceback
 
 import eventlet
 
-from heat.common import context
-from heat.common import config
-from heat.rpc import common as rpc_common
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import common as rpc_common
 
 CONSUMERS = {}
 
-FLAGS = config.FLAGS
 
-
-class RpcContext(context.RequestContext):
-    def __init__(self, *args, **kwargs):
-        super(RpcContext, self).__init__(*args, **kwargs)
+class RpcContext(rpc_common.CommonRpcContext):
+    def __init__(self, **kwargs):
+        super(RpcContext, self).__init__(**kwargs)
         self._response = []
         self._done = False
 
+    def deepcopy(self):
+        values = self.to_dict()
+        new_inst = self.__class__(**values)
+        new_inst._response = self._response
+        new_inst._done = self._done
+        return new_inst
+
     def reply(self, reply=None, failure=None, ending=False):
         if ending:
             self._done = True
@@ -53,15 +53,13 @@ class Consumer(object):
         self.topic = topic
         self.proxy = proxy
 
-    def call(self, context, method, args, timeout):
-        node_func = getattr(self.proxy, method)
-        node_args = dict((str(k), v) for k, v in args.iteritems())
+    def call(self, context, version, method, args, timeout):
         done = eventlet.event.Event()
 
         def _inner():
             ctxt = RpcContext.from_dict(context.to_dict())
             try:
-                rval = node_func(context=ctxt, **node_args)
+                rval = self.proxy.dispatch(context, version, method, **args)
                 res = []
                 # Caller might have called ctxt.reply() manually
                 for (reply, failure) in ctxt._response:
@@ -77,12 +75,8 @@ class Consumer(object):
                     else:
                         res.append(rval)
                 done.send(res)
-            except Exception:
-                exc_info = sys.exc_info()
-                done.send_exception(
-                        rpc_common.RemoteError(exc_info[0].__name__,
-                            str(exc_info[1]),
-                            ''.join(traceback.format_exception(*exc_info))))
+            except Exception as e:
+                done.send_exception(e)
 
         thread = eventlet.greenthread.spawn(_inner)
 
@@ -120,17 +114,17 @@ class Connection(object):
         pass
 
 
-def create_connection(new=True):
+def create_connection(conf, new=True):
     """Create a connection"""
     return Connection()
 
 
 def check_serialize(msg):
     """Make sure a message intended for rpc can be serialized."""
-    json.dumps(msg)
+    jsonutils.dumps(msg)
 
 
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
     """Make a call that returns multiple times."""
 
     check_serialize(msg)
@@ -139,18 +133,19 @@ def multicall(context, topic, msg, timeout=None):
     if not method:
         return
     args = msg.get('args', {})
+    version = msg.get('version', None)
 
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
         return iter([None])
     else:
-        return consumer.call(context, method, args, timeout)
+        return consumer.call(context, version, method, args, timeout)
 
 
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
     """Sends a message on a topic and wait for a response."""
-    rv = multicall(context, topic, msg, timeout)
+    rv = multicall(conf, context, topic, msg, timeout)
     # NOTE(vish): return the last result from the multicall
     rv = list(rv)
     if not rv:
@@ -158,14 +153,14 @@ def call(context, topic, msg, timeout=None):
     return rv[-1]
 
 
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
     try:
-        call(context, topic, msg)
-    except rpc_common.RemoteError:
+        call(conf, context, topic, msg)
+    except Exception:
         pass
 
 
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
     check_serialize(msg)
 
 
@@ -173,16 +168,17 @@ def cleanup():
     pass
 
 
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
     """Cast to all consumers of a topic"""
     check_serialize(msg)
     method = msg.get('method')
     if not method:
         return
     args = msg.get('args', {})
+    version = msg.get('version', None)
 
     for consumer in CONSUMERS.get(topic, []):
         try:
-            consumer.call(context, method, args, None)
-        except rpc_common.RemoteError:
+            consumer.call(context, version, method, args, None)
+        except Exception:
             pass
similarity index 63%
rename from heat/rpc/impl_kombu.py
rename to heat/openstack/common/rpc/impl_kombu.py
index 7199273a1ba2cef0143dac9cf24e7cb91cb932df..20003d77e4d1b07cd5311a1a99b183998991d612 100644 (file)
@@ -14,6 +14,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import functools
 import itertools
 import socket
 import ssl
@@ -24,14 +25,14 @@ import uuid
 import eventlet
 import greenlet
 import kombu
+import kombu.connection
 import kombu.entity
 import kombu.messaging
-import kombu.connection
 
-from heat.common import config
 from heat.openstack.common import cfg
-from heat.rpc import amqp as rpc_amqp
-from heat.rpc import common as rpc_common
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common.rpc import amqp as rpc_amqp
+from heat.openstack.common.rpc import common as rpc_common
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
@@ -46,11 +47,44 @@ kombu_opts = [
     cfg.StrOpt('kombu_ssl_ca_certs',
                default='',
                help=('SSL certification authority file '
-                    '(valid only if SSL enabled)')),
-    ]
+                     '(valid only if SSL enabled)')),
+    cfg.StrOpt('rabbit_host',
+               default='localhost',
+               help='the RabbitMQ host'),
+    cfg.IntOpt('rabbit_port',
+               default=5672,
+               help='the RabbitMQ port'),
+    cfg.BoolOpt('rabbit_use_ssl',
+                default=False,
+                help='connect over SSL for RabbitMQ'),
+    cfg.StrOpt('rabbit_userid',
+               default='guest',
+               help='the RabbitMQ userid'),
+    cfg.StrOpt('rabbit_password',
+               default='guest',
+               help='the RabbitMQ password'),
+    cfg.StrOpt('rabbit_virtual_host',
+               default='/',
+               help='the RabbitMQ virtual host'),
+    cfg.IntOpt('rabbit_retry_interval',
+               default=1,
+               help='how frequently to retry connecting with RabbitMQ'),
+    cfg.IntOpt('rabbit_retry_backoff',
+               default=2,
+               help='how long to backoff for between retries when connecting '
+                    'to RabbitMQ'),
+    cfg.IntOpt('rabbit_max_retries',
+               default=0,
+               help='maximum retries with trying to connect to RabbitMQ '
+                    '(the default of 0 implies an infinite retry count)'),
+    cfg.BoolOpt('rabbit_durable_queues',
+                default=False,
+                help='use durable queues in RabbitMQ'),
+
+]
+
+cfg.CONF.register_opts(kombu_opts)
 
-FLAGS = config.FLAGS
-FLAGS.register_opts(kombu_opts)
 LOG = rpc_common.LOG
 
 
@@ -126,7 +160,7 @@ class ConsumerBase(object):
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'"""
 
-    def __init__(self, channel, msg_id, callback, tag, **kwargs):
+    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
         """Init a 'direct' queue.
 
         'channel' is the amqp channel to use
@@ -138,61 +172,61 @@ class DirectConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': False,
-                'auto_delete': True,
-                'exclusive': True}
+                   'auto_delete': True,
+                   'exclusive': True}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-                name=msg_id,
-                type='direct',
-                durable=options['durable'],
-                auto_delete=options['auto_delete'])
-        super(DirectConsumer, self).__init__(
-                channel,
-                callback,
-                tag,
-                name=msg_id,
-                exchange=exchange,
-                routing_key=msg_id,
-                **options)
+        exchange = kombu.entity.Exchange(name=msg_id,
+                                         type='direct',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(DirectConsumer, self).__init__(channel,
+                                             callback,
+                                             tag,
+                                             name=msg_id,
+                                             exchange=exchange,
+                                             routing_key=msg_id,
+                                             **options)
 
 
 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
-    def __init__(self, channel, topic, callback, tag, **kwargs):
+    def __init__(self, conf, channel, topic, callback, tag, name=None,
+                 **kwargs):
         """Init a 'topic' queue.
 
-        'channel' is the amqp channel to use
-        'topic' is the topic to listen on
-        'callback' is the callback to call when messages are received
-        'tag' is a unique ID for the consumer on the channel
+        :param channel: the amqp channel to use
+        :param topic: the topic to listen on
+        :paramtype topic: str
+        :param callback: the callback to call when messages are received
+        :param tag: a unique ID for the consumer on the channel
+        :param name: optional queue name, defaults to topic
+        :paramtype name: str
 
-        Other kombu options may be passed
+        Other kombu options may be passed as keyword arguments
         """
         # Default options
-        options = {'durable': FLAGS.rabbit_durable_queues,
-                'auto_delete': False,
-                'exclusive': False}
+        options = {'durable': conf.rabbit_durable_queues,
+                   'auto_delete': False,
+                   'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-                name=FLAGS.control_exchange,
-                type='topic',
-                durable=options['durable'],
-                auto_delete=options['auto_delete'])
-        super(TopicConsumer, self).__init__(
-                channel,
-                callback,
-                tag,
-                name=topic,
-                exchange=exchange,
-                routing_key=topic,
-                **options)
+        exchange = kombu.entity.Exchange(name=conf.control_exchange,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(TopicConsumer, self).__init__(channel,
+                                            callback,
+                                            tag,
+                                            name=name or topic,
+                                            exchange=exchange,
+                                            routing_key=topic,
+                                            **options)
 
 
 class FanoutConsumer(ConsumerBase):
     """Consumer class for 'fanout'"""
 
-    def __init__(self, channel, topic, callback, tag, **kwargs):
+    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
         """Init a 'fanout' queue.
 
         'channel' is the amqp channel to use
@@ -208,22 +242,17 @@ class FanoutConsumer(ConsumerBase):
 
         # Default options
         options = {'durable': False,
-                'auto_delete': True,
-                'exclusive': True}
+                   'auto_delete': True,
+                   'exclusive': True}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-                name=exchange_name,
-                type='fanout',
-                durable=options['durable'],
-                auto_delete=options['auto_delete'])
-        super(FanoutConsumer, self).__init__(
-                channel,
-                callback,
-                tag,
-                name=queue_name,
-                exchange=exchange,
-                routing_key=topic,
-                **options)
+        exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(FanoutConsumer, self).__init__(channel, callback, tag,
+                                             name=queue_name,
+                                             exchange=exchange,
+                                             routing_key=topic,
+                                             **options)
 
 
 class Publisher(object):
@@ -241,9 +270,10 @@ class Publisher(object):
     def reconnect(self, channel):
         """Re-establish the Producer after a rabbit reconnection"""
         self.exchange = kombu.entity.Exchange(name=self.exchange_name,
-                **self.kwargs)
+                                              **self.kwargs)
         self.producer = kombu.messaging.Producer(exchange=self.exchange,
-                channel=channel, routing_key=self.routing_key)
+                                                 channel=channel,
+                                                 routing_key=self.routing_key)
 
     def send(self, msg):
         """Send a message"""
@@ -252,65 +282,56 @@ class Publisher(object):
 
 class DirectPublisher(Publisher):
     """Publisher class for 'direct'"""
-    def __init__(self, channel, msg_id, **kwargs):
+    def __init__(self, conf, channel, msg_id, **kwargs):
         """init a 'direct' publisher.
 
         Kombu options may be passed as keyword args to override defaults
         """
 
         options = {'durable': False,
-                'auto_delete': True,
-                'exclusive': True}
+                   'auto_delete': True,
+                   'exclusive': True}
         options.update(kwargs)
-        super(DirectPublisher, self).__init__(channel,
-                msg_id,
-                msg_id,
-                type='direct',
-                **options)
+        super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+                                              type='direct', **options)
 
 
 class TopicPublisher(Publisher):
     """Publisher class for 'topic'"""
-    def __init__(self, channel, topic, **kwargs):
+    def __init__(self, conf, channel, topic, **kwargs):
         """init a 'topic' publisher.
 
         Kombu options may be passed as keyword args to override defaults
         """
-        options = {'durable': FLAGS.rabbit_durable_queues,
-                'auto_delete': False,
-                'exclusive': False}
+        options = {'durable': conf.rabbit_durable_queues,
+                   'auto_delete': False,
+                   'exclusive': False}
         options.update(kwargs)
-        super(TopicPublisher, self).__init__(channel,
-                FLAGS.control_exchange,
-                topic,
-                type='topic',
-                **options)
+        super(TopicPublisher, self).__init__(channel, conf.control_exchange,
+                                             topic, type='topic', **options)
 
 
 class FanoutPublisher(Publisher):
     """Publisher class for 'fanout'"""
-    def __init__(self, channel, topic, **kwargs):
+    def __init__(self, conf, channel, topic, **kwargs):
         """init a 'fanout' publisher.
 
         Kombu options may be passed as keyword args to override defaults
         """
         options = {'durable': False,
-                'auto_delete': True,
-                'exclusive': True}
+                   'auto_delete': True,
+                   'exclusive': True}
         options.update(kwargs)
-        super(FanoutPublisher, self).__init__(channel,
-                '%s_fanout' % topic,
-                None,
-                type='fanout',
-                **options)
+        super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+                                              None, type='fanout', **options)
 
 
 class NotifyPublisher(TopicPublisher):
     """Publisher class for 'notify'"""
 
-    def __init__(self, *args, **kwargs):
-        self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
-        super(NotifyPublisher, self).__init__(*args, **kwargs)
+    def __init__(self, conf, channel, topic, **kwargs):
+        self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
     def reconnect(self, channel):
         super(NotifyPublisher, self).reconnect(channel)
@@ -319,25 +340,28 @@ class NotifyPublisher(TopicPublisher):
         # we do this to ensure that messages don't get dropped if the
         # consumer is started after we do
         queue = kombu.entity.Queue(channel=channel,
-                exchange=self.exchange,
-                durable=self.durable,
-                name=self.routing_key,
-                routing_key=self.routing_key)
+                                   exchange=self.exchange,
+                                   durable=self.durable,
+                                   name=self.routing_key,
+                                   routing_key=self.routing_key)
         queue.declare()
 
 
 class Connection(object):
     """Connection object."""
 
-    def __init__(self, server_params=None):
+    pool = None
+
+    def __init__(self, conf, server_params=None):
         self.consumers = []
         self.consumer_thread = None
-        self.max_retries = FLAGS.rabbit_max_retries
+        self.conf = conf
+        self.max_retries = self.conf.rabbit_max_retries
         # Try forever?
         if self.max_retries <= 0:
             self.max_retries = None
-        self.interval_start = FLAGS.rabbit_retry_interval
-        self.interval_stepping = FLAGS.rabbit_retry_backoff
+        self.interval_start = self.conf.rabbit_retry_interval
+        self.interval_stepping = self.conf.rabbit_retry_backoff
         # max retry-interval = 30 seconds
         self.interval_max = 30
         self.memory_transport = False
@@ -353,21 +377,21 @@ class Connection(object):
             p_key = server_params_to_kombu_params.get(sp_key, sp_key)
             params[p_key] = value
 
-        params.setdefault('hostname', FLAGS.rabbit_host)
-        params.setdefault('port', FLAGS.rabbit_port)
-        params.setdefault('userid', FLAGS.rabbit_userid)
-        params.setdefault('password', FLAGS.rabbit_password)
-        params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
+        params.setdefault('hostname', self.conf.rabbit_host)
+        params.setdefault('port', self.conf.rabbit_port)
+        params.setdefault('userid', self.conf.rabbit_userid)
+        params.setdefault('password', self.conf.rabbit_password)
+        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
 
         self.params = params
 
-        if FLAGS.fake_rabbit:
+        if self.conf.fake_rabbit:
             self.params['transport'] = 'memory'
             self.memory_transport = True
         else:
             self.memory_transport = False
 
-        if FLAGS.rabbit_use_ssl:
+        if self.conf.rabbit_use_ssl:
             self.params['ssl'] = self._fetch_ssl_params()
 
         self.connection = None
@@ -379,14 +403,14 @@ class Connection(object):
         ssl_params = dict()
 
         # http://docs.python.org/library/ssl.html - ssl.wrap_socket
-        if FLAGS.kombu_ssl_version:
-            ssl_params['ssl_version'] = FLAGS.kombu_ssl_version
-        if FLAGS.kombu_ssl_keyfile:
-            ssl_params['keyfile'] = FLAGS.kombu_ssl_keyfile
-        if FLAGS.kombu_ssl_certfile:
-            ssl_params['certfile'] = FLAGS.kombu_ssl_certfile
-        if FLAGS.kombu_ssl_ca_certs:
-            ssl_params['ca_certs'] = FLAGS.kombu_ssl_ca_certs
+        if self.conf.kombu_ssl_version:
+            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+        if self.conf.kombu_ssl_keyfile:
+            ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
+        if self.conf.kombu_ssl_certfile:
+            ssl_params['certfile'] = self.conf.kombu_ssl_certfile
+        if self.conf.kombu_ssl_ca_certs:
+            ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
             # We might want to allow variations in the
             # future with this?
             ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@@ -405,7 +429,7 @@ class Connection(object):
         """
         if self.connection:
             LOG.info(_("Reconnecting to AMQP server on "
-                    "%(hostname)s:%(port)d") % self.params)
+                     "%(hostname)s:%(port)d") % self.params)
             try:
                 self.connection.close()
             except self.connection_errors:
@@ -413,8 +437,7 @@ class Connection(object):
             # Setting this in case the next statement fails, though
             # it shouldn't be doing any network operations, yet.
             self.connection = None
-        self.connection = kombu.connection.BrokerConnection(
-                **self.params)
+        self.connection = kombu.connection.BrokerConnection(**self.params)
         self.connection_errors = self.connection.connection_errors
         if self.memory_transport:
             # Kludge to speed up tests.
@@ -427,8 +450,8 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on '
-                '%(hostname)s:%(port)d') % self.params)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
+                 self.params)
 
     def reconnect(self):
         """Handles reconnecting and re-establishing queues.
@@ -445,7 +468,7 @@ class Connection(object):
             try:
                 self._connect()
                 return
-            except self.connection_errors, e:
+            except (self.connection_errors, IOError), e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
@@ -464,8 +487,8 @@ class Connection(object):
 
             if self.max_retries and attempt == self.max_retries:
                 LOG.exception(_('Unable to connect to AMQP server on '
-                        '%(hostname)s:%(port)d after %(max_retries)d '
-                        'tries: %(err_str)s') % log_info)
+                              '%(hostname)s:%(port)d after %(max_retries)d '
+                              'tries: %(err_str)s') % log_info)
                 # NOTE(comstud): Copied from original code.  There's
                 # really no better recourse because if this was a queue we
                 # need to consume on, we have no way to consume anymore.
@@ -480,15 +503,15 @@ class Connection(object):
 
             log_info['sleep_time'] = sleep_time
             LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
-                    ' unreachable: %(err_str)s. Trying again in '
-                    '%(sleep_time)d seconds.') % log_info)
+                          ' unreachable: %(err_str)s. Trying again in '
+                          '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
         while True:
             try:
                 return method(*args, **kwargs)
-            except (self.connection_errors, socket.timeout), e:
+            except (self.connection_errors, socket.timeout, IOError), e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
@@ -531,11 +554,11 @@ class Connection(object):
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
             LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
-                "%(err_str)s") % log_info)
+                      "%(err_str)s") % log_info)
 
         def _declare_consumer():
-            consumer = consumer_cls(self.channel, topic, callback,
-                    self.consumer_num.next())
+            consumer = consumer_cls(self.conf, self.channel, topic, callback,
+                                    self.consumer_num.next())
             self.consumers.append(consumer)
             return consumer
 
@@ -549,11 +572,11 @@ class Connection(object):
         def _error_callback(exc):
             if isinstance(exc, socket.timeout):
                 LOG.exception(_('Timed out waiting for RPC response: %s') %
-                        str(exc))
+                              str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
-                        str(exc))
+                              str(exc))
                 info['do_consume'] = True
 
         def _consume():
@@ -587,10 +610,10 @@ class Connection(object):
         def _error_callback(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
             LOG.exception(_("Failed to publish message to topic "
-                "'%(topic)s': %(err_str)s") % log_info)
+                          "'%(topic)s': %(err_str)s") % log_info)
 
         def _publish():
-            publisher = cls(self.channel, topic, **kwargs)
+            publisher = cls(self.conf, self.channel, topic, **kwargs)
             publisher.send(msg)
 
         self.ensure(_error_callback, _publish)
@@ -602,9 +625,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
         """Create a 'topic' consumer."""
-        self.declare_consumer(TopicConsumer, topic, callback)
+        self.declare_consumer(functools.partial(TopicConsumer,
+                                                name=queue_name,
+                                                ),
+                              topic, callback)
 
     def declare_fanout_consumer(self, topic, callback):
         """Create a 'fanout' consumer"""
@@ -648,57 +674,77 @@ class Connection(object):
 
     def create_consumer(self, topic, proxy, fanout=False):
         """Create a consumer that calls a method in a proxy object"""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+
         if fanout:
-            self.declare_fanout_consumer(topic,
-                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+            self.declare_fanout_consumer(topic, proxy_cb)
         else:
-            self.declare_topic_consumer(topic,
-                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
-
+            self.declare_topic_consumer(topic, proxy_cb)
 
-Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+    def create_worker(self, topic, proxy, pool_name):
+        """Create a worker that calls a method in a proxy object"""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
 
-def create_connection(new=True):
+def create_connection(conf, new=True):
     """Create a connection"""
-    return rpc_amqp.create_connection(new, Connection.pool)
+    return rpc_amqp.create_connection(
+        conf, new,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
     """Make a call that returns multiple times."""
-    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+    return rpc_amqp.multicall(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
     """Sends a message on a topic and wait for a response."""
-    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+    return rpc_amqp.call(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
     """Sends a message on a topic without waiting for a response."""
-    return rpc_amqp.cast(context, topic, msg, Connection.pool)
+    return rpc_amqp.cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
     """Sends a message on a fanout exchange without waiting for a response."""
-    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+    return rpc_amqp.fanout_cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def cast_to_server(context, server_params, topic, msg):
+def cast_to_server(conf, context, server_params, topic, msg):
     """Sends a message on a topic to a specific server."""
-    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
-            Connection.pool)
+    return rpc_amqp.cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def fanout_cast_to_server(context, server_params, topic, msg):
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
     """Sends a message on a fanout exchange to a specific server."""
-    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
-            Connection.pool)
+    return rpc_amqp.cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
     """Sends a notification event on a topic."""
-    return rpc_amqp.notify(context, topic, msg, Connection.pool)
+    return rpc_amqp.notify(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
 def cleanup():
similarity index 60%
rename from heat/rpc/impl_qpid.py
rename to heat/openstack/common/rpc/impl_qpid.py
index 8e954156f9fae5c702a07169c26b4f3dc0194578..d4f4cc77e9ff010443a8b39770be5c97e84a6de3 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import functools
 import itertools
+import logging
 import time
 import uuid
-import json
 
 import eventlet
 import greenlet
 import qpid.messaging
 import qpid.messaging.exceptions
 
-from heat.common import config
 from heat.openstack.common import cfg
-from heat.rpc import amqp as rpc_amqp
-from heat.rpc import common as rpc_common
-
-from heat.openstack.common import log as logging
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import amqp as rpc_amqp
+from heat.openstack.common.rpc import common as rpc_common
 
 LOG = logging.getLogger(__name__)
 
+qpid_opts = [
+    cfg.StrOpt('qpid_hostname',
+               default='localhost',
+               help='Qpid broker hostname'),
+    cfg.StrOpt('qpid_port',
+               default='5672',
+               help='Qpid broker port'),
+    cfg.StrOpt('qpid_username',
+               default='',
+               help='Username for qpid connection'),
+    cfg.StrOpt('qpid_password',
+               default='',
+               help='Password for qpid connection'),
+    cfg.StrOpt('qpid_sasl_mechanisms',
+               default='',
+               help='Space separated list of SASL mechanisms to use for auth'),
+    cfg.BoolOpt('qpid_reconnect',
+                default=True,
+                help='Automatically reconnect'),
+    cfg.IntOpt('qpid_reconnect_timeout',
+               default=0,
+               help='Reconnection timeout in seconds'),
+    cfg.IntOpt('qpid_reconnect_limit',
+               default=0,
+               help='Max reconnections before giving up'),
+    cfg.IntOpt('qpid_reconnect_interval_min',
+               default=0,
+               help='Minimum seconds between reconnection attempts'),
+    cfg.IntOpt('qpid_reconnect_interval_max',
+               default=0,
+               help='Maximum seconds between reconnection attempts'),
+    cfg.IntOpt('qpid_reconnect_interval',
+               default=0,
+               help='Equivalent to setting max and min to the same value'),
+    cfg.IntOpt('qpid_heartbeat',
+               default=5,
+               help='Seconds between connection keepalive heartbeats'),
+    cfg.StrOpt('qpid_protocol',
+               default='tcp',
+               help="Transport to use, either 'tcp' or 'ssl'"),
+    cfg.BoolOpt('qpid_tcp_nodelay',
+                default=True,
+                help='Disable Nagle algorithm'),
+]
+
+cfg.CONF.register_opts(qpid_opts)
+
 
 class ConsumerBase(object):
     """Consumer base class."""
@@ -78,7 +125,7 @@ class ConsumerBase(object):
         addr_opts["node"]["x-declare"].update(node_opts)
         addr_opts["link"]["x-declare"].update(link_opts)
 
-        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
         self.reconnect(session)
 
@@ -91,7 +138,12 @@ class ConsumerBase(object):
     def consume(self):
         """Fetch the message and pass it to the callback object"""
         message = self.receiver.fetch()
-        self.callback(message.content)
+        try:
+            self.callback(message.content)
+        except Exception:
+            LOG.exception(_("Failed to process message... skipping it."))
+        finally:
+            self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
@@ -100,7 +152,7 @@ class ConsumerBase(object):
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'"""
 
-    def __init__(self, session, msg_id, callback):
+    def __init__(self, conf, session, msg_id, callback):
         """Init a 'direct' queue.
 
         'session' is the amqp session to use
@@ -109,32 +161,35 @@ class DirectConsumer(ConsumerBase):
         """
 
         super(DirectConsumer, self).__init__(session, callback,
-                        "%s/%s" % (msg_id, msg_id),
-                        {"type": "direct"},
-                        msg_id,
-                        {"exclusive": True})
+                                             "%s/%s" % (msg_id, msg_id),
+                                             {"type": "direct"},
+                                             msg_id,
+                                             {"exclusive": True})
 
 
 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
-    def __init__(self, session, topic, callback):
+    def __init__(self, conf, session, topic, callback, name=None):
         """Init a 'topic' queue.
 
-        'session' is the amqp session to use
-        'topic' is the topic to listen on
-        'callback' is the callback to call when messages are received
+        :param session: the amqp session to use
+        :param topic: is the topic to listen on
+        :paramtype topic: str
+        :param callback: the callback to call when messages are received
+        :param name: optional queue name, defaults to topic
         """
 
         super(TopicConsumer, self).__init__(session, callback,
-                        "%s/%s" % (cfg.CONF.control_exchange, topic), {},
-                        topic, {})
+                                            "%s/%s" % (conf.control_exchange,
+                                                       topic),
+                                            {}, name or topic, {})
 
 
 class FanoutConsumer(ConsumerBase):
     """Consumer class for 'fanout'"""
 
-    def __init__(self, session, topic, callback):
+    def __init__(self, conf, session, topic, callback):
         """Init a 'fanout' queue.
 
         'session' is the amqp session to use
@@ -142,11 +197,12 @@ class FanoutConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(FanoutConsumer, self).__init__(session, callback,
-                        "%s_fanout" % topic,
-                        {"durable": False, "type": "fanout"},
-                        "%s_fanout_%s" % (topic, uuid.uuid4().hex),
-                        {"exclusive": True})
+        super(FanoutConsumer, self).__init__(
+            session, callback,
+            "%s_fanout" % topic,
+            {"durable": False, "type": "fanout"},
+            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+            {"exclusive": True})
 
 
 class Publisher(object):
@@ -174,7 +230,7 @@ class Publisher(object):
         if node_opts:
             addr_opts["node"]["x-declare"].update(node_opts)
 
-        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
         self.reconnect(session)
 
@@ -189,7 +245,7 @@ class Publisher(object):
 
 class DirectPublisher(Publisher):
     """Publisher class for 'direct'"""
-    def __init__(self, session, msg_id):
+    def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
         super(DirectPublisher, self).__init__(session, msg_id,
                                               {"type": "Direct"})
@@ -197,47 +253,53 @@ class DirectPublisher(Publisher):
 
 class TopicPublisher(Publisher):
     """Publisher class for 'topic'"""
-    def __init__(self, session, topic):
+    def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(TopicPublisher, self).__init__(session,
-                            "%s/%s" % (cfg.CONF.control_exchange, topic))
+        super(TopicPublisher, self).__init__(
+            session,
+            "%s/%s" % (conf.control_exchange, topic))
 
 
 class FanoutPublisher(Publisher):
     """Publisher class for 'fanout'"""
-    def __init__(self, session, topic):
+    def __init__(self, conf, session, topic):
         """init a 'fanout' publisher.
         """
-        super(FanoutPublisher, self).__init__(session,
-                                "%s_fanout" % topic, {"type": "fanout"})
+        super(FanoutPublisher, self).__init__(
+            session,
+            "%s_fanout" % topic, {"type": "fanout"})
 
 
 class NotifyPublisher(Publisher):
     """Publisher class for notifications"""
-    def __init__(self, session, topic):
+    def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(NotifyPublisher, self).__init__(session,
-                            "%s/%s" % (cfg.CONF.control_exchange, topic),
-                                {"durable": True})
+        super(NotifyPublisher, self).__init__(
+            session,
+            "%s/%s" % (conf.control_exchange, topic),
+            {"durable": True})
 
 
 class Connection(object):
     """Connection object."""
 
-    def __init__(self, server_params=None):
+    pool = None
+
+    def __init__(self, conf, server_params=None):
         self.session = None
         self.consumers = {}
         self.consumer_thread = None
+        self.conf = conf
 
         if server_params is None:
             server_params = {}
 
-        default_params = dict(hostname=cfg.CONF.qpid_hostname,
-                port=cfg.CONF.qpid_port,
-                username=cfg.CONF.qpid_username,
-                password=cfg.CONF.qpid_password)
+        default_params = dict(hostname=self.conf.qpid_hostname,
+                              port=self.conf.qpid_port,
+                              username=self.conf.qpid_username,
+                              password=self.conf.qpid_password)
 
         params = server_params
         for key in default_params.keys():
@@ -251,25 +313,25 @@ class Connection(object):
         # before we call open
         self.connection.username = params['username']
         self.connection.password = params['password']
-        self.connection.sasl_mechanisms = cfg.CONF.qpid_sasl_mechanisms
-        self.connection.reconnect = cfg.CONF.qpid_reconnect
-        if cfg.CONF.qpid_reconnect_timeout:
-            self.connection.reconnect_timeout = \
-                                cfg.CONF.qpid_reconnect_timeout
-        if cfg.CONF.qpid_reconnect_limit:
-            self.connection.reconnect_limit = cfg.CONF.qpid_reconnect_limit
-        if cfg.CONF.qpid_reconnect_interval_max:
+        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
+        self.connection.reconnect = self.conf.qpid_reconnect
+        if self.conf.qpid_reconnect_timeout:
+            self.connection.reconnect_timeout = (
+                self.conf.qpid_reconnect_timeout)
+        if self.conf.qpid_reconnect_limit:
+            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
+        if self.conf.qpid_reconnect_interval_max:
             self.connection.reconnect_interval_max = (
-                    cfg.CONF.qpid_reconnect_interval_max)
-        if cfg.CONF.qpid_reconnect_interval_min:
+                self.conf.qpid_reconnect_interval_max)
+        if self.conf.qpid_reconnect_interval_min:
             self.connection.reconnect_interval_min = (
-                    cfg.CONF.qpid_reconnect_interval_min)
-        if cfg.CONF.qpid_reconnect_interval:
-            self.connection.reconnect_interval = \
-                                cfg.CONF.qpid_reconnect_interval
-        self.connection.hearbeat = cfg.CONF.qpid_heartbeat
-        self.connection.protocol = cfg.CONF.qpid_protocol
-        self.connection.tcp_nodelay = cfg.CONF.qpid_tcp_nodelay
+                self.conf.qpid_reconnect_interval_min)
+        if self.conf.qpid_reconnect_interval:
+            self.connection.reconnect_interval = (
+                self.conf.qpid_reconnect_interval)
+        self.connection.hearbeat = self.conf.qpid_heartbeat
+        self.connection.protocol = self.conf.qpid_protocol
+        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
         # Open is part of reconnect -
         # NOTE(WGH) not sure we need this with the reconnect flags
@@ -293,12 +355,12 @@ class Connection(object):
             try:
                 self.connection.open()
             except qpid.messaging.exceptions.ConnectionError, e:
-                LOG.error(_('Unable to connect to AMQP server: %s ') % e)
-                time.sleep(cfg.CONF.qpid_reconnect_interval or 1)
+                LOG.error(_('Unable to connect to AMQP server: %s'), e)
+                time.sleep(self.conf.qpid_reconnect_interval or 1)
             else:
                 break
 
-        LOG.info(_('Connected to AMQP server on %s') % self.broker)
+        LOG.info(_('Connected to AMQP server on %s'), self.broker)
 
         self.session = self.connection.session()
 
@@ -338,10 +400,10 @@ class Connection(object):
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
             LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
-                "%(err_str)s") % log_info)
+                      "%(err_str)s") % log_info)
 
         def _declare_consumer():
-            consumer = consumer_cls(self.session, topic, callback)
+            consumer = consumer_cls(self.conf, self.session, topic, callback)
             self._register_consumer(consumer)
             return consumer
 
@@ -353,11 +415,11 @@ class Connection(object):
         def _error_callback(exc):
             if isinstance(exc, qpid.messaging.exceptions.Empty):
                 LOG.exception(_('Timed out waiting for RPC response: %s') %
-                        str(exc))
+                              str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
-                        str(exc))
+                              str(exc))
 
         def _consume():
             nxt_receiver = self.session.next_receiver(timeout=timeout)
@@ -387,10 +449,10 @@ class Connection(object):
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
             LOG.exception(_("Failed to publish message to topic "
-                "'%(topic)s': %(err_str)s") % log_info)
+                          "'%(topic)s': %(err_str)s") % log_info)
 
         def _publisher_send():
-            publisher = cls(self.session, topic)
+            publisher = cls(self.conf, self.session, topic)
             publisher.send(msg)
 
         return self.ensure(_connect_error, _publisher_send)
@@ -402,9 +464,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
         """Create a 'topic' consumer."""
-        self.declare_consumer(TopicConsumer, topic, callback)
+        self.declare_consumer(functools.partial(TopicConsumer,
+                                                name=queue_name,
+                                                ),
+                              topic, callback)
 
     def declare_fanout_consumer(self, topic, callback):
         """Create a 'fanout' consumer"""
@@ -448,59 +513,86 @@ class Connection(object):
 
     def create_consumer(self, topic, proxy, fanout=False):
         """Create a consumer that calls a method in a proxy object"""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+
         if fanout:
-            consumer = FanoutConsumer(self.session, topic,
-                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
         else:
-            consumer = TopicConsumer(self.session, topic,
-                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
+
         self._register_consumer(consumer)
+
         return consumer
 
+    def create_worker(self, topic, proxy, pool_name):
+        """Create a worker that calls a method in a proxy object"""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+
+        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+                                 name=pool_name)
 
-Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+        self._register_consumer(consumer)
+
+        return consumer
 
 
-def create_connection(new=True):
+def create_connection(conf, new=True):
     """Create a connection"""
-    return rpc_amqp.create_connection(new, Connection.pool)
+    return rpc_amqp.create_connection(
+        conf, new,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def multicall(context, topic, msg, timeout=None):
+def multicall(conf, context, topic, msg, timeout=None):
     """Make a call that returns multiple times."""
-    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+    return rpc_amqp.multicall(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def call(context, topic, msg, timeout=None):
+def call(conf, context, topic, msg, timeout=None):
     """Sends a message on a topic and wait for a response."""
-    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+    return rpc_amqp.call(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def cast(context, topic, msg):
+def cast(conf, context, topic, msg):
     """Sends a message on a topic without waiting for a response."""
-    return rpc_amqp.cast(context, topic, msg, Connection.pool)
+    return rpc_amqp.cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def fanout_cast(context, topic, msg):
+def fanout_cast(conf, context, topic, msg):
     """Sends a message on a fanout exchange without waiting for a response."""
-    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+    return rpc_amqp.fanout_cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def cast_to_server(context, server_params, topic, msg):
+def cast_to_server(conf, context, server_params, topic, msg):
     """Sends a message on a topic to a specific server."""
-    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
-            Connection.pool)
+    return rpc_amqp.cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def fanout_cast_to_server(context, server_params, topic, msg):
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
     """Sends a message on a fanout exchange to a specific server."""
-    return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
-            msg, Connection.pool)
+    return rpc_amqp.fanout_cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(context, topic, msg):
+def notify(conf, context, topic, msg):
     """Sends a notification event on a topic."""
-    return rpc_amqp.notify(context, topic, msg, Connection.pool)
+    return rpc_amqp.notify(conf, context, topic, msg,
+                           rpc_amqp.get_connection_pool(conf, Connection))
 
 
 def cleanup():
diff --git a/heat/openstack/common/rpc/impl_zmq.py b/heat/openstack/common/rpc/impl_zmq.py
new file mode 100644 (file)
index 0000000..71bc8f2
--- /dev/null
@@ -0,0 +1,721 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import pprint
+import socket
+import string
+import sys
+import types
+import uuid
+
+import eventlet
+from eventlet.green import zmq
+import greenlet
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
+from heat.openstack.common import jsonutils
+from heat.openstack.common.rpc import common as rpc_common
+
+
+# for convenience, are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = rpc_common.LOG
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+zmq_opts = [
+    cfg.StrOpt('rpc_zmq_bind_address', default='*',
+               help='ZeroMQ bind address. Should be a wildcard (*), '
+                    'an ethernet interface, or IP. '
+                    'The "host" option should point or resolve to this '
+                    'address.'),
+
+    # The module.Class to use for matchmaking.
+    cfg.StrOpt(
+        'rpc_zmq_matchmaker',
+        default=('heat.openstack.common.rpc.'
+                 'matchmaker.MatchMakerLocalhost'),
+        help='MatchMaker driver',
+        ),
+
+    # The following port is unassigned by IANA as of 2012-05-21
+    cfg.IntOpt('rpc_zmq_port', default=9501,
+               help='ZeroMQ receiver listening port'),
+
+    cfg.IntOpt('rpc_zmq_contexts', default=1,
+               help='Number of ZeroMQ contexts, defaults to 1'),
+
+    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+               help='Directory for holding IPC sockets'),
+    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+               help='Name of this node. Must be a valid hostname, FQDN, or '
+                    'IP address')
+]
+
+
+# These globals are defined in register_opts(conf),
+# a mandatory initialization call
+FLAGS = None
+ZMQ_CTX = None  # ZeroMQ Context, must be global.
+matchmaker = None  # memoized matchmaker object
+
+
+def _serialize(data):
+    """
+    Serialization wrapper
+    We prefer using JSON, but it cannot encode all types.
+    Error if a developer passes us bad data.
+    """
+    try:
+        return str(jsonutils.dumps(data, ensure_ascii=True))
+    except TypeError:
+        LOG.error(_("JSON serialization failed."))
+        raise
+
+
+def _deserialize(data):
+    """
+    Deserialization wrapper
+    """
+    LOG.debug(_("Deserializing: %s"), data)
+    return jsonutils.loads(data)
+
+
+class ZmqSocket(object):
+    """
+    A tiny wrapper around ZeroMQ to simplify the send/recv protocol
+    and connection management.
+
+    Can be used as a Context (supports the 'with' statement).
+    """
+
+    def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+        self.sock = ZMQ_CTX.socket(zmq_type)
+        self.addr = addr
+        self.type = zmq_type
+        self.subscriptions = []
+
+        # Support failures on sending/receiving on wrong socket type.
+        self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+        self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+        self.can_sub = zmq_type in (zmq.SUB, )
+
+        # Support list, str, & None for subscribe arg (cast to list)
+        do_sub = {
+            list: subscribe,
+            str: [subscribe],
+            type(None): []
+        }[type(subscribe)]
+
+        for f in do_sub:
+            self.subscribe(f)
+
+        str_data = {'addr': addr, 'type': self.socket_s(),
+                    'subscribe': subscribe, 'bind': bind}
+
+        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
+        LOG.debug(_("-> Subscribed to %(subscribe)s"),  str_data)
+        LOG.debug(_("-> bind: %(bind)s"), str_data)
+
+        try:
+            if bind:
+                self.sock.bind(addr)
+            else:
+                self.sock.connect(addr)
+        except Exception:
+            raise RPCException(_("Could not open socket."))
+
+    def socket_s(self):
+        """Get socket type as string."""
+        t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+                  'DEALER')
+        return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+    def subscribe(self, msg_filter):
+        """Subscribe."""
+        if not self.can_sub:
+            raise RPCException("Cannot subscribe on this socket.")
+        LOG.debug(_("Subscribing to %s"), msg_filter)
+
+        try:
+            self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+        except Exception:
+            return
+
+        self.subscriptions.append(msg_filter)
+
+    def unsubscribe(self, msg_filter):
+        """Unsubscribe."""
+        if msg_filter not in self.subscriptions:
+            return
+        self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+        self.subscriptions.remove(msg_filter)
+
+    def close(self):
+        if self.sock is None or self.sock.closed:
+            return
+
+        # We must unsubscribe, or we'll leak descriptors.
+        if len(self.subscriptions) > 0:
+            for f in self.subscriptions:
+                try:
+                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+                except Exception:
+                    pass
+            self.subscriptions = []
+
+        # Linger -1 prevents lost/dropped messages
+        try:
+            self.sock.close(linger=-1)
+        except Exception:
+            pass
+        self.sock = None
+
+    def recv(self):
+        if not self.can_recv:
+            raise RPCException(_("You cannot recv on this socket."))
+        return self.sock.recv_multipart()
+
+    def send(self, data):
+        if not self.can_send:
+            raise RPCException(_("You cannot send on this socket."))
+        self.sock.send_multipart(data)
+
+
+class ZmqClient(object):
+    """Client for ZMQ sockets."""
+
+    def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+        self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+    def cast(self, msg_id, topic, data):
+        self.outq.send([str(msg_id), str(topic), str('cast'),
+                        _serialize(data)])
+
+    def close(self):
+        self.outq.close()
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+    """Context that supports replying to a rpc.call."""
+    def __init__(self, **kwargs):
+        self.replies = []
+        super(RpcContext, self).__init__(**kwargs)
+
+    def deepcopy(self):
+        values = self.to_dict()
+        values['replies'] = self.replies
+        return self.__class__(**values)
+
+    def reply(self, reply=None, failure=None, ending=False):
+        if ending:
+            return
+        self.replies.append(reply)
+
+    @classmethod
+    def marshal(self, ctx):
+        ctx_data = ctx.to_dict()
+        return _serialize(ctx_data)
+
+    @classmethod
+    def unmarshal(self, data):
+        return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+    """Used by ConsumerBase as a private context for - methods."""
+
+    def __init__(self, proxy):
+        self.proxy = proxy
+        self.msg_waiter = None
+
+    def _get_response(self, ctx, proxy, topic, data):
+        """Process a curried message and cast the result to topic."""
+        LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+        data.setdefault('version', None)
+        data.setdefault('args', [])
+
+        try:
+            result = proxy.dispatch(
+                ctx, data['version'], data['method'], **data['args'])
+            return ConsumerBase.normalize_reply(result, ctx.replies)
+        except greenlet.GreenletExit:
+            # ignore these since they are just from shutdowns
+            pass
+        except Exception:
+            return {'exc':
+                    rpc_common.serialize_remote_exception(sys.exc_info())}
+
+    def reply(self, ctx, proxy,
+              msg_id=None, context=None, topic=None, msg=None):
+        """Reply to a casted call."""
+        # Our real method is curried into msg['args']
+
+        child_ctx = RpcContext.unmarshal(msg[0])
+        response = ConsumerBase.normalize_reply(
+            self._get_response(child_ctx, proxy, topic, msg[1]),
+            ctx.replies)
+
+        LOG.debug(_("Sending reply"))
+        cast(FLAGS, ctx, topic, {
+            'method': '-process_reply',
+            'args': {
+                'msg_id': msg_id,
+                'response': response
+            }
+        })
+
+
+class ConsumerBase(object):
+    """Base Consumer."""
+
+    def __init__(self):
+        self.private_ctx = InternalContext(None)
+
+    @classmethod
+    def normalize_reply(self, result, replies):
+        #TODO(ewindisch): re-evaluate and document this method.
+        if isinstance(result, types.GeneratorType):
+            return list(result)
+        elif replies:
+            return replies
+        else:
+            return [result]
+
+    def process(self, style, target, proxy, ctx, data):
+        # Method starting with - are
+        # processed internally. (non-valid method name)
+        method = data['method']
+
+        # Internal method
+        # uses internal context for safety.
+        if data['method'][0] == '-':
+            # For reply / process_reply
+            method = method[1:]
+            if method == 'reply':
+                self.private_ctx.reply(ctx, proxy, **data['args'])
+            return
+
+        data.setdefault('version', None)
+        data.setdefault('args', [])
+        proxy.dispatch(ctx, data['version'],
+                       data['method'], **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+    """
+    A consumer class implementing a
+    centralized casting broker (PULL-PUSH)
+    for RoundRobin requests.
+    """
+
+    def __init__(self, conf):
+        super(ZmqBaseReactor, self).__init__()
+
+        self.conf = conf
+        self.mapping = {}
+        self.proxies = {}
+        self.threads = []
+        self.sockets = []
+        self.subscribe = {}
+
+        self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+    def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
+                 zmq_type_out=None, in_bind=True, out_bind=True,
+                 subscribe=None):
+
+        LOG.info(_("Registering reactor"))
+
+        if zmq_type_in not in (zmq.PULL, zmq.SUB):
+            raise RPCException("Bad input socktype")
+
+        # Items push in.
+        inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+                        subscribe=subscribe)
+
+        self.proxies[inq] = proxy
+        self.sockets.append(inq)
+
+        LOG.info(_("In reactor registered"))
+
+        if not out_addr:
+            return
+
+        if zmq_type_out not in (zmq.PUSH, zmq.PUB):
+            raise RPCException("Bad output socktype")
+
+        # Items push out.
+        outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
+
+        self.mapping[inq] = outq
+        self.mapping[outq] = inq
+        self.sockets.append(outq)
+
+        LOG.info(_("Out reactor registered"))
+
+    def consume_in_thread(self):
+        def _consume(sock):
+            LOG.info(_("Consuming socket"))
+            while True:
+                self.consume(sock)
+
+        for k in self.proxies.keys():
+            self.threads.append(
+                self.pool.spawn(_consume, k)
+            )
+
+    def wait(self):
+        for t in self.threads:
+            t.wait()
+
+    def close(self):
+        for s in self.sockets:
+            s.close()
+
+        for t in self.threads:
+            t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+    """
+    A consumer class implementing a
+    topic-based proxy, forwarding to
+    IPC sockets.
+    """
+
+    def __init__(self, conf):
+        super(ZmqProxy, self).__init__(conf)
+
+        self.topic_proxy = {}
+        ipc_dir = conf.rpc_zmq_ipc_dir
+
+        self.topic_proxy['zmq_replies'] = \
+            ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
+                      zmq.PUB, bind=True)
+        self.sockets.append(self.topic_proxy['zmq_replies'])
+
+    def consume(self, sock):
+        ipc_dir = self.conf.rpc_zmq_ipc_dir
+
+        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+        data = sock.recv()
+        msg_id, topic, style, in_msg = data
+        topic = topic.split('.', 1)[0]
+
+        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+        # Handle zmq_replies magic
+        if topic.startswith('fanout~'):
+            sock_type = zmq.PUB
+        elif topic.startswith('zmq_replies'):
+            sock_type = zmq.PUB
+            inside = _deserialize(in_msg)
+            msg_id = inside[-1]['args']['msg_id']
+            response = inside[-1]['args']['response']
+            LOG.debug(_("->response->%s"), response)
+            data = [str(msg_id), _serialize(response)]
+        else:
+            sock_type = zmq.PUSH
+
+        if not topic in self.topic_proxy:
+            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+                             sock_type, bind=True)
+            self.topic_proxy[topic] = outq
+            self.sockets.append(outq)
+            LOG.info(_("Created topic proxy: %s"), topic)
+
+            # It takes some time for a pub socket to open,
+            # before we can have any faith in doing a send() to it.
+            if sock_type == zmq.PUB:
+                eventlet.sleep(.5)
+
+        LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
+        self.topic_proxy[topic].send(data)
+        LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+
+
+class ZmqReactor(ZmqBaseReactor):
+    """
+    A consumer class implementing a
+    consumer for messages. Can also be
+    used as a 1:1 proxy
+    """
+
+    def __init__(self, conf):
+        super(ZmqReactor, self).__init__(conf)
+
+    def consume(self, sock):
+        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+        data = sock.recv()
+        LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+        if sock in self.mapping:
+            LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
+                'data': data})
+            self.mapping[sock].send(data)
+            return
+
+        msg_id, topic, style, in_msg = data
+
+        ctx, request = _deserialize(in_msg)
+        ctx = RpcContext.unmarshal(ctx)
+
+        proxy = self.proxies[sock]
+
+        self.pool.spawn_n(self.process, style, topic,
+                          proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+    """Manages connections and threads."""
+
+    def __init__(self, conf):
+        self.conf = conf
+        self.reactor = ZmqReactor(conf)
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        # Only consume on the base topic name.
+        topic = topic.split('.', 1)[0]
+
+        LOG.info(_("Create Consumer for topic (%(topic)s)") %
+                 {'topic': topic})
+
+        # Subscription scenarios
+        if fanout:
+            subscribe = ('', fanout)[type(fanout) == str]
+            sock_type = zmq.SUB
+            topic = 'fanout~' + topic
+        else:
+            sock_type = zmq.PULL
+            subscribe = None
+
+        # Receive messages from (local) proxy
+        inaddr = "ipc://%s/zmq_topic_%s" % \
+            (self.conf.rpc_zmq_ipc_dir, topic)
+
+        LOG.debug(_("Consumer is a zmq.%s"),
+                  ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+        self.reactor.register(proxy, inaddr, sock_type,
+                              subscribe=subscribe, in_bind=False)
+
+    def close(self):
+        self.reactor.close()
+
+    def wait(self):
+        self.reactor.wait()
+
+    def consume_in_thread(self):
+        self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, msg_id, topic, msg, timeout=None):
+    timeout_cast = timeout or FLAGS.rpc_cast_timeout
+    payload = [RpcContext.marshal(context), msg]
+
+    with Timeout(timeout_cast, exception=rpc_common.Timeout):
+        try:
+            conn = ZmqClient(addr)
+
+            # assumes cast can't return an exception
+            conn.cast(msg_id, topic, payload)
+        except zmq.ZMQError:
+            raise RPCException("Cast failed. ZMQ Socket Exception")
+        finally:
+            if 'conn' in vars():
+                conn.close()
+
+
+def _call(addr, context, msg_id, topic, msg, timeout=None):
+    # timeout_response is how long we wait for a response
+    timeout = timeout or FLAGS.rpc_response_timeout
+
+    # The msg_id is used to track replies.
+    msg_id = str(uuid.uuid4().hex)
+
+    # Replies always come into the reply service.
+    reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+
+    LOG.debug(_("Creating payload"))
+    # Curry the original request into a reply method.
+    mcontext = RpcContext.marshal(context)
+    payload = {
+        'method': '-reply',
+        'args': {
+            'msg_id': msg_id,
+            'context': mcontext,
+            'topic': reply_topic,
+            'msg': [mcontext, msg]
+        }
+    }
+
+    LOG.debug(_("Creating queue socket for reply waiter"))
+
+    # Messages arriving async.
+    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+    with Timeout(timeout, exception=rpc_common.Timeout):
+        try:
+            msg_waiter = ZmqSocket(
+                "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+                zmq.SUB, subscribe=msg_id, bind=False
+            )
+
+            LOG.debug(_("Sending cast"))
+            _cast(addr, context, msg_id, topic, payload)
+
+            LOG.debug(_("Cast sent; Waiting reply"))
+            # Blocks until receives reply
+            msg = msg_waiter.recv()
+            LOG.debug(_("Received message: %s"), msg)
+            LOG.debug(_("Unpacking response"))
+            responses = _deserialize(msg[-1])
+        # ZMQError trumps the Timeout error.
+        except zmq.ZMQError:
+            raise RPCException("ZMQ Socket Error")
+        finally:
+            if 'msg_waiter' in vars():
+                msg_waiter.close()
+
+    # It seems we don't need to do all of the following,
+    # but perhaps it would be useful for multicall?
+    # One effect of this is that we're checking all
+    # responses for Exceptions.
+    for resp in responses:
+        if isinstance(resp, types.DictType) and 'exc' in resp:
+            raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+
+    return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None):
+    """
+    Wraps the sending of messages,
+    dispatches to the matchmaker and sends
+    message to all relevant hosts.
+    """
+    conf = FLAGS
+    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+    queues = matchmaker.queues(topic)
+    LOG.debug(_("Sending message(s) to: %s"), queues)
+
+    # Don't stack if we have no matchmaker results
+    if len(queues) == 0:
+        LOG.warn(_("No matchmaker results. Not casting."))
+        # While not strictly a timeout, callers know how to handle
+        # this exception and a timeout isn't too big a lie.
+        raise rpc_common.Timeout, "No match from matchmaker."
+
+    # This supports brokerless fanout (addresses > 1)
+    for queue in queues:
+        (_topic, ip_addr) = queue
+        _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+        if method.__name__ == '_cast':
+            eventlet.spawn_n(method, _addr, context,
+                             _topic, _topic, msg, timeout)
+            return
+        return method(_addr, context, _topic, _topic, msg, timeout)
+
+
+def create_connection(conf, new=True):
+    return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+    """Multiple calls."""
+    register_opts(conf)
+    return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+    """Send a message, expect a response."""
+    register_opts(conf)
+    data = _multi_send(_call, *args, **kwargs)
+    return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+    """Send a message expecting no reply."""
+    register_opts(conf)
+    _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+    """Send a message to all listening and expect no reply."""
+    register_opts(conf)
+    # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
+    # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, **kwargs):
+    """
+    Send notification event.
+    Notifications are sent to topic-priority.
+    This differs from the AMQP drivers which send to topic.priority.
+    """
+    register_opts(conf)
+    # NOTE(ewindisch): dot-priority in rpc notifier does not
+    # work with our assumptions.
+    topic.replace('.', '-')
+    cast(conf, context, topic, msg, **kwargs)
+
+
+def cleanup():
+    """Clean up resources in use by implementation."""
+    global ZMQ_CTX
+    global matchmaker
+    matchmaker = None
+    ZMQ_CTX.destroy()
+    ZMQ_CTX = None
+
+
+def register_opts(conf):
+    """Registration of options for this driver."""
+    #NOTE(ewindisch): ZMQ_CTX and matchmaker
+    # are initialized here as this is as good
+    # an initialization method as any.
+
+    # We memoize through these globals
+    global ZMQ_CTX
+    global matchmaker
+    global FLAGS
+
+    if not FLAGS:
+        conf.register_opts(zmq_opts)
+        FLAGS = conf
+    # Don't re-set, if this method is called twice.
+    if not ZMQ_CTX:
+        ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+    if not matchmaker:
+        # rpc_zmq_matchmaker should be set to a 'module.Class'
+        mm_path = conf.rpc_zmq_matchmaker.split('.')
+        mm_module = '.'.join(mm_path[:-1])
+        mm_class = mm_path[-1]
+
+        # Only initialize a class.
+        if mm_path[-1][0] not in string.ascii_uppercase:
+            LOG.error(_("Matchmaker could not be loaded.\n"
+                      "rpc_zmq_matchmaker is not a class."))
+            raise RPCException(_("Error loading Matchmaker."))
+
+        mm_impl = importutils.import_module(mm_module)
+        mm_constructor = getattr(mm_impl, mm_class)
+        matchmaker = mm_constructor()
diff --git a/heat/openstack/common/rpc/matchmaker.py b/heat/openstack/common/rpc/matchmaker.py
new file mode 100644 (file)
index 0000000..05534e3
--- /dev/null
@@ -0,0 +1,258 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should except a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import itertools
+import json
+import logging
+
+from heat.openstack.common import cfg
+from heat.openstack.common.gettextutils import _
+
+
+matchmaker_opts = [
+    # Matchmaker ring file
+    cfg.StrOpt('matchmaker_ringfile',
+               default='/etc/nova/matchmaker_ring.json',
+               help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+    """Signified a match could not be found."""
+    message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+    """
+    Implements lookups.
+    Subclass this to support hashtables, dns, etc.
+    """
+    def __init__(self):
+        pass
+
+    def run(self, key):
+        raise NotImplementedError()
+
+
+class Binding(object):
+    """
+    A binding on which to perform a lookup.
+    """
+    def __init__(self):
+        pass
+
+    def test(self, key):
+        raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+    """Match Maker Base Class."""
+
+    def __init__(self):
+        # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+        self.bindings = []
+
+    def add_binding(self, binding, rule, last=True):
+        self.bindings.append((binding, rule, False, last))
+
+    #NOTE(ewindisch): kept the following method in case we implement the
+    #                 underlying support.
+    #def add_negate_binding(self, binding, rule, last=True):
+    #    self.bindings.append((binding, rule, True, last))
+
+    def queues(self, key):
+        workers = []
+
+        # bit is for negate bindings - if we choose to implement it.
+        # last stops processing rules if this matches.
+        for (binding, exchange, bit, last) in self.bindings:
+            if binding.test(key):
+                workers.extend(exchange.run(key))
+
+                # Support last.
+                if last:
+                    return workers
+        return workers
+
+
+class DirectBinding(Binding):
+    """
+    Specifies a host in the key via a '.' character
+    Although dots are used in the key, the behavior here is
+    that it maps directly to a host, thus direct.
+    """
+    def test(self, key):
+        if '.' in key:
+            return True
+        return False
+
+
+class TopicBinding(Binding):
+    """
+    Where a 'bare' key without dots.
+    AMQP generally considers topic exchanges to be those *with* dots,
+    but we deviate here in terminology as the behavior here matches
+    that of a topic exchange (whereas where there are dots, behavior
+    matches that of a direct exchange.
+    """
+    def test(self, key):
+        if '.' not in key:
+            return True
+        return False
+
+
+class FanoutBinding(Binding):
+    """Match on fanout keys, where key starts with 'fanout.' string."""
+    def test(self, key):
+        if key.startswith('fanout~'):
+            return True
+        return False
+
+
+class StubExchange(Exchange):
+    """Exchange that does nothing."""
+    def run(self, key):
+        return [(key, None)]
+
+
+class RingExchange(Exchange):
+    """
+    Match Maker where hosts are loaded from a static file containing
+    a hashmap (JSON formatted).
+
+    __init__ takes optional ring dictionary argument, otherwise
+    loads the ringfile from CONF.mathcmaker_ringfile.
+    """
+    def __init__(self, ring=None):
+        super(RingExchange, self).__init__()
+
+        if ring:
+            self.ring = ring
+        else:
+            fh = open(CONF.matchmaker_ringfile, 'r')
+            self.ring = json.load(fh)
+            fh.close()
+
+        self.ring0 = {}
+        for k in self.ring.keys():
+            self.ring0[k] = itertools.cycle(self.ring[k])
+
+    def _ring_has(self, key):
+        if key in self.ring0:
+            return True
+        return False
+
+
+class RoundRobinRingExchange(RingExchange):
+    """A Topic Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(RoundRobinRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        if not self._ring_has(key):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (key, )
+            )
+            return []
+        host = next(self.ring0[key])
+        return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(FanoutRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "fanout~", strip it for lookup.
+        nkey = key.split('fanout~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class LocalhostExchange(Exchange):
+    """Exchange where all direct topics are local."""
+    def __init__(self):
+        super(Exchange, self).__init__()
+
+    def run(self, key):
+        return [(key.split('.')[0] + '.localhost', 'localhost')]
+
+
+class DirectExchange(Exchange):
+    """
+    Exchange where all topic keys are split, sending to second half.
+    i.e. "compute.host" sends a message to "compute" running on "host"
+    """
+    def __init__(self):
+        super(Exchange, self).__init__()
+
+    def run(self, key):
+        b, e = key.split('.', 1)
+        return [(b, e)]
+
+
+class MatchMakerRing(MatchMakerBase):
+    """
+    Match Maker where hosts are loaded from a static hashmap.
+    """
+    def __init__(self, ring=None):
+        super(MatchMakerRing, self).__init__()
+        self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
+        self.add_binding(DirectBinding(), DirectExchange())
+        self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+    """
+    Match Maker where all bare topics resolve to localhost.
+    Useful for testing.
+    """
+    def __init__(self):
+        super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(FanoutBinding(), LocalhostExchange())
+        self.add_binding(DirectBinding(), DirectExchange())
+        self.add_binding(TopicBinding(), LocalhostExchange())
+
+
+class MatchMakerStub(MatchMakerBase):
+    """
+    Match Maker where topics are untouched.
+    Useful for testing, or for AMQP/brokered queues.
+    Will not work where knowledge of hosts is known (i.e. zeromq)
+    """
+    def __init__(self):
+        super(MatchMakerLocalhost, self).__init__()
+
+        self.add_binding(FanoutBinding(), StubExchange())
+        self.add_binding(DirectBinding(), StubExchange())
+        self.add_binding(TopicBinding(), StubExchange())
diff --git a/heat/openstack/common/rpc/proxy.py b/heat/openstack/common/rpc/proxy.py
new file mode 100644 (file)
index 0000000..db96a27
--- /dev/null
@@ -0,0 +1,161 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+    rpc/dispatcher.py
+"""
+
+
+from heat.openstack.common import rpc
+
+
+class RpcProxy(object):
+    """A helper class for rpc clients.
+
+    This class is a wrapper around the RPC client API.  It allows you to
+    specify the topic and API version in a single place.  This is intended to
+    be used as a base class for a class that implements the client side of an
+    rpc API.
+    """
+
+    def __init__(self, topic, default_version):
+        """Initialize an RpcProxy.
+
+        :param topic: The topic to use for all messages.
+        :param default_version: The default API version to request in all
+               outgoing messages.  This can be overridden on a per-message
+               basis.
+        """
+        self.topic = topic
+        self.default_version = default_version
+        super(RpcProxy, self).__init__()
+
+    def _set_version(self, msg, vers):
+        """Helper method to set the version in a message.
+
+        :param msg: The message having a version added to it.
+        :param vers: The version number to add to the message.
+        """
+        msg['version'] = vers if vers else self.default_version
+
+    def _get_topic(self, topic):
+        """Return the topic to use for a message."""
+        return topic if topic else self.topic
+
+    @staticmethod
+    def make_msg(method, **kwargs):
+        return {'method': method, 'args': kwargs}
+
+    def call(self, context, msg, topic=None, version=None, timeout=None):
+        """rpc.call() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param timeout: (Optional) A timeout to use when waiting for the
+               response.  If no timeout is specified, a default timeout will be
+               used that is usually sufficient.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: The return value from the remote method.
+        """
+        self._set_version(msg, version)
+        return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+    def multicall(self, context, msg, topic=None, version=None, timeout=None):
+        """rpc.multicall() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param timeout: (Optional) A timeout to use when waiting for the
+               response.  If no timeout is specified, a default timeout will be
+               used that is usually sufficient.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: An iterator that lets you process each of the returned values
+                  from the remote method as they arrive.
+        """
+        self._set_version(msg, version)
+        return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+    def cast(self, context, msg, topic=None, version=None):
+        """rpc.cast() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.cast() does not wait on any return value from the
+                  remote method.
+        """
+        self._set_version(msg, version)
+        rpc.cast(context, self._get_topic(topic), msg)
+
+    def fanout_cast(self, context, msg, version=None):
+        """rpc.fanout_cast() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.fanout_cast() does not wait on any return value
+                  from the remote method.
+        """
+        self._set_version(msg, version)
+        rpc.fanout_cast(context, self.topic, msg)
+
+    def cast_to_server(self, context, server_params, msg, topic=None,
+                       version=None):
+        """rpc.cast_to_server() a remote method.
+
+        :param context: The request context
+        :param server_params: Server parameters.  See rpc.cast_to_server() for
+               details.
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.cast_to_server() does not wait on any
+                  return values.
+        """
+        self._set_version(msg, version)
+        rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+    def fanout_cast_to_server(self, context, server_params, msg, version=None):
+        """rpc.fanout_cast_to_server() a remote method.
+
+        :param context: The request context
+        :param server_params: Server parameters.  See rpc.cast_to_server() for
+               details.
+        :param msg: The message to send, including the method and args.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.fanout_cast_to_server() does not wait on any
+                  return values.
+        """
+        self._set_version(msg, version)
+        rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
diff --git a/heat/rpc/common.py b/heat/rpc/common.py
deleted file mode 100644 (file)
index f0792c8..0000000
+++ /dev/null
@@ -1,129 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 Red Hat, Inc.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import copy
-
-from heat.openstack.common import cfg
-from heat.openstack.common import exception
-from heat.openstack.common import log as logging
-from heat.common import config
-
-LOG = logging.getLogger(__name__)
-
-
-class RemoteError(exception.OpenstackException):
-    """Signifies that a remote class has raised an exception.
-
-    Contains a string representation of the type of the original exception,
-    the value of the original exception, and the traceback.  These are
-    sent to the parent as a joined string so printing the exception
-    contains all of the relevant info.
-
-    """
-    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
-
-    def __init__(self, exc_type=None, value=None, traceback=None):
-        self.exc_type = exc_type
-        self.value = value
-        self.traceback = traceback
-        super(RemoteError, self).__init__(exc_type=exc_type,
-                                          value=value,
-                                          traceback=traceback)
-
-
-class Timeout(exception.OpenstackException):
-    """Signifies that a timeout has occurred.
-
-    This exception is raised if the rpc_response_timeout is reached while
-    waiting for a response from the remote side.
-    """
-    message = _("Timeout while waiting on RPC response.")
-
-
-class Connection(object):
-    """A connection, returned by rpc.create_connection().
-
-    This class represents a connection to the message bus used for rpc.
-    An instance of this class should never be created by users of the rpc API.
-    Use rpc.create_connection() instead.
-    """
-    def close(self):
-        """Close the connection.
-
-        This method must be called when the connection will no longer be used.
-        It will ensure that any resources associated with the connection, such
-        as a network connection, and cleaned up.
-        """
-        raise NotImplementedError()
-
-    def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer on this connection.
-
-        A consumer is associated with a message queue on the backend message
-        bus.  The consumer will read messages from the queue, unpack them, and
-        dispatch them to the proxy object.  The contents of the message pulled
-        off of the queue will determine which method gets called on the proxy
-        object.
-
-        :param topic: This is a name associated with what to consume from.
-                      Multiple instances of a service may consume from the same
-                      topic. For example, all instances of nova-compute consume
-                      from a queue called "compute".  In that case, the
-                      messages will get distributed amongst the consumers in a
-                      round-robin fashion if fanout=False.  If fanout=True,
-                      every consumer associated with this topic will get a
-                      copy of every message.
-        :param proxy: The object that will handle all incoming messages.
-        :param fanout: Whether or not this is a fanout topic.  See the
-                       documentation for the topic parameter for some
-                       additional comments on this.
-        """
-        raise NotImplementedError()
-
-    def consume_in_thread(self):
-        """Spawn a thread to handle incoming messages.
-
-        Spawn a thread that will be responsible for handling all incoming
-        messages for consumers that were set up on this connection.
-
-        Message dispatching inside of this is expected to be implemented in a
-        non-blocking manner.  An example implementation would be having this
-        thread pull messages in for all of the consumers, but utilize a thread
-        pool for dispatching the messages to the proxy objects.
-        """
-        raise NotImplementedError()
-
-
-def _safe_log(log_func, msg, msg_data):
-    """Sanitizes the msg_data field before logging."""
-    SANITIZE = {
-                'set_admin_password': ('new_pass',),
-                'run_instance': ('admin_password',),
-               }
-    method = msg_data['method']
-    if method in SANITIZE:
-        msg_data = copy.deepcopy(msg_data)
-        args_to_sanitize = SANITIZE[method]
-        for arg in args_to_sanitize:
-            try:
-                msg_data['args'][arg] = "<SANITIZED>"
-            except KeyError:
-                pass
-
-    return log_func(msg, msg_data)
index 968609e29ca5dff0df14aaa22740a9def604bfa6..41c7a23b9551f90838ef25c426cb17389af8cc2e 100644 (file)
@@ -25,10 +25,10 @@ import os
 import eventlet
 import greenlet
 
+from heat.openstack.common import rpc
 from heat.openstack.common import cfg
 from heat.openstack.common import importutils
 from heat.openstack.common import log as logging
-from heat import rpc
 
 from heat.common import utils as heat_utils
 from heat.common import exception
@@ -129,13 +129,15 @@ class Service(object):
         LOG.debug(_("Creating Consumer connection for Service %s") %
                   self.topic)
 
+        rpc_dispatcher = self.manager.create_rpc_dispatcher()
+
         # Share this same connection for these Consumers
-        self.conn.create_consumer(self.topic, self, fanout=False)
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
 
         node_topic = '%s.%s' % (self.topic, self.host)
-        self.conn.create_consumer(node_topic, self, fanout=False)
+        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
 
-        self.conn.create_consumer(self.topic, self, fanout=True)
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
 
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
index 7a14d218eba50a6250ab39fa37abac82a6009637..a87665559738c915e6dd8c43aff6622e264e9059 100644 (file)
@@ -1,7 +1,7 @@
 [DEFAULT]
 
 # The list of modules to copy from openstack-common
-modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier
+modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils
 
 # The base module to hold the copy of openstack.common
 base=heat