]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Sync rpc fix from oslo-incubator
authorRussell Bryant <rbryant@redhat.com>
Fri, 30 Aug 2013 19:36:36 +0000 (15:36 -0400)
committerRussell Bryant <rbryant@redhat.com>
Fri, 30 Aug 2013 19:36:36 +0000 (15:36 -0400)
Sync the following fix from oslo-incubator:

76972e2 Support a new qpid topology

This includes one other commit, so that the above fix could be brought
over cleanly:

5ff534d Add config for amqp durable/auto_delete queues

Change-Id: I1fd5aaf87ec87836df3e44e83247bf82301475f5
Closes-bug: #1178375

cinder/openstack/common/rpc/amqp.py
cinder/openstack/common/rpc/impl_kombu.py
cinder/openstack/common/rpc/impl_qpid.py
etc/cinder/cinder.conf.sample

index 9addfa1c76391489b56771a14fe2c66d3df3e60e..587d0f91e40ffb604f8e5cd0661c32ea243bd3ac 100644 (file)
@@ -46,12 +46,20 @@ from cinder.openstack.common import log as logging
 from cinder.openstack.common.rpc import common as rpc_common
 
 
-# TODO(pekowski): Remove this option in Havana.
 amqp_opts = [
+    # TODO(pekowski): Remove this option in Havana.
     cfg.BoolOpt('amqp_rpc_single_reply_queue',
                 default=False,
                 help='Enable a fast single reply queue if using AMQP based '
                 'RPC like RabbitMQ or Qpid.'),
+    cfg.BoolOpt('amqp_durable_queues',
+                default=False,
+                deprecated_name='rabbit_durable_queues',
+                deprecated_group='DEFAULT',
+                help='Use durable queues in amqp.'),
+    cfg.BoolOpt('amqp_auto_delete',
+                default=False,
+                help='Auto-delete queues in amqp.'),
 ]
 
 cfg.CONF.register_opts(amqp_opts)
index 681f531843d3fe580f99ca3264ab52367c188f5e..424a61c0a1cebba8e7f818c1e75a5912dd05908e 100644 (file)
@@ -82,9 +82,6 @@ kombu_opts = [
                default=0,
                help='maximum retries with trying to connect to RabbitMQ '
                     '(the default of 0 implies an infinite retry count)'),
