]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Sync rpc/impl_qpid.py from oslo-incubator.
authorDavid Ripton <dripton@redhat.com>
Mon, 12 Aug 2013 14:38:17 +0000 (10:38 -0400)
committerDavid Ripton <dripton@redhat.com>
Mon, 12 Aug 2013 14:39:55 +0000 (10:39 -0400)
Fixes bug #1211338 (seen in Neutron, marked in launchpad as oslo
since the bug was in oslo code).

Change-Id: I11971c1213b095979bd4f2e878ea2bcad3ceb617

neutron/openstack/common/rpc/impl_qpid.py

index a5f934e4d47910139d9e9449834d6bb407e4b446..6443513e7ee92ce7943f01db028d81ed3328c3b1 100644 (file)
@@ -24,7 +24,8 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import excutils
+from neutron.openstack.common.gettextutils import _  # noqa
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
@@ -118,10 +119,17 @@ class ConsumerBase(object):
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
-        self.reconnect(session)
+        self.connect(session)
+
+    def connect(self, session):
+        """Declare the reciever on connect."""
+        self._declare_receiver(session)
 
     def reconnect(self, session):
         """Re-declare the receiver after a qpid reconnect."""
+        self._declare_receiver(session)
+
+    def _declare_receiver(self, session):
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
@@ -152,11 +160,15 @@ class ConsumerBase(object):
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
+            # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
 
+    def get_node_name(self):
+        return self.address.split(';')[0]
+
 
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'."""
@@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(DirectConsumer, self).__init__(session, callback,
-                                             "%s/%s" % (msg_id, msg_id),
-                                             {"type": "direct"},
-                                             msg_id,
-                                             {"exclusive": True})
+        super(DirectConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (msg_id, msg_id),
+            {"type": "direct"},
+            msg_id,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "exclusive": True,
+                "durable": conf.amqp_durable_queues,
+            })
 
 
 class TopicConsumer(ConsumerBase):
@@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
         """
 
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
-        super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (exchange_name, topic),
-                                            {}, name or topic, {})
+        super(TopicConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (exchange_name, topic),
+            {}, name or topic,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "durable": conf.amqp_durable_queues,
+            })
 
 
 class FanoutConsumer(ConsumerBase):
@@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
         'topic' is the topic to listen on
         'callback' is the callback to call when messages are received
         """
+        self.conf = conf
 
         super(FanoutConsumer, self).__init__(
             session, callback,
@@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
             "%s_fanout_%s" % (topic, uuid.uuid4().hex),
             {"exclusive": True})
 
+    def reconnect(self, session):
+        topic = self.get_node_name().rpartition('_fanout')[0]
+        params = {
+            'session': session,
+            'topic': topic,
+            'callback': self.callback,
+        }
+
+        self.__init__(conf=self.conf, **params)
+
+        super(FanoutConsumer, self).reconnect(session)
+
 
 class Publisher(object):
     """Base Publisher class."""
@@ -285,7 +320,7 @@ class DirectPublisher(Publisher):
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
         super(DirectPublisher, self).__init__(session, msg_id,
-                                              {"type": "Direct"})
+                                              {"type": "direct"})
 
 
 class TopicPublisher(Publisher):
@@ -575,6 +610,7 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -615,7 +651,7 @@ class Connection(object):
         return consumer
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.