]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Initial work on migrating heat-engine to rpc
authorAngus Salkeld <asalkeld@redhat.com>
Tue, 27 Mar 2012 00:38:48 +0000 (11:38 +1100)
committerAngus Salkeld <asalkeld@redhat.com>
Tue, 27 Mar 2012 00:38:48 +0000 (11:38 +1100)
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
15 files changed:
bin/heat-engine
heat/common/config.py
heat/common/exception.py
heat/common/utils.py
heat/context.py [new file with mode: 0644]
heat/engine/manager.py [new file with mode: 0644]
heat/local.py [new file with mode: 0644]
heat/manager.py [new file with mode: 0644]
heat/rpc/__init__.py [new file with mode: 0644]
heat/rpc/amqp.py [new file with mode: 0644]
heat/rpc/common.py [new file with mode: 0644]
heat/rpc/impl_fake.py [new file with mode: 0644]
heat/rpc/impl_kombu.py [new file with mode: 0644]
heat/rpc/impl_qpid.py [new file with mode: 0644]
heat/service.py [new file with mode: 0644]

index df0176b595893caac32cf53a152bff9e99f34e9b..7f9bbf607d708f5bf752060d3b1c0f51fc20c6a6 100755 (executable)
@@ -21,34 +21,37 @@ which then calls into this engine.
 """
 
 import gettext
+import eventlet
+eventlet.monkey_patch()
+
 import os
 import sys
+import logging
 
 # If ../heat/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
-possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                    os.pardir,
                                    os.pardir))
-if os.path.exists(os.path.join(possible_topdir, 'heat', '__init__.py')):
-    sys.path.insert(0, possible_topdir)
+if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')):
+    sys.path.insert(0, POSSIBLE_TOPDIR)
 
 gettext.install('heat', unicode=1)
 
+from heat import service
+from heat.common import utils
 from heat.common import config
-from heat.common import wsgi
 
+logger = logging.getLogger('heat.engine')
 
 if __name__ == '__main__':
-    try:
-        conf = config.HeatConfigOpts()
-        conf()
-
-        app = config.load_paste_app(conf)
-
-        port = config.DEFAULT_PORT+1
-        print 'Starting Heat Engine on port %s' % port
-        server = wsgi.Server()
-        server.start(app, conf, default_port=port)
-        server.wait()
-    except RuntimeError, e:
-        sys.exit("ERROR: %s" % e)
+
+    config.FLAGS(sys.argv)
+    config.setup_logging(config.FLAGS)
+
+    #utils.monkey_patch()
+    server = service.Service.create(binary='heat-engine',
+                                    topic='engine',
+                                    manager='heat.engine.manager.EngineManager')
+    service.serve(server)
+    service.wait()
index 5f4466375e67554e39c26964a681d692e6e8a13c..eebe86ccfec73c3984ffda791812b3cac70760cf 100644 (file)
@@ -22,6 +22,7 @@ import logging
 import logging.config
 import logging.handlers
 import os
+import socket
 import sys
 
 from heat import version
@@ -37,6 +38,7 @@ paste_deploy_opts = [
     ]
 
 
+
 class HeatConfigOpts(cfg.CommonConfigOpts):
 
     def __init__(self, default_config_files=None, **kwargs):
@@ -46,13 +48,27 @@ class HeatConfigOpts(cfg.CommonConfigOpts):
             default_config_files=default_config_files,
             **kwargs)
 
+class HeatEngineConfigOpts(cfg.CommonConfigOpts):
+    engine_opts = [
+        cfg.StrOpt('host',
+                   default=socket.gethostname(),
+                   help='Name of this node.  This can be an opaque identifier.  '
+                        'It is not necessarily a hostname, FQDN, or IP address.'),
+        cfg.StrOpt('instance_driver',
+                   default='heat.engine.nova',
+                   help='Driver to use for controlling instances'),
+    ]
 
-class HeatCacheConfigOpts(HeatConfigOpts):
-
-    def __init__(self, **kwargs):
+    def __init__(self, default_config_files=None, **kwargs):
+        super(HeatEngineConfigOpts, self).__init__(
+            project='heat',
+            version='%%prog %s' % version.version_string(),
+            **kwargs)
         config_files = cfg.find_config_files(project='heat',
-                                             prog='heat-cache')
-        super(HeatCacheConfigOpts, self).__init__(config_files, **kwargs)
+                                             prog='heat-engine')
+        self.register_cli_opts(self.engine_opts)
+
+FLAGS = HeatEngineConfigOpts()
 
 
 def setup_logging(conf):
index bf33872984b73c5bc0b491dfc508f337957f2962..e806cd497e01a072103066d4b899bbe7519c6b79 100644 (file)
@@ -17,6 +17,7 @@
 
 """Heat exception subclasses"""
 
+import functools
 import urlparse
 
 
@@ -53,6 +54,81 @@ class HeatException(Exception):
     def __str__(self):
         return self._error_string
 
+def wrap_exception(notifier=None, publisher_id=None, event_type=None,
+                   level=None):
+    """This decorator wraps a method to catch any exceptions that may
+    get thrown. It logs the exception as well as optionally sending
+    it to the notification system.
+    """
+    # TODO(sandy): Find a way to import nova.notifier.api so we don't have
+    # to pass it in as a parameter. Otherwise we get a cyclic import of
+    # nova.notifier.api -> nova.utils -> nova.exception :(
+    # TODO(johannes): Also, it would be nice to use
+    # utils.save_and_reraise_exception() without an import loop
+    def inner(f):
+        def wrapped(*args, **kw):
+            try:
+                return f(*args, **kw)
+            except Exception, e:
+                # Save exception since it can be clobbered during processing
+                # below before we can re-raise
+                exc_info = sys.exc_info()
+
+                if notifier:
+                    payload = dict(args=args, exception=e)
+                    payload.update(kw)
+
+                    # Use a temp vars so we don't shadow
+                    # our outer definitions.
+                    temp_level = level
+                    if not temp_level:
+                        temp_level = notifier.ERROR
+
+                    temp_type = event_type
+                    if not temp_type:
+                        # If f has multiple decorators, they must use
+                        # functools.wraps to ensure the name is
+                        # propagated.
+                        temp_type = f.__name__
+
+                    notifier.notify(publisher_id, temp_type, temp_level,
+                                    payload)
+
+                # re-raise original exception since it may have been clobbered
+                raise exc_info[0], exc_info[1], exc_info[2]
+
+        return functools.wraps(f)(wrapped)
+    return inner
+
+
+class NovaException(Exception):
+    """Base Nova Exception
+
+    To correctly use this class, inherit from it and define
+    a 'message' property. That message will get printf'd
+    with the keyword arguments provided to the constructor.
+
+    """
+    message = _("An unknown exception occurred.")
+
+    def __init__(self, message=None, **kwargs):
+        self.kwargs = kwargs
+
+        if 'code' not in self.kwargs:
+            try:
+                self.kwargs['code'] = self.code
+            except AttributeError:
+                pass
+
+        if not message:
+            try:
+                message = self.message % kwargs
+
+            except Exception as e:
+                # at least get the core message out if something happened
+                message = self.message
+
+        super(NovaException, self).__init__(message)
 
 class MissingArgumentError(HeatException):
     message = _("Missing required argument.")
index 4bd3973f0b731e5803ae378133aa8d1623a4b377..a1c55c73d656989485d595190127fed83db058c5 100644 (file)
@@ -20,6 +20,7 @@
 System-level utilities and helper functions.
 """
 
+import datetime
 import sys
 import uuid
 
@@ -75,3 +76,14 @@ def import_object(import_str):
 
 def generate_uuid():
     return str(uuid.uuid4())
