]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Revert "Sync rpc/impl_qpid.py from oslo-incubator."
authorDan Prince <dprince@redhat.com>
Tue, 13 Aug 2013 13:52:12 +0000 (09:52 -0400)
committerDan Prince <dprince@redhat.com>
Tue, 13 Aug 2013 13:55:07 +0000 (09:55 -0400)
This reverts commit 3f4bb0443e96e9b7d04e3e4e77a8bb4e8647e01e.

Fixes LP Bug #1211778.

Change-Id: I254995a1bf5416fb70fbcac28ee399b1373efde7

neutron/openstack/common/rpc/impl_qpid.py

index 6443513e7ee92ce7943f01db028d81ed3328c3b1..a5f934e4d47910139d9e9449834d6bb407e4b446 100644 (file)
@@ -24,8 +24,7 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
-from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _  # noqa
+from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
@@ -119,17 +118,10 @@ class ConsumerBase(object):
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
-        self.connect(session)
-
-    def connect(self, session):
-        """Declare the reciever on connect."""
-        self._declare_receiver(session)
+        self.reconnect(session)
 
     def reconnect(self, session):
         """Re-declare the receiver after a qpid reconnect."""
-        self._declare_receiver(session)
-
-    def _declare_receiver(self, session):
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
@@ -160,15 +152,11 @@ class ConsumerBase(object):
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
-            # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
 
-    def get_node_name(self):
-        return self.address.split(';')[0]
-
 
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'."""
@@ -181,16 +169,11 @@ class DirectConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(DirectConsumer, self).__init__(
-            session, callback,
-            "%s/%s" % (msg_id, msg_id),
-            {"type": "direct"},
-            msg_id,
-            {
-                "auto-delete": conf.amqp_auto_delete,
-                "exclusive": True,
-                "durable": conf.amqp_durable_queues,
-            })
+        super(DirectConsumer, self).__init__(session, callback,
+                                             "%s/%s" % (msg_id, msg_id),
+                                             {"type": "direct"},
+                                             msg_id,
+                                             {"exclusive": True})
 
 
 class TopicConsumer(ConsumerBase):
@@ -208,14 +191,9 @@ class TopicConsumer(ConsumerBase):
         """
 
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
-        super(TopicConsumer, self).__init__(
-            session, callback,
-            "%s/%s" % (exchange_name, topic),
-            {}, name or topic,
-            {
-                "auto-delete": conf.amqp_auto_delete,
-                "durable": conf.amqp_durable_queues,
-            })
+        super(TopicConsumer, self).__init__(session, callback,
+                                            "%s/%s" % (exchange_name, topic),
+                                            {}, name or topic, {})
 
 
 class FanoutConsumer(ConsumerBase):
@@ -228,7 +206,6 @@ class FanoutConsumer(ConsumerBase):
         'topic' is the topic to listen on
         'callback' is the callback to call when messages are received
         """
-        self.conf = conf
 
         super(FanoutConsumer, self).__init__(
             session, callback,
@@ -237,18 +214,6 @@ class FanoutConsumer(ConsumerBase):
             "%s_fanout_%s" % (topic, uuid.uuid4().hex),
             {"exclusive": True})
 
-    def reconnect(self, session):
-        topic = self.get_node_name().rpartition('_fanout')[0]
-        params = {
-            'session': session,
-            'topic': topic,
-            'callback': self.callback,
-        }
-
-        self.__init__(conf=self.conf, **params)
-
-        super(FanoutConsumer, self).reconnect(session)
-
 
 class Publisher(object):
     """Base Publisher class."""
@@ -320,7 +285,7 @@ class DirectPublisher(Publisher):
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
         super(DirectPublisher, self).__init__(session, msg_id,
-                                              {"type": "direct"})
+                                              {"type": "Direct"})
 
 
 class TopicPublisher(Publisher):
@@ -610,7 +575,6 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
-        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -651,7 +615,7 @@ class Connection(object):
         return consumer
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None, ack_on_error=True):
+                           exchange_name=None):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.