cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
+ # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
+ # this file could probably use some additional refactoring so that the
+ # differences between each version are split into different classes.
+ cfg.IntOpt('qpid_topology_version',
+ default=1,
+ help="The qpid topology version to use. Version 1 is what "
+ "was originally used by impl_qpid. Version 2 includes "
+ "some backwards-incompatible changes that allow broker "
+ "federation to work. Users should update to version 2 "
+ "when they are able to take everything down, as it "
+ "requires a clean break."),
]
cfg.CONF.register_opts(qpid_opts)
+def raise_invalid_topology_version(conf):
+ msg = (_("Invalid value for qpid_topology_version: %d") %
+ conf.qpid_topology_version)
+ LOG.error(msg)
+ raise Exception(msg)
+
+
class ConsumerBase(object):
"""Consumer base class."""
- def __init__(self, session, callback, node_name, node_opts,
+ def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
self.receiver = None
self.session = None
- addr_opts = {
- "create": "always",
- "node": {
- "type": "topic",
- "x-declare": {
+ if conf.qpid_topology_version == 1:
+ addr_opts = {
+ "create": "always",
+ "node": {
+ "type": "topic",
+ "x-declare": {
+ "durable": True,
+ "auto-delete": True,
+ },
+ },
+ "link": {
+ "name": link_name,
"durable": True,
- "auto-delete": True,
+ "x-declare": {
+ "durable": False,
+ "auto-delete": True,
+ "exclusive": False,
+ },
},
- },
- "link": {
- "name": link_name,
- "durable": True,
- "x-declare": {
- "durable": False,
- "auto-delete": True,
- "exclusive": False,
+ }
+ addr_opts["node"]["x-declare"].update(node_opts)
+ elif conf.qpid_topology_version == 2:
+ addr_opts = {
+ "link": {
+ "x-declare": {
+ "auto-delete": True,
+ },
},
- },
- }
- addr_opts["node"]["x-declare"].update(node_opts)
+ }
+ else:
+ raise_invalid_topology_version()
+
addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
'callback' is the callback to call when messages are received
"""
- super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ link_opts = {
+ "auto-delete": conf.amqp_auto_delete,
+ "exclusive": True,
+ "durable": conf.amqp_durable_queues,
+ }
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s/%s" % (msg_id, msg_id)
+ node_opts = {"type": "direct"}
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.direct/%s" % msg_id
+ node_opts = {}
+ else:
+ raise_invalid_topology_version()
+
+ super(DirectConsumer, self).__init__(conf, session, callback,
+ node_name, node_opts, msg_id,
+ link_opts)
class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (exchange_name, topic),
- {}, name or topic, {})
+ link_opts = {
+ "auto-delete": conf.amqp_auto_delete,
+ "durable": conf.amqp_durable_queues,
+ }
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s/%s" % (exchange_name, topic)
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+ else:
+ raise_invalid_topology_version()
+
+ super(TopicConsumer, self).__init__(conf, session, callback, node_name,
+ {}, name or topic, link_opts)
class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(FanoutConsumer, self).__init__(
- session, callback,
- "%s_fanout" % topic,
- {"durable": False, "type": "fanout"},
- "%s_fanout_%s" % (topic, uuid.uuid4().hex),
- {"exclusive": True})
+ link_opts = {"exclusive": True}
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s_fanout" % topic
+ node_opts = {"durable": False, "type": "fanout"}
+ link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.topic/fanout/%s" % topic
+ node_opts = {}
+ link_name = ""
+ else:
+ raise_invalid_topology_version()
+
+ super(FanoutConsumer, self).__init__(conf, session, callback,
+ node_name, node_opts, link_name,
+ link_opts)
class Publisher(object):
"""Base Publisher class"""
- def __init__(self, session, node_name, node_opts=None):
+ def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session
- addr_opts = {
- "create": "always",
- "node": {
- "type": "topic",
- "x-declare": {
- "durable": False,
- # auto-delete isn't implemented for exchanges in qpid,
- # but put in here anyway
- "auto-delete": True,
+ if conf.qpid_topology_version == 1:
+ addr_opts = {
+ "create": "always",
+ "node": {
+ "type": "topic",
+ "x-declare": {
+ "durable": False,
+ # auto-delete isn't implemented for exchanges in qpid,
+ # but put in here anyway
+ "auto-delete": True,
+ },
},
- },
- }
- if node_opts:
- addr_opts["node"]["x-declare"].update(node_opts)
+ }
+ if node_opts:
+ addr_opts["node"]["x-declare"].update(node_opts)
- self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+ elif conf.qpid_topology_version == 2:
+ self.address = node_name
+ else:
+ raise_invalid_topology_version()
self.reconnect(session)
"""Publisher class for 'direct'"""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
- super(DirectPublisher, self).__init__(session, msg_id,
- {"type": "Direct"})
+ if conf.qpid_topology_version == 1:
+ node_name = msg_id
+ node_opts = {"type": "direct"}
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.direct/%s" % msg_id
+ node_opts = {}
+ else:
+ raise_invalid_topology_version()
+
+ super(DirectPublisher, self).__init__(conf, session, node_name,
+ node_opts)
class TopicPublisher(Publisher):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
- super(TopicPublisher, self).__init__(session,
- "%s/%s" % (exchange_name, topic))
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s/%s" % (exchange_name, topic)
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+ else:
+ raise_invalid_topology_version()
+
+ super(TopicPublisher, self).__init__(conf, session, node_name)
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
- super(FanoutPublisher, self).__init__(
- session,
- "%s_fanout" % topic, {"type": "fanout"})
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s_fanout" % topic
+ node_opts = {"type": "fanout"}
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.topic/fanout/%s" % topic
+ node_opts = {}
+ else:
+ raise_invalid_topology_version()
+
+ super(FanoutPublisher, self).__init__(conf, session, node_name,
+ node_opts)
class NotifyPublisher(Publisher):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
- super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (exchange_name, topic),
- {"durable": True})
+ node_opts = {"durable": True}
+
+ if conf.qpid_topology_version == 1:
+ node_name = "%s/%s" % (exchange_name, topic)
+ elif conf.qpid_topology_version == 2:
+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+ else:
+ raise_invalid_topology_version()
+
+ super(NotifyPublisher, self).__init__(conf, session, node_name,
+ node_opts)
class Connection(object):