]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update Oslo rpc module
authorZhongyue Luo <zhongyue.nah@intel.com>
Thu, 17 Jan 2013 05:00:46 +0000 (13:00 +0800)
committerZhongyue Luo <zhongyue.nah@intel.com>
Thu, 17 Jan 2013 05:00:46 +0000 (13:00 +0800)
Updates rpc/impl_qpid and rpc/impl_zmq

Change-Id: Ie1b2a9d9dc3228528c501ef92d3db2133c3fe127

quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py

index 16f21a4e58da4eeb388c08d1b04adc38f21c75f8..7743b7205ca0398d7515163ba68b218682105232 100644 (file)
@@ -22,16 +22,18 @@ import uuid
 
 import eventlet
 import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
 
 from quantum.openstack.common import cfg
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
 from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import amqp as rpc_amqp
 from quantum.openstack.common.rpc import common as rpc_common
 
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
 LOG = logging.getLogger(__name__)
 
 qpid_opts = [
@@ -275,6 +277,9 @@ class Connection(object):
     pool = None
 
     def __init__(self, conf, server_params=None):
+        if not qpid_messaging:
+            raise ImportError("Failed to import qpid.messaging")
+
         self.session = None
         self.consumers = {}
         self.consumer_thread = None
@@ -303,7 +308,7 @@ class Connection(object):
 
     def connection_create(self, broker):
         # Create the connection - this does not open the connection
-        self.connection = qpid.messaging.Connection(broker)
+        self.connection = qpid_messaging.Connection(broker)
 
         # Check if flags are set and if so set them for the connection
         # before we call open
@@ -328,7 +333,7 @@ class Connection(object):
         if self.connection.opened():
             try:
                 self.connection.close()
-            except qpid.messaging.exceptions.ConnectionError:
+            except qpid_exceptions.ConnectionError:
                 pass
 
         attempt = 0
@@ -340,7 +345,7 @@ class Connection(object):
             try:
                 self.connection_create(broker)
                 self.connection.open()
-            except qpid.messaging.exceptions.ConnectionError, e:
+            except qpid_exceptions.ConnectionError, e:
                 msg_dict = dict(e=e, delay=delay)
                 msg = _("Unable to connect to AMQP server: %(e)s. "
                         "Sleeping %(delay)s seconds") % msg_dict
@@ -367,8 +372,8 @@ class Connection(object):
         while True:
             try:
                 return method(*args, **kwargs)
-            except (qpid.messaging.exceptions.Empty,
-                    qpid.messaging.exceptions.ConnectionError), e:
+            except (qpid_exceptions.Empty,
+                    qpid_exceptions.ConnectionError), e:
                 if error_callback:
                     error_callback(e)
                 self.reconnect()
@@ -408,7 +413,7 @@ class Connection(object):
         """Return an iterator that will consume from all queues/consumers"""
 
         def _error_callback(exc):
-            if isinstance(exc, qpid.messaging.exceptions.Empty):
+            if isinstance(exc, qpid_exceptions.Empty):
                 LOG.exception(_('Timed out waiting for RPC response: %s') %
                               str(exc))
                 raise rpc_common.Timeout()
index eef873a43734ad8d03227e8be5d6499fb0840066..afa0bc8e1cdf510d4b48d4fb0ebf38a0769e569b 100644 (file)
@@ -23,7 +23,6 @@ import types
 import uuid
 
 import eventlet
-from eventlet.green import zmq
 import greenlet
 
 from quantum.openstack.common import cfg
@@ -33,6 +32,7 @@ from quantum.openstack.common import jsonutils
 from quantum.openstack.common import processutils as utils
 from quantum.openstack.common.rpc import common as rpc_common
 
+zmq = importutils.try_import('eventlet.green.zmq')
 
 # for convenience, are not modified.
 pformat = pprint.pformat
@@ -76,9 +76,9 @@ zmq_opts = [
 ]
 
 
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
 matchmaker = None  # memoized matchmaker object
 
@@ -113,7 +113,7 @@ class ZmqSocket(object):
     """
 
     def __init__(self, addr, zmq_type, bind=True, subscribe=None):
-        self.sock = ZMQ_CTX.socket(zmq_type)
+        self.sock = _get_ctxt().socket(zmq_type)
         self.addr = addr
         self.type = zmq_type
         self.subscriptions = []
@@ -187,11 +187,15 @@ class ZmqSocket(object):
                     pass
             self.subscriptions = []
 
-        # Linger -1 prevents lost/dropped messages
         try:
-            self.sock.close(linger=-1)
+            # Default is to linger
+            self.sock.close()
         except Exception:
-            pass
+            # While this is a bad thing to happen,
+            # it would be much worse if some of the code calling this
+            # were to fail. For now, lets log, and later evaluate
+            # if we can safely raise here.
+            LOG.error("ZeroMQ socket could not be closed.")
         self.sock = None
 
     def recv(self):
@@ -208,7 +212,9 @@ class ZmqSocket(object):
 class ZmqClient(object):
     """Client for ZMQ sockets."""
 
-    def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+    def __init__(self, addr, socket_type=None, bind=False):
+        if socket_type is None:
+            socket_type = zmq.PUSH
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
     def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
@@ -685,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
     conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
-    queues = matchmaker.queues(topic)
+    queues = _get_matchmaker().queues(topic)
     LOG.debug(_("Sending message(s) to: %s"), queues)
 
     # Don't stack if we have no matchmaker results
@@ -753,32 +759,29 @@ def notify(conf, context, topic, msg, **kwargs):
 def cleanup():
     """Clean up resources in use by implementation."""
     global ZMQ_CTX
+    if ZMQ_CTX:
+        ZMQ_CTX.term()
+    ZMQ_CTX = None
+
     global matchmaker
     matchmaker = None
-    ZMQ_CTX.term()
-    ZMQ_CTX = None
 
 
-def register_opts(conf):
-    """Registration of options for this driver."""
-    #NOTE(ewindisch): ZMQ_CTX and matchmaker
-    # are initialized here as this is as good
-    # an initialization method as any.
+def _get_ctxt():
+    if not zmq:
+        raise ImportError("Failed to import eventlet.green.zmq")
 
-    # We memoize through these globals
     global ZMQ_CTX
-    global matchmaker
-    global CONF
-
-    if not CONF:
-        conf.register_opts(zmq_opts)
-        CONF = conf
-    # Don't re-set, if this method is called twice.
     if not ZMQ_CTX:
-        ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+    return ZMQ_CTX
+
+
+def _get_matchmaker():
+    global matchmaker
     if not matchmaker:
         # rpc_zmq_matchmaker should be set to a 'module.Class'
-        mm_path = conf.rpc_zmq_matchmaker.split('.')
+        mm_path = CONF.rpc_zmq_matchmaker.split('.')
         mm_module = '.'.join(mm_path[:-1])
         mm_class = mm_path[-1]
 
@@ -791,6 +794,4 @@ def register_opts(conf):
         mm_impl = importutils.import_module(mm_module)
         mm_constructor = getattr(mm_impl, mm_class)
         matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+    return matchmaker