import eventlet
import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
attempt = 0
try:
self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
+ if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from quantum.openstack.common import cfg
from quantum.openstack.common import processutils as utils
from quantum.openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
]
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = ZMQ_CTX.socket(zmq_type)
+ self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
pass
self.subscriptions = []
- # Linger -1 prevents lost/dropped messages
try:
- self.sock.close(linger=-1)
+ # Default is to linger
+ self.sock.close()
except Exception:
- pass
+ # While this is a bad thing to happen,
+ # it would be much worse if some of the code calling this
+ # were to fail. For now, lets log, and later evaluate
+ # if we can safely raise here.
+ LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
- queues = matchmaker.queues(topic)
+ queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
+ if ZMQ_CTX:
+ ZMQ_CTX.term()
+ ZMQ_CTX = None
+
global matchmaker
matchmaker = None
- ZMQ_CTX.term()
- ZMQ_CTX = None
-def register_opts(conf):
- """Registration of options for this driver."""
- #NOTE(ewindisch): ZMQ_CTX and matchmaker
- # are initialized here as this is as good
- # an initialization method as any.
+def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
- # We memoize through these globals
global ZMQ_CTX
- global matchmaker
- global CONF
-
- if not CONF:
- conf.register_opts(zmq_opts)
- CONF = conf
- # Don't re-set, if this method is called twice.
if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+ return ZMQ_CTX
+
+
+def _get_matchmaker():
+ global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+ return matchmaker