default='noauth',
help='The strategy to use for auth. Supports noauth, keystone, '
'and deprecated.'),
+ cfg.StrOpt('control_exchange',
+ default='cinder',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
FLAGS.register_opts(global_opts)
cfg.ListOpt('allowed_rpc_exception_modules',
default=['cinder.openstack.common.exception',
'nova.exception',
+ 'cinder.exception',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
- cfg.StrOpt('control_exchange',
- default='nova',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
+ #
+ # The following options are not registered here, but are expected to be
+ # present. The project using this library must register these options with
+ # the configuration so that project-specific defaults may be defined.
+ #
+ #cfg.StrOpt('control_exchange',
+ # default='nova',
+ # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
from eventlet import pools
from eventlet import semaphore
+from cinder.openstack.common import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import local
def cleanup(connection_pool):
if connection_pool:
connection_pool.empty()
+
+
+def get_control_exchange(conf):
+ try:
+ return conf.control_exchange
+ except cfg.NoSuchOptError:
+ return 'openstack'
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(name=conf.control_exchange,
- type='topic',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
+ exchange = kombu.entity.Exchange(
+ name=rpc_amqp.get_control_exchange(conf),
+ type='topic', durable=options['durable'],
+ auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel,
callback,
tag,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- super(TopicPublisher, self).__init__(channel, conf.control_exchange,
- topic, type='topic', **options)
+ super(TopicPublisher, self).__init__(channel,
+ rpc_amqp.get_control_exchange(conf), topic,
+ type='topic', **options)
class FanoutPublisher(Publisher):
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (conf.control_exchange,
- topic),
- {}, name or topic, {})
+ "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(
- session,
- "%s/%s" % (conf.control_exchange, topic))
+ super(TopicPublisher, self).__init__(session,
+ "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(
- session,
- "%s/%s" % (conf.control_exchange, topic),
- {"durable": True})
+ super(NotifyPublisher, self).__init__(session,
+ "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+ {"durable": True})
class Connection(object):
#### (StrOpt) The strategy to use for auth. Supports noauth, keystone, and
#### deprecated.
+# control_exchange=cinder
+#### (StrOpt) AMQP exchange to connect to if using RabbitMQ or Qpid
+
######## defined in cinder.policy ########
#### (IntOpt) Seconds to wait before a cast expires (TTL). Only supported
#### by impl_zmq.
-# allowed_rpc_exception_modules=cinder.openstack.common.exception,nova.exception
+# allowed_rpc_exception_modules=cinder.openstack.common.exception,nova.exception,cinder.exception
#### (ListOpt) Modules of exceptions that are permitted to be recreatedupon
#### receiving exception data from an rpc call.
-# control_exchange=nova
-#### (StrOpt) AMQP exchange to connect to if using RabbitMQ or Qpid
-
# fake_rabbit=false
#### (BoolOpt) If passed, use a fake RabbitMQ provider