]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Sync rpc fix from oslo-incubator
authorFlavio Percoco <flaper87@gmail.com>
Fri, 6 Dec 2013 14:35:14 +0000 (15:35 +0100)
committerFlavio Percoco <flaper87@gmail.com>
Fri, 6 Dec 2013 14:35:51 +0000 (15:35 +0100)
Sync the following fixes from oslo-incubator:

e227c0e Properly reconnect subscribing clients when QPID broker restarts
ef406a2 Create a shared queue for QPID topic consumers

Change-Id: I286edf6bc4a677aa61f60da785802c19878c79c7
Closes-bug: #1251757
Closes-bug: #1257293

neutron/openstack/common/rpc/impl_qpid.py

index 026d39280cc46c23c29413efe7bc4f1686aa01e3..67e0f9c628086109cf035a0f7739acfc9397dbdf 100644 (file)
@@ -18,7 +18,6 @@
 import functools
 import itertools
 import time
-import uuid
 
 import eventlet
 import greenlet
@@ -123,7 +122,6 @@ class ConsumerBase(object):
                     },
                 },
                 "link": {
-                    "name": link_name,
                     "durable": True,
                     "x-declare": {
                         "durable": False,
@@ -138,6 +136,7 @@ class ConsumerBase(object):
                 "link": {
                     "x-declare": {
                         "auto-delete": True,
+                        "exclusive": False,
                     },
                 },
             }
@@ -145,6 +144,8 @@ class ConsumerBase(object):
             raise_invalid_topology_version()
 
         addr_opts["link"]["x-declare"].update(link_opts)
+        if link_name:
+            addr_opts["link"]["name"] = link_name
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
@@ -208,14 +209,16 @@ class DirectConsumer(ConsumerBase):
         if conf.qpid_topology_version == 1:
             node_name = "%s/%s" % (msg_id, msg_id)
             node_opts = {"type": "direct"}
+            link_name = msg_id
         elif conf.qpid_topology_version == 2:
             node_name = "amq.direct/%s" % msg_id
             node_opts = {}
+            link_name = None
         else:
             raise_invalid_topology_version()
 
         super(DirectConsumer, self).__init__(conf, session, callback,
-                                             node_name, node_opts, msg_id,
+                                             node_name, node_opts, link_name,
                                              link_opts)
 
 
@@ -266,16 +269,14 @@ class FanoutConsumer(ConsumerBase):
         if conf.qpid_topology_version == 1:
             node_name = "%s_fanout" % topic
             node_opts = {"durable": False, "type": "fanout"}
-            link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
         elif conf.qpid_topology_version == 2:
             node_name = "amq.topic/fanout/%s" % topic
             node_opts = {}
-            link_name = ""
         else:
             raise_invalid_topology_version()
 
         super(FanoutConsumer, self).__init__(conf, session, callback,
-                                             node_name, node_opts, link_name,
+                                             node_name, node_opts, None,
                                              link_opts)