]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update rpc and notifier libs from openstack.common
authorDoug Hellmann <doug.hellmann@dreamhost.com>
Tue, 2 Oct 2012 16:34:30 +0000 (12:34 -0400)
committerDoug Hellmann <doug.hellmann@dreamhost.com>
Tue, 2 Oct 2012 16:34:30 +0000 (12:34 -0400)
Bring in the latest versions of the rpc and notifications
libraries to fix broken imports in the current version
of master.

Change-Id: I6f545df4622eabdf2f7bf4e9cf155db20bd2c4c1
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
quantum/openstack/common/notifier/api.py
quantum/openstack/common/rpc/__init__.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/service.py [new file with mode: 0644]

index 38fbde049db973996dd7430c193ccb37b80596a5..d3d4d0f6d092c5b3c5f6a88a266ee5cf926a74b1 100644 (file)
@@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload):
             driver.notify(context, msg)
         except Exception, e:
             LOG.exception(_("Problem '%(e)s' attempting to "
-              "send to notification system. Payload=%(payload)s") %
-                            locals())
+                            "send to notification system. "
+                            "Payload=%(payload)s") % locals())
 
 
 _drivers = None
@@ -169,7 +169,7 @@ def add_driver(notification_driver):
         except ImportError as e:
             LOG.exception(_("Failed to load notifier %s. "
                             "These notifications will not be sent.") %
-                            notification_driver)
+                          notification_driver)
     else:
         # Driver is already loaded; just add the object.
         _drivers[notification_driver] = notification_driver
index 6442f1b2cc7d9a6c38aad80466e1cb1e308d1dce..89940ef1aa97c63f4d6e9f5700d9b1fbe64b5b22 100644 (file)
@@ -49,15 +49,21 @@ rpc_opts = [
     cfg.ListOpt('allowed_rpc_exception_modules',
                 default=['quantum.openstack.common.exception',
                          'nova.exception',
+                         'cinder.exception',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
                      'upon receiving exception data from an rpc call.'),
-    cfg.StrOpt('control_exchange',
-               default='nova',
-               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
+    #
+    # The following options are not registered here, but are expected to be
+    # present. The project using this library must register these options with
+    # the configuration so that project-specific defaults may be defined.
+    #
+    #cfg.StrOpt('control_exchange',
+    #           default='nova',
+    #           help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
 ]
 
 cfg.CONF.register_opts(rpc_opts)
index b6d2751cb0ca19631f68dab15bdcee7d285d3586..e763369622b03faa5bd758004971c0e1351e30e3 100644 (file)
@@ -34,6 +34,7 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
 
+from quantum.openstack.common import cfg
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
@@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool):
 def cleanup(connection_pool):
     if connection_pool:
         connection_pool.empty()
+
+
+def get_control_exchange(conf):
+    try:
+        return conf.control_exchange
+    except cfg.NoSuchOptError:
+        return 'openstack'
index 64e671818b03a85a4e3a1c86a46092e50511fe24..1de47ba180c59301c38cf1999924a014ffc2a7b9 100644 (file)
@@ -29,11 +29,11 @@ import kombu.connection
 import kombu.entity
 import kombu.messaging
 
-from openstack.common import cfg
-from openstack.common.gettextutils import _
-from openstack.common.rpc import amqp as rpc_amqp
-from openstack.common.rpc import common as rpc_common
-from openstack.common import network_utils
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common.rpc import amqp as rpc_amqp
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common import network_utils
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
@@ -790,4 +790,3 @@ def notify(conf, context, topic, msg):
 
 def cleanup():
     return rpc_amqp.cleanup(Connection.pool)
-
index af82564634bfc72f4c0082d1d5a87691747051ec..efdf21211d9a8e4e3ecdeeaffd9d694f2b5c3fad 100644 (file)
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
-    def __init__(self, conf, session, topic, callback, name=None):
+    def __init__(self, conf, session, topic, callback, name=None,
+                 exchange_name=None):
         """Init a 'topic' queue.
 
         :param session: the amqp session to use
@@ -180,9 +181,9 @@ class TopicConsumer(ConsumerBase):
         :param name: optional queue name, defaults to topic
         """
 
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
         super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (conf.control_exchange,
-                                                       topic),
+                                            "%s/%s" % (exchange_name, topic),
                                             {}, name or topic, {})
 
 
@@ -256,9 +257,9 @@ class TopicPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(TopicPublisher, self).__init__(
-            session,
-            "%s/%s" % (conf.control_exchange, topic))
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(TopicPublisher, self).__init__(session,
+                                             "%s/%s" % (exchange_name, topic))
 
 
 class FanoutPublisher(Publisher):
@@ -276,10 +277,10 @@ class NotifyPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(NotifyPublisher, self).__init__(
-            session,
-            "%s/%s" % (conf.control_exchange, topic),
-            {"durable": True})
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(NotifyPublisher, self).__init__(session,
+                                              "%s/%s" % (exchange_name, topic),
+                                              {"durable": True})
 
 
 class Connection(object):
@@ -464,10 +465,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
diff --git a/quantum/openstack/common/rpc/service.py b/quantum/openstack/common/rpc/service.py
new file mode 100644 (file)
index 0000000..0b2b474
--- /dev/null
@@ -0,0 +1,69 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+    """Service object for binaries running on hosts.
+
+    A service enables rpc by listening to queues based on topic and host."""
+    def __init__(self, host, topic, manager=None):
+        super(Service, self).__init__()
+        self.host = host
+        self.topic = topic
+        if manager is None:
+            self.manager = self
+        else:
+            self.manager = manager
+
+    def start(self):
+        super(Service, self).start()
+
+        self.conn = rpc.create_connection(new=True)
+        LOG.debug(_("Creating Consumer connection for Service %s") %
+                  self.topic)
+
+        rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
+
+        # Share this same connection for these Consumers
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
+
+        node_topic = '%s.%s' % (self.topic, self.host)
+        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
+
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
+
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+
+    def stop(self):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them.. as we're shutting down anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        super(Service, self).stop()