]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Sync zmq changes from openstack-common
authorMark McLoughlin <markmc@redhat.com>
Wed, 5 Sep 2012 11:19:07 +0000 (12:19 +0100)
committerMark McLoughlin <markmc@redhat.com>
Wed, 5 Sep 2012 11:19:49 +0000 (12:19 +0100)
Syncs the following from stable/folsom:

 9f9e14c Update zmq context cleanup to use term.
 704fb8b Remove register_opts from client methods
 f88d38b Rename FLAGS to CONF; Remove self.conf

Change-Id: Ic6ab732d71420aba032554c51904dd71f1d23b78

cinder/openstack/common/rpc/impl_zmq.py

index 87bd70d1d2894e3cfe71af7dbd67c3f455ed70c0..f3f3b9e90ce38e2f839f7d8993bccf2b3ce03ed2 100644 (file)
@@ -72,7 +72,7 @@ zmq_opts = [
 
 # These globals are defined in register_opts(conf),
 # a mandatory initialization call
-FLAGS = None
+CONF = None
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
 matchmaker = None  # memoized matchmaker object
 
@@ -274,7 +274,7 @@ class InternalContext(object):
             ctx.replies)
 
         LOG.debug(_("Sending reply"))
-        cast(FLAGS, ctx, topic, {
+        cast(CONF, ctx, topic, {
             'method': '-process_reply',
             'args': {
                 'msg_id': msg_id,
@@ -329,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
     def __init__(self, conf):
         super(ZmqBaseReactor, self).__init__()
 
-        self.conf = conf
         self.mapping = {}
         self.proxies = {}
         self.threads = []
@@ -405,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
         super(ZmqProxy, self).__init__(conf)
 
         self.topic_proxy = {}
-        ipc_dir = conf.rpc_zmq_ipc_dir
+        ipc_dir = CONF.rpc_zmq_ipc_dir
 
         self.topic_proxy['zmq_replies'] = \
             ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
@@ -413,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
         self.sockets.append(self.topic_proxy['zmq_replies'])
 
     def consume(self, sock):
-        ipc_dir = self.conf.rpc_zmq_ipc_dir
+        ipc_dir = CONF.rpc_zmq_ipc_dir
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
@@ -487,7 +486,6 @@ class Connection(rpc_common.Connection):
     """Manages connections and threads."""
 
     def __init__(self, conf):
-        self.conf = conf
         self.reactor = ZmqReactor(conf)
 
     def create_consumer(self, topic, proxy, fanout=False):
@@ -508,7 +506,7 @@ class Connection(rpc_common.Connection):
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
-            (self.conf.rpc_zmq_ipc_dir, topic)
+            (CONF.rpc_zmq_ipc_dir, topic)
 
         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
@@ -527,7 +525,7 @@ class Connection(rpc_common.Connection):
 
 
 def _cast(addr, context, msg_id, topic, msg, timeout=None):
-    timeout_cast = timeout or FLAGS.rpc_cast_timeout
+    timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
     with Timeout(timeout_cast, exception=rpc_common.Timeout):
@@ -545,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
 
 def _call(addr, context, msg_id, topic, msg, timeout=None):
     # timeout_response is how long we wait for a response
-    timeout = timeout or FLAGS.rpc_response_timeout
+    timeout = timeout or CONF.rpc_response_timeout
 
     # The msg_id is used to track replies.
     msg_id = str(uuid.uuid4().hex)
 
     # Replies always come into the reply service.
-    reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
 
     LOG.debug(_("Creating payload"))
     # Curry the original request into a reply method.
@@ -573,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     with Timeout(timeout, exception=rpc_common.Timeout):
         try:
             msg_waiter = ZmqSocket(
-                "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
                 zmq.SUB, subscribe=msg_id, bind=False
             )
 
@@ -599,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     # responses for Exceptions.
     for resp in responses:
         if isinstance(resp, types.DictType) and 'exc' in resp:
-            raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+            raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
 
     return responses[-1]
 
@@ -610,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
     dispatches to the matchmaker and sends
     message to all relevant hosts.
     """
-    conf = FLAGS
+    conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
     queues = matchmaker.queues(topic)
@@ -641,26 +639,22 @@ def create_connection(conf, new=True):
 
 def multicall(conf, *args, **kwargs):
     """Multiple calls."""
-    register_opts(conf)
     return _multi_send(_call, *args, **kwargs)
 
 
 def call(conf, *args, **kwargs):
     """Send a message, expect a response."""
-    register_opts(conf)
     data = _multi_send(_call, *args, **kwargs)
     return data[-1]
 
 
 def cast(conf, *args, **kwargs):
     """Send a message expecting no reply."""
-    register_opts(conf)
     _multi_send(_cast, *args, **kwargs)
 
 
 def fanout_cast(conf, context, topic, msg, **kwargs):
     """Send a message to all listening and expect no reply."""
-    register_opts(conf)
     # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
     # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
     _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
@@ -672,7 +666,6 @@ def notify(conf, context, topic, msg, **kwargs):
     Notifications are sent to topic-priority.
     This differs from the AMQP drivers which send to topic.priority.
     """
-    register_opts(conf)
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
     topic.replace('.', '-')
@@ -684,7 +677,7 @@ def cleanup():
     global ZMQ_CTX
     global matchmaker
     matchmaker = None
-    ZMQ_CTX.destroy()
+    ZMQ_CTX.term()
     ZMQ_CTX = None
 
 
@@ -697,11 +690,11 @@ def register_opts(conf):
     # We memoize through these globals
     global ZMQ_CTX
     global matchmaker
-    global FLAGS
+    global CONF
 
-    if not FLAGS:
+    if not CONF:
         conf.register_opts(zmq_opts)
-        FLAGS = conf
+        CONF = conf
     # Don't re-set, if this method is called twice.
     if not ZMQ_CTX:
         ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)