+
+def gen_uuid():
+    return uuid.uuid4()
+
+def utcnow():
+    """Overridable version of utils.utcnow."""
+    if utcnow.override_time:
+        return utcnow.override_time
+    return datetime.datetime.utcnow()
+
+utcnow.override_time = None
diff --git a/heat/context.py b/heat/context.py
new file mode 100644 (file)
index 0000000..4ab7afe
--- /dev/null
@@ -0,0 +1,123 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""RequestContext: context for requests that persist through all of nova."""
+
+import copy
+import logging
+
+from heat import local
+from heat.common import utils
+
+
+LOG = logging.getLogger(__name__)
+
+
+def generate_request_id():
+    return 'req-' + str(utils.gen_uuid())
+
+
+class RequestContext(object):
+    """Security context and request information.
+
+    Represents the user taking a given action within the system.
+
+    """
+
+    def __init__(self, user_id, project_id, is_admin=None, read_deleted="no",
+                 roles=None, remote_address=None, timestamp=None,
+                 request_id=None, auth_token=None, overwrite=True, **kwargs):
+        """
+        :param read_deleted: 'no' indicates deleted records are hidden, 'yes'
+            indicates deleted records are visible, 'only' indicates that
+            *only* deleted records are visible.
+
+        :param overwrite: Set to False to ensure that the greenthread local
+            copy of the index is not overwritten.
+
+        :param kwargs: Extra arguments that might be present, but we ignore
+            because they possibly came in from older rpc messages.
+        """
+        if read_deleted not in ('no', 'yes', 'only'):
+            raise ValueError(_("read_deleted can only be one of 'no', "
+                               "'yes' or 'only', not %r") % read_deleted)
+        if kwargs:
+            LOG.warn(_('Arguments dropped when creating context: %s') %
+                    str(kwargs))
+
+        self.user_id = user_id
+        self.project_id = project_id
+        self.roles = roles or []
+        self.is_admin = is_admin
+        if self.is_admin is None:
+            self.is_admin = 'admin' in [x.lower() for x in self.roles]
+        elif self.is_admin and 'admin' not in self.roles:
+            self.roles.append('admin')
+        self.read_deleted = read_deleted
+        self.remote_address = remote_address
+        if not timestamp:
+            timestamp = utils.utcnow()
+        if isinstance(timestamp, basestring):
+            timestamp = utils.parse_strtime(timestamp)
+        self.timestamp = timestamp
+        if not request_id:
+            request_id = generate_request_id()
+        self.request_id = request_id
+        self.auth_token = auth_token
+        if overwrite or not hasattr(local.store, 'context'):
+            self.update_store()
+
+    def update_store(self):
+        local.store.context = self
+
+    def to_dict(self):
+        return {'user_id': self.user_id,
+                'project_id': self.project_id,
+                'is_admin': self.is_admin,
+                'read_deleted': self.read_deleted,
+                'roles': self.roles,
+                'remote_address': self.remote_address,
+                'timestamp': utils.strtime(self.timestamp),
+                'request_id': self.request_id,
+                'auth_token': self.auth_token}
+
+    @classmethod
+    def from_dict(cls, values):
+        return cls(**values)
+
+    def elevated(self, read_deleted=None, overwrite=False):
+        """Return a version of this context with admin flag set."""
+        context = copy.copy(self)
+        context.is_admin = True
+
+        if 'admin' not in context.roles:
+            context.roles.append('admin')
+
+        if read_deleted is not None:
+            context.read_deleted = read_deleted
+
+        return context
+
+
+def get_admin_context(read_deleted="no"):
+    return RequestContext(user_id=None,
+                          project_id=None,
+                          is_admin=True,
+                          read_deleted=read_deleted,
+                          overwrite=False)
diff --git a/heat/engine/manager.py b/heat/engine/manager.py
new file mode 100644 (file)
index 0000000..1c3887d
--- /dev/null
@@ -0,0 +1,65 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Handles all processes relating to instances (guest vms).
+
+The :py:class:`ComputeManager` class is a :py:class:`heat.manager.Manager` that
+handles RPC calls relating to creating instances.  It is responsible for
+building a disk image, launching it via the underlying virtualization driver,
+responding to calls to check its state, attaching persistent storage, and
+terminating it.
+
+**Related Flags**
+
+:instances_path:  Where instances are kept on disk
+:compute_driver:  Name of class that is used to handle virtualization, loaded
+                  by :func:`heat.utils.import_object`
+
+"""
+
+import contextlib
+import functools
+import os
+import socket
+import sys
+import tempfile
+import time
+import traceback
+import logging
+
+from eventlet import greenthread
+
+import heat.context
+from heat import exception
+from heat import manager
+from heat.openstack.common import cfg
+from heat import rpc
+
+LOG = logging.getLogger(__name__)
+
+
+class EngineManager(manager.Manager):
+    """Manages the running instances from creation to destruction."""
+
+    def __init__(self, *args, **kwargs):
+        """Load configuration options and connect to the hypervisor."""
+
+    def create(self, template, stack_id):
+        pass
+
diff --git a/heat/local.py b/heat/local.py
new file mode 100644 (file)
index 0000000..19d9627
--- /dev/null
@@ -0,0 +1,37 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Greenthread local storage of variables using weak references"""
+
+import weakref
+
+from eventlet import corolocal
+
+
+class WeakLocal(corolocal.local):
+    def __getattribute__(self, attr):
+        rval = corolocal.local.__getattribute__(self, attr)
+        if rval:
+            rval = rval()
+        return rval
+
+    def __setattr__(self, attr, value):
+        value = weakref.ref(value)
+        return corolocal.local.__setattr__(self, attr, value)
+
+
+store = WeakLocal()
diff --git a/heat/manager.py b/heat/manager.py
new file mode 100644 (file)
index 0000000..3921281
--- /dev/null
@@ -0,0 +1,175 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Base Manager class.
+
+Managers are responsible for a certain aspect of the system.  It is a logical
+grouping of code relating to a portion of the system.  In general other
+components should be using the manager to make changes to the components that
+it is responsible for.
+
+For example, other components that need to deal with volumes in some way,
+should do so by calling methods on the VolumeManager instead of directly
+changing fields in the database.  This allows us to keep all of the code
+relating to volumes in the same place.
+
+We have adopted a basic strategy of Smart managers and dumb data, which means
+rather than attaching methods to data objects, components should call manager
+methods that act on the data.
+
+Methods on managers that can be executed locally should be called directly. If
+a particular method must execute on a remote host, this should be done via rpc
+to the service that wraps the manager
+
+Managers should be responsible for most of the db access, and
+non-implementation specific data.  Anything implementation specific that can't
+be generalized should be done by the Driver.
+
+In general, we prefer to have one manager with multiple drivers for different
+implementations, but sometimes it makes sense to have multiple managers.  You
+can think of it this way: Abstract different overall strategies at the manager
+level(FlatNetwork vs VlanNetwork), and different implementations at the driver
+level(LinuxNetDriver vs CiscoNetDriver).
+
+Managers will often provide methods for initial setup of a host or periodic
+tasks to a wrapping service.
+
+This module provides Manager, a base class for managers.
+
+"""
+
+import logging
+
+from heat import version
+from heat.common import config
+
+FLAGS = config.FLAGS
+LOG = logging.getLogger(__name__)
+
+
+def periodic_task(*args, **kwargs):
+    """Decorator to indicate that a method is a periodic task.
+
+    This decorator can be used in two ways:
+
+        1. Without arguments '@periodic_task', this will be run on every tick
+           of the periodic scheduler.
+
+        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
+           run on every N ticks of the periodic scheduler.
+    """
+    def decorator(f):
+        f._periodic_task = True
+        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+        return f
+
+    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
+    # and without parens.
+    #
+    # In the 'with-parens' case (with kwargs present), this function needs to
+    # return a decorator function since the interpreter will invoke it like:
+    #
+    #   periodic_task(*args, **kwargs)(f)
+    #
+    # In the 'without-parens' case, the original function will be passed
+    # in as the first argument, like:
+    #
+    #   periodic_task(f)
+    if kwargs:
+        return decorator
+    else:
+        return decorator(args[0])
+
+
+class ManagerMeta(type):
+    def __init__(cls, names, bases, dict_):
+        """Metaclass that allows us to collect decorated periodic tasks."""
+        super(ManagerMeta, cls).__init__(names, bases, dict_)
+
+        # NOTE(sirp): if the attribute is not present then we must be the base
+        # class, so, go ahead an initialize it. If the attribute is present,
+        # then we're a subclass so make a copy of it so we don't step on our
+        # parent's toes.
+        try:
+            cls._periodic_tasks = cls._periodic_tasks[:]
+        except AttributeError:
+            cls._periodic_tasks = []
+
+        try:
+            cls._ticks_to_skip = cls._ticks_to_skip.copy()
+        except AttributeError:
+            cls._ticks_to_skip = {}
+
+        for value in cls.__dict__.values():
+            if getattr(value, '_periodic_task', False):
+                task = value
+                name = task.__name__
+                cls._periodic_tasks.append((name, task))
+                cls._ticks_to_skip[name] = task._ticks_between_runs
+
+
+class Manager(object):
+    __metaclass__ = ManagerMeta
+
+    def __init__(self, host=None, db_driver=None):
+        if not host:
+            host = FLAGS.host
+        self.host = host
+        super(Manager, self).__init__(db_driver)
+
+    def periodic_tasks(self, context, raise_on_error=False):
+        """Tasks to be run at a periodic interval."""
+        for task_name, task in self._periodic_tasks:
+            full_task_name = '.'.join([self.__class__.__name__, task_name])
+
+            ticks_to_skip = self._ticks_to_skip[task_name]
+            if ticks_to_skip > 0:
+                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
+                            " ticks left until next run"), locals())
+                self._ticks_to_skip[task_name] -= 1
+                continue
+
+            self._ticks_to_skip[task_name] = task._ticks_between_runs
+            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+
+            try:
+                task(self, context)
+            except Exception as e:
+                if raise_on_error:
+                    raise
+                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+                              locals())
+
+    def init_host(self):
+        """Handle initialization if this is a standalone service.
+
+        Child classes should override this method.
+
+        """
+        pass
+
+    def service_version(self, context):
+        return version.version_string()
+
+    def service_config(self, context):
+        config = {}
+        for key in FLAGS:
+            config[key] = FLAGS.get(key, None)
+        return config
+
+
diff --git a/heat/rpc/__init__.py b/heat/rpc/__init__.py
new file mode 100644 (file)
index 0000000..6ad0992
--- /dev/null
@@ -0,0 +1,202 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from heat.openstack.common import cfg
+from heat.common import utils
+from heat.common import config
+
+
+rpc_backend_opt = cfg.StrOpt('rpc_backend',
+        default='heat.rpc.impl_qpid',
+        help="The messaging module to use, defaults to kombu.")
+
+FLAGS = config.FLAGS
+FLAGS.register_opt(rpc_backend_opt)
+
+
+def create_connection(new=True):
+    """Create a connection to the message bus used for rpc.
+
+    For some example usage of creating a connection and some consumers on that
+    connection, see nova.service.
+
+    :param new: Whether or not to create a new connection.  A new connection
+                will be created by default.  If new is False, the
+                implementation is free to return an existing connection from a
+                pool.
+
+    :returns: An instance of nova.rpc.common.Connection
+    """
+    return _get_impl().create_connection(new=new)
+
+
+def call(context, topic, msg, timeout=None):
+    """Invoke a remote method that returns something.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param topic: The topic to send the rpc message to.  This correlates to the
+                  topic argument of
+                  nova.rpc.common.Connection.create_consumer() and only applies
+                  when the consumer was created with fanout=False.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+    :param timeout: int, number of seconds to use for a response timeout.
+                    If set, this overrides the rpc_response_timeout option.
+
+    :returns: A dict from the remote method.
+
+    :raises: nova.rpc.common.Timeout if a complete response is not received
+             before the timeout is reached.
+    """
+    return _get_impl().call(context, topic, msg, timeout)
+
+
+def cast(context, topic, msg):
+    """Invoke a remote method that does not return anything.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param topic: The topic to send the rpc message to.  This correlates to the
+                  topic argument of
+                  nova.rpc.common.Connection.create_consumer() and only applies
+                  when the consumer was created with fanout=False.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+
+    :returns: None
+    """
+    return _get_impl().cast(context, topic, msg)
+
+
+def fanout_cast(context, topic, msg):
+    """Broadcast a remote method invocation with no return.
+
+    This method will get invoked on all consumers that were set up with this
+    topic name and fanout=True.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param topic: The topic to send the rpc message to.  This correlates to the
+                  topic argument of
+                  nova.rpc.common.Connection.create_consumer() and only applies
+                  when the consumer was created with fanout=True.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+
+    :returns: None
+    """
+    return _get_impl().fanout_cast(context, topic, msg)
+
+
+def multicall(context, topic, msg, timeout=None):
+    """Invoke a remote method and get back an iterator.
+
+    In this case, the remote method will be returning multiple values in
+    separate messages, so the return values can be processed as the come in via
+    an iterator.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param topic: The topic to send the rpc message to.  This correlates to the
+                  topic argument of
+                  nova.rpc.common.Connection.create_consumer() and only applies
+                  when the consumer was created with fanout=False.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+    :param timeout: int, number of seconds to use for a response timeout.
+                    If set, this overrides the rpc_response_timeout option.
+
+    :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
+              an index that starts at 0 and increases by one for each value
+              returned and X is the Nth value that was returned by the remote
+              method.
+
+    :raises: nova.rpc.common.Timeout if a complete response is not received
+             before the timeout is reached.
+    """
+    return _get_impl().multicall(context, topic, msg, timeout)
+
+
+def notify(context, topic, msg):
+    """Send notification event.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param topic: The topic to send the notification to.
+    :param msg: This is a dict of content of event.
+
+    :returns: None
+    """
+    return _get_impl().notify(context, topic, msg)
+
+
+def cleanup():
+    """Clean up resoruces in use by implementation.
+
+    Clean up any resources that have been allocated by the RPC implementation.
+    This is typically open connections to a messaging service.  This function
+    would get called before an application using this API exits to allow
+    connections to get torn down cleanly.
+
+    :returns: None
+    """
+    return _get_impl().cleanup()
+
+
+def cast_to_server(context, server_params, topic, msg):
+    """Invoke a remote method that does not return anything.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param server_params: Connection information
+    :param topic: The topic to send the notification to.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+
+    :returns: None
+    """
+    return _get_impl().cast_to_server(context, server_params, topic, msg)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+    """Broadcast to a remote method invocation with no return.
+
+    :param context: Information that identifies the user that has made this
+                    request.
+    :param server_params: Connection information
+    :param topic: The topic to send the notification to.
+    :param msg: This is a dict in the form { "method" : "method_to_invoke",
+                                             "args" : dict_of_kwargs }
+
+    :returns: None
+    """
+    return _get_impl().fanout_cast_to_server(context, server_params, topic,
+            msg)
+
+
+_RPCIMPL = None
+
+
+def _get_impl():
+    """Delay import of rpc_backend until FLAGS are loaded."""
+    global _RPCIMPL
+    if _RPCIMPL is None:
+        _RPCIMPL = utils.import_object(FLAGS.rpc_backend)
+    return _RPCIMPL
diff --git a/heat/rpc/amqp.py b/heat/rpc/amqp.py
new file mode 100644 (file)
index 0000000..b4e548b
--- /dev/null
@@ -0,0 +1,384 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 - 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Shared code between AMQP based nova.rpc implementations.
+
+The code in this module is shared between the rpc implemenations based on AMQP.
+Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
+AMQP, but is deprecated and predates this code.
+"""
+
+import inspect
+import logging
+import sys
+import traceback
+import uuid
+
+from eventlet import greenpool
+from eventlet import pools
+
+from heat import context
+from heat.common import exception
+from heat.common import config
+from heat import local
+import heat.rpc.common as rpc_common
+
+LOG = logging.getLogger(__name__)
+FLAGS = config.FLAGS
+
+
+class Pool(pools.Pool):
+    """Class that implements a Pool of Connections."""
+    def __init__(self, *args, **kwargs):
+        self.connection_cls = kwargs.pop("connection_cls", None)
+        kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
+        kwargs.setdefault("order_as_stack", True)
+        super(Pool, self).__init__(*args, **kwargs)
+
+    # TODO(comstud): Timeout connections not used in a while
+    def create(self):
+        LOG.debug('Pool creating new connection')
+        return self.connection_cls()
+
+    def empty(self):
+        while self.free_items:
+            self.get().close()
+
+
+class ConnectionContext(rpc_common.Connection):
+    """The class that is actually returned to the caller of
+    create_connection().  This is a essentially a wrapper around
+    Connection that supports 'with' and can return a new Connection or
+    one from a pool.  It will also catch when an instance of this class
+    is to be deleted so that we can return Connections to the pool on
+    exceptions and so forth without making the caller be responsible for
+    catching all exceptions and making sure to return a connection to
+    the pool.
+    """
+
+    def __init__(self, connection_pool, pooled=True, server_params=None):
+        """Create a new connection, or get one from the pool"""
+        self.connection = None
+        self.connection_pool = connection_pool
+        if pooled:
+            self.connection = connection_pool.get()
+        else:
+            self.connection = connection_pool.connection_cls(
+                    server_params=server_params)
+        self.pooled = pooled
+
+    def __enter__(self):
+        """When with ConnectionContext() is used, return self"""
+        return self
+
+    def _done(self):
+        """If the connection came from a pool, clean it up and put it back.
+        If it did not come from a pool, close it.
+        """
+        if self.connection:
+            if self.pooled:
+                # Reset the connection so it's ready for the next caller
+                # to grab from the pool
+                self.connection.reset()
+                self.connection_pool.put(self.connection)
+            else:
+                try:
+                    self.connection.close()
+                except Exception:
+                    pass
+            self.connection = None
+
+    def __exit__(self, exc_type, exc_value, tb):
+        """End of 'with' statement.  We're done here."""
+        self._done()
+
+    def __del__(self):
+        """Caller is done with this connection.  Make sure we cleaned up."""
+        self._done()
+
+    def close(self):
+        """Caller is done with this connection."""
+        self._done()
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        self.connection.create_consumer(topic, proxy, fanout)
+
+    def consume_in_thread(self):
+        self.connection.consume_in_thread()
+
+    def __getattr__(self, key):
+        """Proxy all other calls to the Connection instance"""
+        if self.connection:
+            return getattr(self.connection, key)
+        else:
+            raise exception.InvalidRPCConnectionReuse()
+
+
+def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
+    """Sends a reply or an error on the channel signified by msg_id.
+
+    Failure should be a sys.exc_info() tuple.
+
+    """
+    with ConnectionContext(connection_pool) as conn:
+        if failure:
+            message = str(failure[1])
+            tb = traceback.format_exception(*failure)
+            LOG.error(_("Returning exception %s to caller"), message)
+            LOG.error(tb)
+            failure = (failure[0].__name__, str(failure[1]), tb)
+
+        try:
+            msg = {'result': reply, 'failure': failure}
+        except TypeError:
+            msg = {'result': dict((k, repr(v))
+                            for k, v in reply.__dict__.iteritems()),
+                    'failure': failure}
+        if ending:
+            msg['ending'] = True
+        conn.direct_send(msg_id, msg)
+
+
+class RpcContext(context.RequestContext):
+    """Context that supports replying to a rpc.call"""
+    def __init__(self, *args, **kwargs):
+        self.msg_id = kwargs.pop('msg_id', None)
+        super(RpcContext, self).__init__(*args, **kwargs)
+
+    def reply(self, reply=None, failure=None, ending=False,
+              connection_pool=None):
+        if self.msg_id:
+            msg_reply(self.msg_id, connection_pool, reply, failure,
+                      ending)
+            if ending:
+                self.msg_id = None
+
+
+def unpack_context(msg):
+    """Unpack context from msg."""
+    context_dict = {}
+    for key in list(msg.keys()):
+        # NOTE(vish): Some versions of python don't like unicode keys
+        #             in kwargs.
+        key = str(key)
+        if key.startswith('_context_'):
+            value = msg.pop(key)
+            context_dict[key[9:]] = value
+    context_dict['msg_id'] = msg.pop('_msg_id', None)
+    ctx = RpcContext.from_dict(context_dict)
+    LOG.debug(_('unpacked context: %s'), ctx.to_dict())
+    return ctx
+
+
+def pack_context(msg, context):
+    """Pack context into msg.
+
+    Values for message keys need to be less than 255 chars, so we pull
+    context out into a bunch of separate keys. If we want to support
+    more arguments in rabbit messages, we may want to do the same
+    for args at some point.
+
+    """
+    context_d = dict([('_context_%s' % key, value)
+                      for (key, value) in context.to_dict().iteritems()])
+    msg.update(context_d)
+
+
+class ProxyCallback(object):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, proxy, connection_pool):
+        self.proxy = proxy
+        self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
+        self.connection_pool = connection_pool
+
+    def __call__(self, message_data):
+        """Consumer callback to call a method on a proxy object.
+
+        Parses the message for validity and fires off a thread to call the
+        proxy object method.
+
+        Message data should be a dictionary with two keys:
+            method: string representing the method to call
+            args: dictionary of arg: value
+
+        Example: {'method': 'echo', 'args': {'value': 42}}
+
+        """
+        # It is important to clear the context here, because at this point
+        # the previous context is stored in local.store.context
+        if hasattr(local.store, 'context'):
+            del local.store.context
+        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        ctxt = unpack_context(message_data)
+        method = message_data.get('method')
+        args = message_data.get('args', {})
+        if not method:
+            LOG.warn(_('no method for message: %s') % message_data)
+            ctxt.reply(_('No method for message: %s') % message_data,
+                       connection_pool=self.connection_pool)
+            return
+        self.pool.spawn_n(self._process_data, ctxt, method, args)
+
+    @exception.wrap_exception()
+    def _process_data(self, ctxt, method, args):
+        """Thread that magically looks for a method on the proxy
+        object and calls it.
+        """
+        ctxt.update_store()
+        try:
+            node_func = getattr(self.proxy, str(method))
+            node_args = dict((str(k), v) for k, v in args.iteritems())
+            # NOTE(vish): magic is fun!
+            rval = node_func(context=ctxt, **node_args)
+            # Check if the result was a generator
+            if inspect.isgenerator(rval):
+                for x in rval:
+                    ctxt.reply(x, None, connection_pool=self.connection_pool)
+            else:
+                ctxt.reply(rval, None, connection_pool=self.connection_pool)
+            # This final None tells multicall that it is done.
+            ctxt.reply(ending=True, connection_pool=self.connection_pool)
+        except Exception as e:
+            LOG.exception('Exception during message handling')
+            ctxt.reply(None, sys.exc_info(),
+                       connection_pool=self.connection_pool)
+        return
+
+
+class MulticallWaiter(object):
+    def __init__(self, connection, timeout):
+        self._connection = connection
+        self._iterator = connection.iterconsume(
+                                timeout=timeout or FLAGS.rpc_response_timeout)
+        self._result = None
+        self._done = False
+        self._got_ending = False
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        self._iterator.close()
+        self._iterator = None
+        self._connection.close()
+
+    def __call__(self, data):
+        """The consume() callback will call this.  Store the result."""
+        if data['failure']:
+            self._result = rpc_common.RemoteError(*data['failure'])
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            self._result = data['result']
+
+    def __iter__(self):
+        """Return a result until we get a 'None' response from consumer"""
+        if self._done:
+            raise StopIteration
+        while True:
+            self._iterator.next()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            result = self._result
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
+
+
+def create_connection(new, connection_pool):
+    """Create a connection"""
+    return ConnectionContext(connection_pool, pooled=not new)
+
+
+def multicall(context, topic, msg, timeout, connection_pool):
+    """Make a call that returns multiple times."""
+    # Can't use 'with' for multicall, as it returns an iterator
+    # that will continue to use the connection.  When it's done,
+    # connection.close() will get called which will put it back into
+    # the pool
+    LOG.debug(_('Making asynchronous call on %s ...'), topic)
+    msg_id = uuid.uuid4().hex
+    msg.update({'_msg_id': msg_id})
+    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    pack_context(msg, context)
+
+    conn = ConnectionContext(connection_pool)
+    wait_msg = MulticallWaiter(conn, timeout)
+    conn.declare_direct_consumer(msg_id, wait_msg)
+    conn.topic_send(topic, msg)
+    return wait_msg
+
+
+def call(context, topic, msg, timeout, connection_pool):
+    """Sends a message on a topic and wait for a response."""
+    rv = multicall(context, topic, msg, timeout, connection_pool)
+    # NOTE(vish): return the last result from the multicall
+    rv = list(rv)
+    if not rv:
+        return
+    return rv[-1]
+
+
+def cast(context, topic, msg, connection_pool):
+    """Sends a message on a topic without waiting for a response."""
+    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    pack_context(msg, context)
+    with ConnectionContext(connection_pool) as conn:
+        conn.topic_send(topic, msg)
+
+
+def fanout_cast(context, topic, msg, connection_pool):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    LOG.debug(_('Making asynchronous fanout cast...'))
+    pack_context(msg, context)
+    with ConnectionContext(connection_pool) as conn:
+        conn.fanout_send(topic, msg)
+
+
+def cast_to_server(context, server_params, topic, msg, connection_pool):
+    """Sends a message on a topic to a specific server."""
+    pack_context(msg, context)
+    with ConnectionContext(connection_pool, pooled=False,
+            server_params=server_params) as conn:
+        conn.topic_send(topic, msg)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg,
+        connection_pool):
+    """Sends a message on a fanout exchange to a specific server."""
+    pack_context(msg, context)
+    with ConnectionContext(connection_pool, pooled=False,
+            server_params=server_params) as conn:
+        conn.fanout_send(topic, msg)
+
+
+def notify(context, topic, msg, connection_pool):
+    """Sends a notification event on a topic."""
+    LOG.debug(_('Sending notification on %s...'), topic)
+    pack_context(msg, context)
+    with ConnectionContext(connection_pool) as conn:
+        conn.notify_send(topic, msg)
+
+
+def cleanup(connection_pool):
+    connection_pool.empty()
diff --git a/heat/rpc/common.py b/heat/rpc/common.py
new file mode 100644 (file)
index 0000000..a425a20
--- /dev/null
@@ -0,0 +1,144 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+import logging
+
+from heat import exception
+from heat.openstack.common import cfg
+from heat.common import config
+
+
+LOG = logging.getLogger(__name__)
+
+rpc_opts = [
+    cfg.IntOpt('rpc_thread_pool_size',
+               default=1024,
+               help='Size of RPC thread pool'),
+    cfg.IntOpt('rpc_conn_pool_size',
+               default=30,
+               help='Size of RPC connection pool'),
+    cfg.IntOpt('rpc_response_timeout',
+               default=60,
+               help='Seconds to wait for a response from call or multicall'),
+    ]
+
+config.FLAGS.register_opts(rpc_opts)
+
+
+class RemoteError(exception.NovaException):
+    """Signifies that a remote class has raised an exception.
+
+    Contains a string representation of the type of the original exception,
+    the value of the original exception, and the traceback.  These are
+    sent to the parent as a joined string so printing the exception
+    contains all of the relevant info.
+
+    """
+    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+
+    def __init__(self, exc_type=None, value=None, traceback=None):
+        self.exc_type = exc_type
+        self.value = value
+        self.traceback = traceback
+        super(RemoteError, self).__init__(exc_type=exc_type,
+                                          value=value,
+                                          traceback=traceback)
+
+
+class Timeout(exception.NovaException):
+    """Signifies that a timeout has occurred.
+
+    This exception is raised if the rpc_response_timeout is reached while
+    waiting for a response from the remote side.
+    """
+    message = _("Timeout while waiting on RPC response.")
+
+
+class Connection(object):
+    """A connection, returned by rpc.create_connection().
+
+    This class represents a connection to the message bus used for rpc.
+    An instance of this class should never be created by users of the rpc API.
+    Use rpc.create_connection() instead.
+    """
+    def close(self):
+        """Close the connection.
+
+        This method must be called when the connection will no longer be used.
+        It will ensure that any resources associated with the connection, such
+        as a network connection, and cleaned up.
+        """
+        raise NotImplementedError()
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer on this connection.
+
+        A consumer is associated with a message queue on the backend message
+        bus.  The consumer will read messages from the queue, unpack them, and
+        dispatch them to the proxy object.  The contents of the message pulled
+        off of the queue will determine which method gets called on the proxy
+        object.
+
+        :param topic: This is a name associated with what to consume from.
+                      Multiple instances of a service may consume from the same
+                      topic. For example, all instances of nova-compute consume
+                      from a queue called "compute".  In that case, the
+                      messages will get distributed amongst the consumers in a
+                      round-robin fashion if fanout=False.  If fanout=True,
+                      every consumer associated with this topic will get a
+                      copy of every message.
+        :param proxy: The object that will handle all incoming messages.
+        :param fanout: Whether or not this is a fanout topic.  See the
+                       documentation for the topic parameter for some
+                       additional comments on this.
+        """
+        raise NotImplementedError()
+
+    def consume_in_thread(self):
+        """Spawn a thread to handle incoming messages.
+
+        Spawn a thread that will be responsible for handling all incoming
+        messages for consumers that were set up on this connection.
+
+        Message dispatching inside of this is expected to be implemented in a
+        non-blocking manner.  An example implementation would be having this
+        thread pull messages in for all of the consumers, but utilize a thread
+        pool for dispatching the messages to the proxy objects.
+        """
+        raise NotImplementedError()
+
+
+def _safe_log(log_func, msg, msg_data):
+    """Sanitizes the msg_data field before logging."""
+    SANITIZE = {
+                'set_admin_password': ('new_pass',),
+                'run_instance': ('admin_password',),
+               }
+    method = msg_data['method']
+    if method in SANITIZE:
+        msg_data = copy.deepcopy(msg_data)
+        args_to_sanitize = SANITIZE[method]
+        for arg in args_to_sanitize:
+            try:
+                msg_data['args'][arg] = "<SANITIZED>"
+            except KeyError:
+                pass
+
+    return log_func(msg, msg_data)
diff --git a/heat/rpc/impl_fake.py b/heat/rpc/impl_fake.py
new file mode 100644 (file)
index 0000000..a6c2aba
--- /dev/null
@@ -0,0 +1,188 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack LLC
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""Fake RPC implementation which calls proxy methods directly with no
+queues.  Casts will block, but this is very useful for tests.
+"""
+
+import inspect
+import json
+import signal
+import sys
+import time
+import traceback
+
+import eventlet
+
+from heat import context
+from heat.common import config
+from heat.rpc import common as rpc_common
+
+CONSUMERS = {}
+
+FLAGS = config.FLAGS
+
+
+class RpcContext(context.RequestContext):
+    def __init__(self, *args, **kwargs):
+        super(RpcContext, self).__init__(*args, **kwargs)
+        self._response = []
+        self._done = False
+
+    def reply(self, reply=None, failure=None, ending=False):
+        if ending:
+            self._done = True
+        if not self._done:
+            self._response.append((reply, failure))
+
+
+class Consumer(object):
+    def __init__(self, topic, proxy):
+        self.topic = topic
+        self.proxy = proxy
+
+    def call(self, context, method, args, timeout):
+        node_func = getattr(self.proxy, method)
+        node_args = dict((str(k), v) for k, v in args.iteritems())
+        done = eventlet.event.Event()
+
+        def _inner():
+            ctxt = RpcContext.from_dict(context.to_dict())
+            try:
+                rval = node_func(context=ctxt, **node_args)
+                res = []
+                # Caller might have called ctxt.reply() manually
+                for (reply, failure) in ctxt._response:
+                    if failure:
+                        raise failure[0], failure[1], failure[2]
+                    res.append(reply)
+                # if ending not 'sent'...we might have more data to
+                # return from the function itself
+                if not ctxt._done:
+                    if inspect.isgenerator(rval):
+                        for val in rval:
+                            res.append(val)
+                    else:
+                        res.append(rval)
+                done.send(res)
+            except Exception:
+                exc_info = sys.exc_info()
+                done.send_exception(
+                        rpc_common.RemoteError(exc_info[0].__name__,
+                            str(exc_info[1]),
+                            ''.join(traceback.format_exception(*exc_info))))
+
+        thread = eventlet.greenthread.spawn(_inner)
+
+        if timeout:
+            start_time = time.time()
+            while not done.ready():
+                eventlet.greenthread.sleep(1)
+                cur_time = time.time()
+                if (cur_time - start_time) > timeout:
+                    thread.kill()
+                    raise rpc_common.Timeout()
+
+        return done.wait()
+
+
+class Connection(object):
+    """Connection object."""
+
+    def __init__(self):
+        self.consumers = []
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        consumer = Consumer(topic, proxy)
+        self.consumers.append(consumer)
+        if topic not in CONSUMERS:
+            CONSUMERS[topic] = []
+        CONSUMERS[topic].append(consumer)
+
+    def close(self):
+        for consumer in self.consumers:
+            CONSUMERS[consumer.topic].remove(consumer)
+        self.consumers = []
+
+    def consume_in_thread(self):
+        pass
+
+
+def create_connection(new=True):
+    """Create a connection"""
+    return Connection()
+
+
+def check_serialize(msg):
+    """Make sure a message intended for rpc can be serialized."""
+    json.dumps(msg)
+
+
+def multicall(context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+
+    check_serialize(msg)
+
+    method = msg.get('method')
+    if not method:
+        return
+    args = msg.get('args', {})
+
+    try:
+        consumer = CONSUMERS[topic][0]
+    except (KeyError, IndexError):
+        return iter([None])
+    else:
+        return consumer.call(context, method, args, timeout)
+
+
+def call(context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    rv = multicall(context, topic, msg, timeout)
+    # NOTE(vish): return the last result from the multicall
+    rv = list(rv)
+    if not rv:
+        return
+    return rv[-1]
+
+
+def cast(context, topic, msg):
+    try:
+        call(context, topic, msg)
+    except rpc_common.RemoteError:
+        pass
+
+
+def notify(context, topic, msg):
+    check_serialize(msg)
+
+
+def cleanup():
+    pass
+
+
+def fanout_cast(context, topic, msg):
+    """Cast to all consumers of a topic"""
+    check_serialize(msg)
+    method = msg.get('method')
+    if not method:
+        return
+    args = msg.get('args', {})
+
+    for consumer in CONSUMERS.get(topic, []):
+        try:
+            consumer.call(context, method, args, None)
+        except rpc_common.RemoteError:
+            pass
diff --git a/heat/rpc/impl_kombu.py b/heat/rpc/impl_kombu.py
new file mode 100644 (file)
index 0000000..7199273
--- /dev/null
@@ -0,0 +1,705 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack LLC
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import itertools
+import socket
+import ssl
+import sys
+import time
+import uuid
+
+import eventlet
+import greenlet
+import kombu
+import kombu.entity
+import kombu.messaging
+import kombu.connection
+
+from heat.common import config
+from heat.openstack.common import cfg
+from heat.rpc import amqp as rpc_amqp
+from heat.rpc import common as rpc_common
+
+kombu_opts = [
+    cfg.StrOpt('kombu_ssl_version',
+               default='',
+               help='SSL version to use (valid only if SSL enabled)'),
+    cfg.StrOpt('kombu_ssl_keyfile',
+               default='',
+               help='SSL key file (valid only if SSL enabled)'),
+    cfg.StrOpt('kombu_ssl_certfile',
+               default='',
+               help='SSL cert file (valid only if SSL enabled)'),
+    cfg.StrOpt('kombu_ssl_ca_certs',
+               default='',
+               help=('SSL certification authority file '
+                    '(valid only if SSL enabled)')),
+    ]
+
+FLAGS = config.FLAGS
+FLAGS.register_opts(kombu_opts)
+LOG = rpc_common.LOG
+
+
+class ConsumerBase(object):
+    """Consumer base class."""
+
+    def __init__(self, channel, callback, tag, **kwargs):
+        """Declare a queue on an amqp channel.
+
+        'channel' is the amqp channel to use
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        queue name, exchange name, and other kombu options are
+        passed in here as a dictionary.
+        """
+        self.callback = callback
+        self.tag = str(tag)
+        self.kwargs = kwargs
+        self.queue = None
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-declare the queue after a rabbit reconnect"""
+        self.channel = channel
+        self.kwargs['channel'] = channel
+        self.queue = kombu.entity.Queue(**self.kwargs)
+        self.queue.declare()
+
+    def consume(self, *args, **kwargs):
+        """Actually declare the consumer on the amqp channel.  This will
+        start the flow of messages from the queue.  Using the
+        Connection.iterconsume() iterator will process the messages,
+        calling the appropriate callback.
+
+        If a callback is specified in kwargs, use that.  Otherwise,
+        use the callback passed during __init__()
+
+        If kwargs['nowait'] is True, then this call will block until
+        a message is read.
+
+        Messages will automatically be acked if the callback doesn't
+        raise an exception
+        """
+
+        options = {'consumer_tag': self.tag}
+        options['nowait'] = kwargs.get('nowait', False)
+        callback = kwargs.get('callback', self.callback)
+        if not callback:
+            raise ValueError("No callback defined")
+
+        def _callback(raw_message):
+            message = self.channel.message_to_python(raw_message)
+            try:
+                callback(message.payload)
+                message.ack()
+            except Exception:
+                LOG.exception(_("Failed to process message... skipping it."))
+
+        self.queue.consume(*args, callback=_callback, **options)
+
+    def cancel(self):
+        """Cancel the consuming from the queue, if it has started"""
+        try:
+            self.queue.cancel(self.tag)
+        except KeyError, e:
+            # NOTE(comstud): Kludge to get around a amqplib bug
+            if str(e) != "u'%s'" % self.tag:
+                raise
+        self.queue = None
+
+
+class DirectConsumer(ConsumerBase):
+    """Queue/consumer class for 'direct'"""
+
+    def __init__(self, channel, msg_id, callback, tag, **kwargs):
+        """Init a 'direct' queue.
+
+        'channel' is the amqp channel to use
+        'msg_id' is the msg_id to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        # Default options
+        options = {'durable': False,
+                'auto_delete': True,
+                'exclusive': True}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(
+                name=msg_id,
+                type='direct',
+                durable=options['durable'],
+                auto_delete=options['auto_delete'])
+        super(DirectConsumer, self).__init__(
+                channel,
+                callback,
+                tag,
+                name=msg_id,
+                exchange=exchange,
+                routing_key=msg_id,
+                **options)
+
+
+class TopicConsumer(ConsumerBase):
+    """Consumer class for 'topic'"""
+
+    def __init__(self, channel, topic, callback, tag, **kwargs):
+        """Init a 'topic' queue.
+
+        'channel' is the amqp channel to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        # Default options
+        options = {'durable': FLAGS.rabbit_durable_queues,
+                'auto_delete': False,
+                'exclusive': False}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(
+                name=FLAGS.control_exchange,
+                type='topic',
+                durable=options['durable'],
+                auto_delete=options['auto_delete'])
+        super(TopicConsumer, self).__init__(
+                channel,
+                callback,
+                tag,
+                name=topic,
+                exchange=exchange,
+                routing_key=topic,
+                **options)
+
+
+class FanoutConsumer(ConsumerBase):
+    """Consumer class for 'fanout'"""
+
+    def __init__(self, channel, topic, callback, tag, **kwargs):
+        """Init a 'fanout' queue.
+
+        'channel' is the amqp channel to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        unique = uuid.uuid4().hex
+        exchange_name = '%s_fanout' % topic
+        queue_name = '%s_fanout_%s' % (topic, unique)
+
+        # Default options
+        options = {'durable': False,
+                'auto_delete': True,
+                'exclusive': True}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(
+                name=exchange_name,
+                type='fanout',
+                durable=options['durable'],
+                auto_delete=options['auto_delete'])
+        super(FanoutConsumer, self).__init__(
+                channel,
+                callback,
+                tag,
+                name=queue_name,
+                exchange=exchange,
+                routing_key=topic,
+                **options)
+
+
+class Publisher(object):
+    """Base Publisher class"""
+
+    def __init__(self, channel, exchange_name, routing_key, **kwargs):
+        """Init the Publisher class with the exchange_name, routing_key,
+        and other options
+        """
+        self.exchange_name = exchange_name
+        self.routing_key = routing_key
+        self.kwargs = kwargs
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-establish the Producer after a rabbit reconnection"""
+        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
+                **self.kwargs)
+        self.producer = kombu.messaging.Producer(exchange=self.exchange,
+                channel=channel, routing_key=self.routing_key)
+
+    def send(self, msg):
+        """Send a message"""
+        self.producer.publish(msg)
+
+
+class DirectPublisher(Publisher):
+    """Publisher class for 'direct'"""
+    def __init__(self, channel, msg_id, **kwargs):
+        """init a 'direct' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+
+        options = {'durable': False,
+                'auto_delete': True,
+                'exclusive': True}
+        options.update(kwargs)
+        super(DirectPublisher, self).__init__(channel,
+                msg_id,
+                msg_id,
+                type='direct',
+                **options)
+
+
+class TopicPublisher(Publisher):
+    """Publisher class for 'topic'"""
+    def __init__(self, channel, topic, **kwargs):
+        """init a 'topic' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': FLAGS.rabbit_durable_queues,
+                'auto_delete': False,
+                'exclusive': False}
+        options.update(kwargs)
+        super(TopicPublisher, self).__init__(channel,
+                FLAGS.control_exchange,
+                topic,
+                type='topic',
+                **options)
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'"""
+    def __init__(self, channel, topic, **kwargs):
+        """init a 'fanout' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': False,
+                'auto_delete': True,
+                'exclusive': True}
+        options.update(kwargs)
+        super(FanoutPublisher, self).__init__(channel,
+                '%s_fanout' % topic,
+                None,
+                type='fanout',
+                **options)
+
+
+class NotifyPublisher(TopicPublisher):
+    """Publisher class for 'notify'"""
+
+    def __init__(self, *args, **kwargs):
+        self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
+        super(NotifyPublisher, self).__init__(*args, **kwargs)
+
+    def reconnect(self, channel):
+        super(NotifyPublisher, self).reconnect(channel)
+
+        # NOTE(jerdfelt): Normally the consumer would create the queue, but
+        # we do this to ensure that messages don't get dropped if the
+        # consumer is started after we do
+        queue = kombu.entity.Queue(channel=channel,
+                exchange=self.exchange,
+                durable=self.durable,
+                name=self.routing_key,
+                routing_key=self.routing_key)
+        queue.declare()
+
+
+class Connection(object):
+    """Connection object."""
+
+    def __init__(self, server_params=None):
+        self.consumers = []
+        self.consumer_thread = None
+        self.max_retries = FLAGS.rabbit_max_retries
+        # Try forever?
+        if self.max_retries <= 0:
+            self.max_retries = None
+        self.interval_start = FLAGS.rabbit_retry_interval
+        self.interval_stepping = FLAGS.rabbit_retry_backoff
+        # max retry-interval = 30 seconds
+        self.interval_max = 30
+        self.memory_transport = False
+
+        if server_params is None:
+            server_params = {}
+
+        # Keys to translate from server_params to kombu params
+        server_params_to_kombu_params = {'username': 'userid'}
+
+        params = {}
+        for sp_key, value in server_params.iteritems():
+            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+            params[p_key] = value
+
+        params.setdefault('hostname', FLAGS.rabbit_host)
+        params.setdefault('port', FLAGS.rabbit_port)
+        params.setdefault('userid', FLAGS.rabbit_userid)
+        params.setdefault('password', FLAGS.rabbit_password)
+        params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
+
+        self.params = params
+
+        if FLAGS.fake_rabbit:
+            self.params['transport'] = 'memory'
+            self.memory_transport = True
+        else:
+            self.memory_transport = False
+
+        if FLAGS.rabbit_use_ssl:
+            self.params['ssl'] = self._fetch_ssl_params()
+
+        self.connection = None
+        self.reconnect()
+
+    def _fetch_ssl_params(self):
+        """Handles fetching what ssl params
+        should be used for the connection (if any)"""
+        ssl_params = dict()
+
+        # http://docs.python.org/library/ssl.html - ssl.wrap_socket
+        if FLAGS.kombu_ssl_version:
+            ssl_params['ssl_version'] = FLAGS.kombu_ssl_version
+        if FLAGS.kombu_ssl_keyfile:
+            ssl_params['keyfile'] = FLAGS.kombu_ssl_keyfile
+        if FLAGS.kombu_ssl_certfile:
+            ssl_params['certfile'] = FLAGS.kombu_ssl_certfile
+        if FLAGS.kombu_ssl_ca_certs:
+            ssl_params['ca_certs'] = FLAGS.kombu_ssl_ca_certs
+            # We might want to allow variations in the
+            # future with this?
+            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
+
+        if not ssl_params:
+            # Just have the default behavior
+            return True
+        else:
+            # Return the extended behavior
+            return ssl_params
+
+    def _connect(self):
+        """Connect to rabbit.  Re-establish any queues that may have
+        been declared before if we are reconnecting.  Exceptions should
+        be handled by the caller.
+        """
+        if self.connection:
+            LOG.info(_("Reconnecting to AMQP server on "
+                    "%(hostname)s:%(port)d") % self.params)
+            try:
+                self.connection.close()
+            except self.connection_errors:
+                pass
+            # Setting this in case the next statement fails, though
+            # it shouldn't be doing any network operations, yet.
+            self.connection = None
+        self.connection = kombu.connection.BrokerConnection(
+                **self.params)
+        self.connection_errors = self.connection.connection_errors
+        if self.memory_transport:
+            # Kludge to speed up tests.
+            self.connection.transport.polling_interval = 0.0
+        self.consumer_num = itertools.count(1)
+        self.connection.connect()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        for consumer in self.consumers:
+            consumer.reconnect(self.channel)
+        LOG.info(_('Connected to AMQP server on '
+                '%(hostname)s:%(port)d') % self.params)
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing queues.
+        Will retry up to self.max_retries number of times.
+        self.max_retries = 0 means to retry forever.
+        Sleep between tries, starting at self.interval_start
+        seconds, backing off self.interval_stepping number of seconds
+        each attempt.
+        """
+
+        attempt = 0
+        while True:
+            attempt += 1
+            try:
+                self._connect()
+                return
+            except self.connection_errors, e:
+                pass
+            except Exception, e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+
+            log_info = {}
+            log_info['err_str'] = str(e)
+            log_info['max_retries'] = self.max_retries
+            log_info.update(self.params)
+
+            if self.max_retries and attempt == self.max_retries:
+                LOG.exception(_('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info)
+                # NOTE(comstud): Copied from original code.  There's
+                # really no better recourse because if this was a queue we
+                # need to consume on, we have no way to consume anymore.
+                sys.exit(1)
+
+            if attempt == 1:
+                sleep_time = self.interval_start or 1
+            elif attempt > 1:
+                sleep_time += self.interval_stepping
+            if self.interval_max:
+                sleep_time = min(sleep_time, self.interval_max)
+
+            log_info['sleep_time'] = sleep_time
+            LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
+                    ' unreachable: %(err_str)s. Trying again in '
+                    '%(sleep_time)d seconds.') % log_info)
+            time.sleep(sleep_time)
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except (self.connection_errors, socket.timeout), e:
+                pass
+            except Exception, e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+            if error_callback:
+                error_callback(e)
+            self.reconnect()
+
+    def get_channel(self):
+        """Convenience call for bin/clear_rabbit_queues"""
+        return self.channel
+
+    def close(self):
+        """Close/release this connection"""
+        self.cancel_consumer_thread()
+        self.connection.release()
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again"""
+        self.cancel_consumer_thread()
+        self.channel.close()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        self.consumers = []
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+                "%(err_str)s") % log_info)
+
+        def _declare_consumer():
+            consumer = consumer_cls(self.channel, topic, callback,
+                    self.consumer_num.next())
+            self.consumers.append(consumer)
+            return consumer
+
+        return self.ensure(_connect_error, _declare_consumer)
+
+    def iterconsume(self, limit=None, timeout=None):
+        """Return an iterator that will consume from all queues/consumers"""
+
+        info = {'do_consume': True}
+
+        def _error_callback(exc):
+            if isinstance(exc, socket.timeout):
+                LOG.exception(_('Timed out waiting for RPC response: %s') %
+                        str(exc))
+                raise rpc_common.Timeout()
+            else:
+                LOG.exception(_('Failed to consume message from queue: %s') %
+                        str(exc))
+                info['do_consume'] = True
+
+        def _consume():
+            if info['do_consume']:
+                queues_head = self.consumers[:-1]
+                queues_tail = self.consumers[-1]
+                for queue in queues_head:
+                    queue.consume(nowait=True)
+                queues_tail.consume(nowait=False)
+                info['do_consume'] = False
+            return self.connection.drain_events(timeout=timeout)
+
+        for iteration in itertools.count(0):
+            if limit and iteration >= limit:
+                raise StopIteration
+            yield self.ensure(_error_callback, _consume)
+
+    def cancel_consumer_thread(self):
+        """Cancel a consumer thread"""
+        if self.consumer_thread is not None:
+            self.consumer_thread.kill()
+            try:
+                self.consumer_thread.wait()
+            except greenlet.GreenletExit:
+                pass
+            self.consumer_thread = None
+
+    def publisher_send(self, cls, topic, msg, **kwargs):
+        """Send to a publisher based on the publisher class"""
+
+        def _error_callback(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.exception(_("Failed to publish message to topic "
+                "'%(topic)s': %(err_str)s") % log_info)
+
+        def _publish():
+            publisher = cls(self.channel, topic, **kwargs)
+            publisher.send(msg)
+
+        self.ensure(_error_callback, _publish)
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(TopicConsumer, topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer"""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message"""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg):
+        """Send a 'topic' message"""
+        self.publisher_send(TopicPublisher, topic, msg)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message"""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic"""
+        self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers"""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consumer from all queues/consumers in a greenthread"""
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object"""
+        if fanout:
+            self.declare_fanout_consumer(topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+        else:
+            self.declare_topic_consumer(topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+
+
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+
+
+def create_connection(new=True):
+    """Create a connection"""
+    return rpc_amqp.create_connection(new, Connection.pool)
+
+
+def multicall(context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+
+
+def call(context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+
+
+def cast(context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    return rpc_amqp.cast(context, topic, msg, Connection.pool)
+
+
+def fanout_cast(context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+
+
+def cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a topic to a specific server."""
+    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+            Connection.pool)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a fanout exchange to a specific server."""
+    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+            Connection.pool)
+
+
+def notify(context, topic, msg):
+    """Sends a notification event on a topic."""
+    return rpc_amqp.notify(context, topic, msg, Connection.pool)
+
+
+def cleanup():
+    return rpc_amqp.cleanup(Connection.pool)
diff --git a/heat/rpc/impl_qpid.py b/heat/rpc/impl_qpid.py
new file mode 100644 (file)
index 0000000..95b9ee5
--- /dev/null
@@ -0,0 +1,552 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack LLC
+#    Copyright 2011 - 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import itertools
+import time
+import uuid
+import json
+import logging
+
+import eventlet
+import greenlet
+import qpid.messaging
+import qpid.messaging.exceptions
+
+from heat.common import config
+from heat.openstack.common import cfg
+from heat.rpc import amqp as rpc_amqp
+from heat.rpc import common as rpc_common
+
+LOG = logging.getLogger(__name__)
+
+qpid_opts = [
+    cfg.StrOpt('qpid_hostname',
+               default='localhost',
+               help='Qpid broker hostname'),
+    cfg.StrOpt('qpid_port',
+               default='5672',
+               help='Qpid broker port'),
+    cfg.StrOpt('qpid_username',
+               default='',
+               help='Username for qpid connection'),
+    cfg.StrOpt('qpid_password',
+               default='',
+               help='Password for qpid connection'),
+    cfg.StrOpt('qpid_sasl_mechanisms',
+               default='',
+               help='Space separated list of SASL mechanisms to use for auth'),
+    cfg.BoolOpt('qpid_reconnect',
+                default=True,
+                help='Automatically reconnect'),
+    cfg.IntOpt('qpid_reconnect_timeout',
+               default=0,
+               help='Reconnection timeout in seconds'),
+    cfg.IntOpt('qpid_reconnect_limit',
+               default=0,
+               help='Max reconnections before giving up'),
+    cfg.IntOpt('qpid_reconnect_interval_min',
+               default=0,
+               help='Minimum seconds between reconnection attempts'),
+    cfg.IntOpt('qpid_reconnect_interval_max',
+               default=0,
+               help='Maximum seconds between reconnection attempts'),
+    cfg.IntOpt('qpid_reconnect_interval',
+               default=0,
+               help='Equivalent to setting max and min to the same value'),
+    cfg.IntOpt('qpid_heartbeat',
+               default=5,
+               help='Seconds between connection keepalive heartbeats'),
+    cfg.StrOpt('qpid_protocol',
+               default='tcp',
+               help="Transport to use, either 'tcp' or 'ssl'"),
+    cfg.BoolOpt('qpid_tcp_nodelay',
+                default=True,
+                help='Disable Nagle algorithm'),
+    ]
+
+FLAGS = config.FLAGS
+FLAGS.register_opts(qpid_opts)
+
+
+class ConsumerBase(object):
+    """Consumer base class."""
+
+    def __init__(self, session, callback, node_name, node_opts,
+                 link_name, link_opts):
+        """Declare a queue on an amqp session.
+
+        'session' is the amqp session to use
+        'callback' is the callback to call when messages are received
+        'node_name' is the first part of the Qpid address string, before ';'
+        'node_opts' will be applied to the "x-declare" section of "node"
+                    in the address string.
+        'link_name' goes into the "name" field of the "link" in the address
+                    string
+        'link_opts' will be applied to the "x-declare" section of "link"
+                    in the address string.
+        """
+        self.callback = callback
+        self.receiver = None
+        self.session = None
+
+        addr_opts = {
+            "create": "always",
+            "node": {
+                "type": "topic",
+                "x-declare": {
+                    "durable": True,
+                    "auto-delete": True,
+                },
+            },
+            "link": {
+                "name": link_name,
+                "durable": True,
+                "x-declare": {
+                    "durable": False,
+                    "auto-delete": True,
+                    "exclusive": False,
+                },
+            },
+        }
+        addr_opts["node"]["x-declare"].update(node_opts)
+        addr_opts["link"]["x-declare"].update(link_opts)
+
+        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+
+        self.reconnect(session)
+
+    def reconnect(self, session):
+        """Re-declare the receiver after a qpid reconnect"""
+        self.session = session
+        self.receiver = session.receiver(self.address)
+        self.receiver.capacity = 1
+
+    def consume(self):
+        """Fetch the message and pass it to the callback object"""
+        message = self.receiver.fetch()
+        self.callback(message.content)
+
+    def get_receiver(self):
+        return self.receiver
+
+
+class DirectConsumer(ConsumerBase):
+    """Queue/consumer class for 'direct'"""
+
+    def __init__(self, session, msg_id, callback):
+        """Init a 'direct' queue.
+
+        'session' is the amqp session to use
+        'msg_id' is the msg_id to listen on
+        'callback' is the callback to call when messages are received
+        """
+
+        super(DirectConsumer, self).__init__(session, callback,
+                        "%s/%s" % (msg_id, msg_id),
+                        {"type": "direct"},
+                        msg_id,
+                        {"exclusive": True})
+
+
+class TopicConsumer(ConsumerBase):
+    """Consumer class for 'topic'"""
+
+    def __init__(self, session, topic, callback):
+        """Init a 'topic' queue.
+
+        'session' is the amqp session to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        """
+
+        super(TopicConsumer, self).__init__(session, callback,
+                        "%s/%s" % (FLAGS.control_exchange, topic), {},
+                        topic, {})
+
+
+class FanoutConsumer(ConsumerBase):
+    """Consumer class for 'fanout'"""
+
+    def __init__(self, session, topic, callback):
+        """Init a 'fanout' queue.
+
+        'session' is the amqp session to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        """
+
+        super(FanoutConsumer, self).__init__(session, callback,
+                        "%s_fanout" % topic,
+                        {"durable": False, "type": "fanout"},
+                        "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+                        {"exclusive": True})
+
+
+class Publisher(object):
+    """Base Publisher class"""
+
+    def __init__(self, session, node_name, node_opts=None):
+        """Init the Publisher class with the exchange_name, routing_key,
+        and other options
+        """
+        self.sender = None
+        self.session = session
+
+        addr_opts = {
+            "create": "always",
+            "node": {
+                "type": "topic",
+                "x-declare": {
+                    "durable": False,
+                    # auto-delete isn't implemented for exchanges in qpid,
+                    # but put in here anyway
+                    "auto-delete": True,
+                },
+            },
+        }
+        if node_opts:
+            addr_opts["node"]["x-declare"].update(node_opts)
+
+        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
+
+        self.reconnect(session)
+
+    def reconnect(self, session):
+        """Re-establish the Sender after a reconnection"""
+        self.sender = session.sender(self.address)
+
+    def send(self, msg):
+        """Send a message"""
+        self.sender.send(msg)
+
+
+class DirectPublisher(Publisher):
+    """Publisher class for 'direct'"""
+    def __init__(self, session, msg_id):
+        """Init a 'direct' publisher."""
+        super(DirectPublisher, self).__init__(session, msg_id,
+                                              {"type": "Direct"})
+
+
+class TopicPublisher(Publisher):
+    """Publisher class for 'topic'"""
+    def __init__(self, session, topic):
+        """init a 'topic' publisher.
+        """
+        super(TopicPublisher, self).__init__(session,
+                                "%s/%s" % (FLAGS.control_exchange, topic))
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'"""
+    def __init__(self, session, topic):
+        """init a 'fanout' publisher.
+        """
+        super(FanoutPublisher, self).__init__(session,
+                                "%s_fanout" % topic, {"type": "fanout"})
+
+
+class NotifyPublisher(Publisher):
+    """Publisher class for notifications"""
+    def __init__(self, session, topic):
+        """init a 'topic' publisher.
+        """
+        super(NotifyPublisher, self).__init__(session,
+                                "%s/%s" % (FLAGS.control_exchange, topic),
+                                {"durable": True})
+
+
+class Connection(object):
+    """Connection object."""
+
+    def __init__(self, server_params=None):
+        self.session = None
+        self.consumers = {}
+        self.consumer_thread = None
+
+        if server_params is None:
+            server_params = {}
+
+        default_params = dict(hostname=FLAGS.qpid_hostname,
+                port=FLAGS.qpid_port,
+                username=FLAGS.qpid_username,
+                password=FLAGS.qpid_password)
+
+        params = server_params
+        for key in default_params.keys():
+            params.setdefault(key, default_params[key])
+
+        self.broker = params['hostname'] + ":" + str(params['port'])
+        # Create the connection - this does not open the connection
+        self.connection = qpid.messaging.Connection(self.broker)
+
+        # Check if flags are set and if so set them for the connection
+        # before we call open
+        self.connection.username = params['username']
+        self.connection.password = params['password']
+        self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
+        self.connection.reconnect = FLAGS.qpid_reconnect
+        if FLAGS.qpid_reconnect_timeout:
+            self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
+        if FLAGS.qpid_reconnect_limit:
+            self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
+        if FLAGS.qpid_reconnect_interval_max:
+            self.connection.reconnect_interval_max = (
+                    FLAGS.qpid_reconnect_interval_max)
+        if FLAGS.qpid_reconnect_interval_min:
+            self.connection.reconnect_interval_min = (
+                    FLAGS.qpid_reconnect_interval_min)
+        if FLAGS.qpid_reconnect_interval:
+            self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
+        self.connection.hearbeat = FLAGS.qpid_heartbeat
+        self.connection.protocol = FLAGS.qpid_protocol
+        self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
+
+        # Open is part of reconnect -
+        # NOTE(WGH) not sure we need this with the reconnect flags
+        self.reconnect()
+
+    def _register_consumer(self, consumer):
+        self.consumers[str(consumer.get_receiver())] = consumer
+
+    def _lookup_consumer(self, receiver):
+        return self.consumers[str(receiver)]
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing sessions and queues"""
+        if self.connection.opened():
+            try:
+                self.connection.close()
+            except qpid.messaging.exceptions.ConnectionError:
+                pass
+
+        while True:
+            try:
+                self.connection.open()
+            except qpid.messaging.exceptions.ConnectionError, e:
+                LOG.error(_('Unable to connect to AMQP server: %s ') % e)
+                time.sleep(FLAGS.qpid_reconnect_interval or 1)
+            else:
+                break
+
+        LOG.info(_('Connected to AMQP server on %s') % self.broker)
+
+        self.session = self.connection.session()
+
+        for consumer in self.consumers.itervalues():
+            consumer.reconnect(self.session)
+
+        if self.consumers:
+            LOG.debug(_("Re-established AMQP queues"))
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except (qpid.messaging.exceptions.Empty,
+                    qpid.messaging.exceptions.ConnectionError), e:
+                if error_callback:
+                    error_callback(e)
+                self.reconnect()
+
+    def close(self):
+        """Close/release this connection"""
+        self.cancel_consumer_thread()
+        self.connection.close()
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again"""
+        self.cancel_consumer_thread()
+        self.session.close()
+        self.session = self.connection.session()
+        self.consumers = {}
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+                "%(err_str)s") % log_info)
+
+        def _declare_consumer():
+            consumer = consumer_cls(self.session, topic, callback)
+            self._register_consumer(consumer)
+            return consumer
+
+        return self.ensure(_connect_error, _declare_consumer)
+
+    def iterconsume(self, limit=None, timeout=None):
+        """Return an iterator that will consume from all queues/consumers"""
+
+        def _error_callback(exc):
+            if isinstance(exc, qpid.messaging.exceptions.Empty):
+                LOG.exception(_('Timed out waiting for RPC response: %s') %
+                        str(exc))
+                raise rpc_common.Timeout()
+            else:
+                LOG.exception(_('Failed to consume message from queue: %s') %
+                        str(exc))
+
+        def _consume():
+            nxt_receiver = self.session.next_receiver(timeout=timeout)
+            try:
+                self._lookup_consumer(nxt_receiver).consume()
+            except Exception:
+                LOG.exception(_("Error processing message.  Skipping it."))
+
+        for iteration in itertools.count(0):
+            if limit and iteration >= limit:
+                raise StopIteration
+            yield self.ensure(_error_callback, _consume)
+
+    def cancel_consumer_thread(self):
+        """Cancel a consumer thread"""
+        if self.consumer_thread is not None:
+            self.consumer_thread.kill()
+            try:
+                self.consumer_thread.wait()
+            except greenlet.GreenletExit:
+                pass
+            self.consumer_thread = None
+
+    def publisher_send(self, cls, topic, msg):
+        """Send to a publisher based on the publisher class"""
+
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.exception(_("Failed to publish message to topic "
+                "'%(topic)s': %(err_str)s") % log_info)
+
+        def _publisher_send():
+            publisher = cls(self.session, topic)
+            publisher.send(msg)
+
+        return self.ensure(_connect_error, _publisher_send)
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(TopicConsumer, topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer"""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message"""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg):
+        """Send a 'topic' message"""
+        self.publisher_send(TopicPublisher, topic, msg)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message"""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic"""
+        self.publisher_send(NotifyPublisher, topic, msg)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers"""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consumer from all queues/consumers in a greenthread"""
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object"""
+        if fanout:
+            consumer = FanoutConsumer(self.session, topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+        else:
+            consumer = TopicConsumer(self.session, topic,
+                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
+        self._register_consumer(consumer)
+        return consumer
+
+
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
+
+
+def create_connection(new=True):
+    """Create a connection"""
+    return rpc_amqp.create_connection(new, Connection.pool)
+
+
+def multicall(context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
+
+
+def call(context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
+
+
+def cast(context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    return rpc_amqp.cast(context, topic, msg, Connection.pool)
+
+
+def fanout_cast(context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+
+
+def cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a topic to a specific server."""
+    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+            Connection.pool)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+    """Sends a message on a fanout exchange to a specific server."""
+    return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
+            msg, Connection.pool)
+
+
+def notify(context, topic, msg):
+    """Sends a notification event on a topic."""
+    return rpc_amqp.notify(context, topic, msg, Connection.pool)
+
+
+def cleanup():
+    return rpc_amqp.cleanup(Connection.pool)
diff --git a/heat/service.py b/heat/service.py
new file mode 100644 (file)
index 0000000..5f0cfbd
--- /dev/null
@@ -0,0 +1,275 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import inspect
+import os
+
+import eventlet
+import logging
+import greenlet
+
+from heat.openstack.common import cfg
+
+from heat.common import utils
+from heat.common import config
+
+from heat import context
+from heat import exception
+from heat import rpc
+from heat import version
+
+LOG = logging.getLogger(__name__)
+
+service_opts = [
+    cfg.IntOpt('report_interval',
+               default=10,
+               help='seconds between nodes reporting state to datastore'),
+    cfg.IntOpt('periodic_interval',
+               default=60,
+               help='seconds between running periodic tasks'),
+    cfg.StrOpt('ec2_listen',
+               default="0.0.0.0",
+               help='IP address for EC2 API to listen'),
+    cfg.IntOpt('ec2_listen_port',
+               default=8773,
+               help='port for ec2 api to listen'),
+    cfg.StrOpt('osapi_compute_listen',
+               default="0.0.0.0",
+               help='IP address for OpenStack API to listen'),
+    cfg.IntOpt('osapi_compute_listen_port',
+               default=8774,
+               help='list port for osapi compute'),
+    cfg.StrOpt('metadata_manager',
+               default='nova.api.manager.MetadataManager',
+               help='OpenStack metadata service manager'),
+    cfg.StrOpt('metadata_listen',
+               default="0.0.0.0",
+               help='IP address for metadata api to listen'),
+    cfg.IntOpt('metadata_listen_port',
+               default=8775,
+               help='port for metadata api to listen'),
+    cfg.StrOpt('osapi_volume_listen',
+               default="0.0.0.0",
+               help='IP address for OpenStack Volume API to listen'),
+    cfg.IntOpt('osapi_volume_listen_port',
+               default=8776,
+               help='port for os volume api to listen'),
+    ]
+
+FLAGS = config.FLAGS
+FLAGS.register_opts(service_opts)
+
+
+class Launcher(object):
+    """Launch one or more services and wait for them to complete."""
+
+    def __init__(self):
+        """Initialize the service launcher.
+
+        :returns: None
+
+        """
+        self._services = []
+
+    @staticmethod
+    def run_server(server):
+        """Start and wait for a server to finish.
+
+        :param service: Server to run and wait for.
+        :returns: None
+
+        """
+        server.start()
+        server.wait()
+
+    def launch_server(self, server):
+        """Load and start the given server.
+
+        :param server: The server you would like to start.
+        :returns: None
+
+        """
+        gt = eventlet.spawn(self.run_server, server)
+        self._services.append(gt)
+
+    def stop(self):
+        """Stop all services which are currently running.
+
+        :returns: None
+
+        """
+        for service in self._services:
+            service.kill()
+
+    def wait(self):
+        """Waits until all services have been stopped, and then returns.
+
+        :returns: None
+
+        """
+        for service in self._services:
+            try:
+                service.wait()
+            except greenlet.GreenletExit:
+                pass
+
+
+class Service(object):
+    """Service object for binaries running on hosts.
+
+    A service takes a manager and enables rpc by listening to queues based
+    on topic. It also periodically runs tasks on the manager and reports
+    it state to the database services table."""
+
+    def __init__(self, host, binary, topic, manager,
+                 periodic_interval=None, *args, **kwargs):
+        self.host = host
+        self.binary = binary
+        self.topic = topic
+        self.manager_class_name = manager
+        manager_class = utils.import_class(self.manager_class_name)
+        self.manager = manager_class(host=self.host, *args, **kwargs)
+        self.periodic_interval = periodic_interval
+        super(Service, self).__init__(*args, **kwargs)
+        self.saved_args, self.saved_kwargs = args, kwargs
+        self.timers = []
+
+    def start(self):
+        vcs_string = version.version_string_with_vcs()
+        LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'),
+                  {'topic': self.topic, 'vcs_string': vcs_string})
+        # TODO do we need this ? -> utils.cleanup_file_locks()
+        self.manager.init_host()
+        self.model_disconnected = False
+        ctxt = context.get_admin_context()
+        # self._create_service_ref(ctxt)
+
+        self.conn = rpc.create_connection(new=True)
+        LOG.debug(_("Creating Consumer connection for Service %s") %
+                  self.topic)
+
+        # Share this same connection for these Consumers
+        self.conn.create_consumer(self.topic, self, fanout=False)
+
+        node_topic = '%s.%s' % (self.topic, self.host)
+        self.conn.create_consumer(node_topic, self, fanout=False)
+
+        self.conn.create_consumer(self.topic, self, fanout=True)
+
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+
+        if self.periodic_interval:
+            periodic = utils.LoopingCall(self.periodic_tasks)
+            periodic.start(interval=self.periodic_interval, now=False)
+            self.timers.append(periodic)
+
+    def __getattr__(self, key):
+        manager = self.__dict__.get('manager', None)
+        return getattr(manager, key)
+
+    @classmethod
+    def create(cls, host=None, binary=None, topic=None, manager=None,
+               periodic_interval=None):
+        """Instantiates class and passes back application object.
+
+        :param host: defaults to FLAGS.host
+        :param binary: defaults to basename of executable
+        :param topic: defaults to bin_name - 'heat-' part
+        :param manager: defaults to FLAGS.<topic>_manager
+        :param periodic_interval: defaults to FLAGS.periodic_interval
+
+        """
+        if not host:
+            host = FLAGS.host
+        if not binary:
+            binary = os.path.basename(inspect.stack()[-1][1])
+        if not topic:
+            topic = binary.rpartition('heat-')[2]
+        if not manager:
+            manager = FLAGS.get('%s_manager' % topic, None)
+        if not periodic_interval:
+            periodic_interval = FLAGS.periodic_interval
+        service_obj = cls(host, binary, topic, manager,
+                          periodic_interval)
+
+        return service_obj
+
+    def kill(self):
+        self.stop()
+
+    def stop(self):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them.. as we're shutting down anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        for x in self.timers:
+            try:
+                x.stop()
+            except Exception:
+                pass
+        self.timers = []
+
+    def wait(self):
+        for x in self.timers:
+            try:
+                x.wait()
+            except Exception:
+                pass
+
+    def periodic_tasks(self, raise_on_error=False):
+        """Tasks to be run at a periodic interval."""
+        ctxt = context.get_admin_context()
+        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
+
+
+# NOTE(vish): the global launcher is to maintain the existing
+#             functionality of calling service.serve +
+#             service.wait
+_launcher = None
+
+
+def serve(*servers):
+    global _launcher
+    if not _launcher:
+        _launcher = Launcher()
+    for server in servers:
+        _launcher.launch_server(server)
+
+
+def wait():
+    LOG.debug(_('Full set of FLAGS:'))
+    for flag in FLAGS:
+        flag_get = FLAGS.get(flag, None)
+        # hide flag contents from log if contains a password
+        # should use secret flag when switch over to openstack-common
+        if ("_password" in flag or "_key" in flag or
+                (flag == "sql_connection" and "mysql:" in flag_get)):
+            LOG.debug(_('%(flag)s : FLAG SET ') % locals())
+        else:
+            LOG.debug('%(flag)s : %(flag_get)s' % locals())
+    try:
+        _launcher.wait()
+    except KeyboardInterrupt:
+        _launcher.stop()
+    rpc.cleanup()