import greenlet
from oslo.config import cfg
-from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _ # noqa
+from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- self.connect(session)
-
- def connect(self, session):
- """Declare the reciever on connect."""
- self._declare_receiver(session)
+ self.reconnect(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
- self._declare_receiver(session)
-
- def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
- # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
- def get_node_name(self):
- return self.address.split(';')[0]
-
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
'callback' is the callback to call when messages are received
"""
- super(DirectConsumer, self).__init__(
- session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {
- "auto-delete": conf.amqp_auto_delete,
- "exclusive": True,
- "durable": conf.amqp_durable_queues,
- })
+ super(DirectConsumer, self).__init__(session, callback,
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- super(TopicConsumer, self).__init__(
- session, callback,
- "%s/%s" % (exchange_name, topic),
- {}, name or topic,
- {
- "auto-delete": conf.amqp_auto_delete,
- "durable": conf.amqp_durable_queues,
- })
+ super(TopicConsumer, self).__init__(session, callback,
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
- self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
- def reconnect(self, session):
- topic = self.get_node_name().rpartition('_fanout')[0]
- params = {
- 'session': session,
- 'topic': topic,
- 'callback': self.callback,
- }
-
- self.__init__(conf=self.conf, **params)
-
- super(FanoutConsumer, self).reconnect(session)
-
class Publisher(object):
"""Base Publisher class."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
- {"type": "direct"})
+ {"type": "Direct"})
class TopicPublisher(Publisher):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
- @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None, ack_on_error=True):
+ exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.