driver.notify(context, msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
- "send to notification system. Payload=%(payload)s") %
- locals())
+ "send to notification system. "
+ "Payload=%(payload)s") % locals())
_drivers = None
except ImportError as e:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
- notification_driver)
+ notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
cfg.ListOpt('allowed_rpc_exception_modules',
default=['quantum.openstack.common.exception',
'nova.exception',
+ 'cinder.exception',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
- cfg.StrOpt('control_exchange',
- default='nova',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
+ #
+ # The following options are not registered here, but are expected to be
+ # present. The project using this library must register these options with
+ # the configuration so that project-specific defaults may be defined.
+ #
+ #cfg.StrOpt('control_exchange',
+ # default='nova',
+ # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
from eventlet import pools
from eventlet import semaphore
+from quantum.openstack.common import cfg
from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
def cleanup(connection_pool):
if connection_pool:
connection_pool.empty()
+
+
+def get_control_exchange(conf):
+ try:
+ return conf.control_exchange
+ except cfg.NoSuchOptError:
+ return 'openstack'
import kombu.entity
import kombu.messaging
-from openstack.common import cfg
-from openstack.common.gettextutils import _
-from openstack.common.rpc import amqp as rpc_amqp
-from openstack.common.rpc import common as rpc_common
-from openstack.common import network_utils
+from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common.rpc import amqp as rpc_amqp
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
def cleanup():
return rpc_amqp.cleanup(Connection.pool)
-
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
:param name: optional queue name, defaults to topic
"""
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (conf.control_exchange,
- topic),
+ "%s/%s" % (exchange_name, topic),
{}, name or topic, {})
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(
- session,
- "%s/%s" % (conf.control_exchange, topic))
+ exchange_name = rpc_amqp.get_control_exchange(conf)
+ super(TopicPublisher, self).__init__(session,
+ "%s/%s" % (exchange_name, topic))
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(
- session,
- "%s/%s" % (conf.control_exchange, topic),
- {"durable": True})
+ exchange_name = rpc_amqp.get_control_exchange(conf)
+ super(NotifyPublisher, self).__init__(session,
+ "%s/%s" % (exchange_name, topic),
+ {"durable": True})
class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+ """Service object for binaries running on hosts.
+
+ A service enables rpc by listening to queues based on topic and host."""
+ def __init__(self, host, topic, manager=None):
+ super(Service, self).__init__()
+ self.host = host
+ self.topic = topic
+ if manager is None:
+ self.manager = self
+ else:
+ self.manager = manager
+
+ def start(self):
+ super(Service, self).start()
+
+ self.conn = rpc.create_connection(new=True)
+ LOG.debug(_("Creating Consumer connection for Service %s") %
+ self.topic)
+
+ rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
+
+ # Share this same connection for these Consumers
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
+
+ node_topic = '%s.%s' % (self.topic, self.host)
+ self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
+
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
+
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
+
+ def stop(self):
+ # Try to shut the connection down, but if we get any sort of
+ # errors, go ahead and ignore them.. as we're shutting down anyway
+ try:
+ self.conn.close()
+ except Exception:
+ pass
+ super(Service, self).stop()