-    cfg.BoolOpt('rabbit_durable_queues',
-                default=False,
-                help='use durable queues in RabbitMQ'),
     cfg.BoolOpt('rabbit_ha_queues',
                 default=False,
                 help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@@ -233,9 +230,9 @@ class TopicConsumer(ConsumerBase):
         Other kombu options may be passed as keyword arguments
         """
         # Default options
-        options = {'durable': conf.rabbit_durable_queues,
+        options = {'durable': conf.amqp_durable_queues,
                    'queue_arguments': _get_queue_arguments(conf),
-                   'auto_delete': False,
+                   'auto_delete': conf.amqp_auto_delete,
                    'exclusive': False}
         options.update(kwargs)
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@@ -339,8 +336,8 @@ class TopicPublisher(Publisher):
 
         Kombu options may be passed as keyword args to override defaults
         """
-        options = {'durable': conf.rabbit_durable_queues,
-                   'auto_delete': False,
+        options = {'durable': conf.amqp_durable_queues,
+                   'auto_delete': conf.amqp_auto_delete,
                    'exclusive': False}
         options.update(kwargs)
         exchange_name = rpc_amqp.get_control_exchange(conf)
@@ -370,7 +367,7 @@ class NotifyPublisher(TopicPublisher):
     """Publisher class for 'notify'"""
 
     def __init__(self, conf, channel, topic, **kwargs):
-        self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
         self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
index 8f52fb84f79e53dce2d9d288ba7448b3865a2083..1137278d8df460b67b0806dee46c7de4aa63ffb8 100644 (file)
@@ -65,15 +65,33 @@ qpid_opts = [
     cfg.BoolOpt('qpid_tcp_nodelay',
                 default=True,
                 help='Disable Nagle algorithm'),
+    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
+    # this file could probably use some additional refactoring so that the
+    # differences between each version are split into different classes.
+    cfg.IntOpt('qpid_topology_version',
+               default=1,
+               help="The qpid topology version to use.  Version 1 is what "
+                    "was originally used by impl_qpid.  Version 2 includes "
+                    "some backwards-incompatible changes that allow broker "
+                    "federation to work.  Users should update to version 2 "
+                    "when they are able to take everything down, as it "
+                    "requires a clean break."),
 ]
 
 cfg.CONF.register_opts(qpid_opts)
 
 
+def raise_invalid_topology_version(conf):
+    msg = (_("Invalid value for qpid_topology_version: %d") %
+           conf.qpid_topology_version)
+    LOG.error(msg)
+    raise Exception(msg)
+
+
 class ConsumerBase(object):
     """Consumer base class."""
 
-    def __init__(self, session, callback, node_name, node_opts,
+    def __init__(self, conf, session, callback, node_name, node_opts,
                  link_name, link_opts):
         """Declare a queue on an amqp session.
 
@@ -91,26 +109,38 @@ class ConsumerBase(object):
         self.receiver = None
         self.session = None
 
-        addr_opts = {
-            "create": "always",
-            "node": {
-                "type": "topic",
-                "x-declare": {
+        if conf.qpid_topology_version == 1:
+            addr_opts = {
+                "create": "always",
+                "node": {
+                    "type": "topic",
+                    "x-declare": {
+                        "durable": True,
+                        "auto-delete": True,
+                    },
+                },
+                "link": {
+                    "name": link_name,
                     "durable": True,
-                    "auto-delete": True,
+                    "x-declare": {
+                        "durable": False,
+                        "auto-delete": True,
+                        "exclusive": False,
+                    },
                 },
-            },
-            "link": {
-                "name": link_name,
-                "durable": True,
-                "x-declare": {
-                    "durable": False,
-                    "auto-delete": True,
-                    "exclusive": False,
+            }
+            addr_opts["node"]["x-declare"].update(node_opts)
+        elif conf.qpid_topology_version == 2:
+            addr_opts = {
+                "link": {
+                    "x-declare": {
+                        "auto-delete": True,
+                    },
                 },
-            },
-        }
-        addr_opts["node"]["x-declare"].update(node_opts)
+            }
+        else:
+            raise_invalid_topology_version()
+
         addr_opts["link"]["x-declare"].update(link_opts)
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@@ -149,11 +179,24 @@ class DirectConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(DirectConsumer, self).__init__(session, callback,
-                                             "%s/%s" % (msg_id, msg_id),
-                                             {"type": "direct"},
-                                             msg_id,
-                                             {"exclusive": True})
+        link_opts = {
+            "auto-delete": conf.amqp_auto_delete,
+            "exclusive": True,
+            "durable": conf.amqp_durable_queues,
+        }
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (msg_id, msg_id)
+            node_opts = {"type": "direct"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.direct/%s" % msg_id
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(DirectConsumer, self).__init__(conf, session, callback,
+                                             node_name, node_opts, msg_id,
+                                             link_opts)
 
 
 class TopicConsumer(ConsumerBase):
@@ -171,9 +214,20 @@ class TopicConsumer(ConsumerBase):
         """
 
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
-        super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (exchange_name, topic),
-                                            {}, name or topic, {})
+        link_opts = {
+            "auto-delete": conf.amqp_auto_delete,
+            "durable": conf.amqp_durable_queues,
+        }
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(TopicConsumer, self).__init__(conf, session, callback, node_name,
+                                            {}, name or topic, link_opts)
 
 
 class FanoutConsumer(ConsumerBase):
@@ -187,40 +241,55 @@ class FanoutConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(FanoutConsumer, self).__init__(
-            session, callback,
-            "%s_fanout" % topic,
-            {"durable": False, "type": "fanout"},
-            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
-            {"exclusive": True})
+        link_opts = {"exclusive": True}
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s_fanout" % topic
+            node_opts = {"durable": False, "type": "fanout"}
+            link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/fanout/%s" % topic
+            node_opts = {}
+            link_name = ""
+        else:
+            raise_invalid_topology_version()
+
+        super(FanoutConsumer, self).__init__(conf, session, callback,
+                                             node_name, node_opts, link_name,
+                                             link_opts)
 
 
 class Publisher(object):
     """Base Publisher class"""
 
-    def __init__(self, session, node_name, node_opts=None):
+    def __init__(self, conf, session, node_name, node_opts=None):
         """Init the Publisher class with the exchange_name, routing_key,
         and other options
         """
         self.sender = None
         self.session = session
 
-        addr_opts = {
-            "create": "always",
-            "node": {
-                "type": "topic",
-                "x-declare": {
-                    "durable": False,
-                    # auto-delete isn't implemented for exchanges in qpid,
-                    # but put in here anyway
-                    "auto-delete": True,
+        if conf.qpid_topology_version == 1:
+            addr_opts = {
+                "create": "always",
+                "node": {
+                    "type": "topic",
+                    "x-declare": {
+                        "durable": False,
+                        # auto-delete isn't implemented for exchanges in qpid,
+                        # but put in here anyway
+                        "auto-delete": True,
+                    },
                 },
-            },
-        }
-        if node_opts:
-            addr_opts["node"]["x-declare"].update(node_opts)
+            }
+            if node_opts:
+                addr_opts["node"]["x-declare"].update(node_opts)
 
-        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+            self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+        elif conf.qpid_topology_version == 2:
+            self.address = node_name
+        else:
+            raise_invalid_topology_version()
 
         self.reconnect(session)
 
@@ -237,8 +306,17 @@ class DirectPublisher(Publisher):
     """Publisher class for 'direct'"""
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
-        super(DirectPublisher, self).__init__(session, msg_id,
-                                              {"type": "Direct"})
+        if conf.qpid_topology_version == 1:
+            node_name = msg_id
+            node_opts = {"type": "direct"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.direct/%s" % msg_id
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(DirectPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class TopicPublisher(Publisher):
@@ -247,8 +325,15 @@ class TopicPublisher(Publisher):
         """init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
-        super(TopicPublisher, self).__init__(session,
-                                             "%s/%s" % (exchange_name, topic))
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(TopicPublisher, self).__init__(conf, session, node_name)
 
 
 class FanoutPublisher(Publisher):
@@ -256,9 +341,18 @@ class FanoutPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'fanout' publisher.
         """
-        super(FanoutPublisher, self).__init__(
-            session,
-            "%s_fanout" % topic, {"type": "fanout"})
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s_fanout" % topic
+            node_opts = {"type": "fanout"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/fanout/%s" % topic
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(FanoutPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class NotifyPublisher(Publisher):
@@ -267,9 +361,17 @@ class NotifyPublisher(Publisher):
         """init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
-        super(NotifyPublisher, self).__init__(session,
-                                              "%s/%s" % (exchange_name, topic),
-                                              {"durable": True})
+        node_opts = {"durable": True}
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(NotifyPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class Connection(object):
index 8ce6c2555bf5f92d2290b6ab573ed243dd9a5eb0..7b39e2c96170f6d81113fd71a7cd3f5bb5f01f43 100644 (file)
 # like RabbitMQ or Qpid. (boolean value)
 #amqp_rpc_single_reply_queue=false
 
+# Use durable queues in amqp. (boolean value)
+#amqp_durable_queues=false
+
+# Auto-delete queues in amqp. (boolean value)
+#amqp_auto_delete=false
+
 
 #
 # Options defined in cinder.openstack.common.rpc.impl_kombu
 # value)
 #rabbit_max_retries=0
 
-# use durable queues in RabbitMQ (boolean value)
-#rabbit_durable_queues=false
-
 # use H/A queues in RabbitMQ (x-ha-policy: all).You need to
 # wipe RabbitMQ database when changing this option. (boolean
 # value)
 # Disable Nagle algorithm (boolean value)
 #qpid_tcp_nodelay=true
 
+# The qpid topology version to use.  Version 1 is what was
+# originally used by impl_qpid.  Version 2 includes some
+# backwards-incompatible changes that allow broker federation
+# to work.  Users should update to version 2 when they are
+# able to take everything down, as it requires a clean break.
+# (integer value)
+#qpid_topology_version=1
+
 
 #
 # Options defined in cinder.openstack.common.rpc.impl_zmq
 #volume_dd_blocksize=1M
 
 
-# Total option count: 353
+# Total option count: 355