]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
ZMQ fixes for Quantum from openstack-common
authorEric Windisch <eric@cloudscaling.com>
Thu, 20 Sep 2012 22:50:32 +0000 (18:50 -0400)
committerEric Windisch <eric@cloudscaling.com>
Thu, 20 Sep 2012 22:53:03 +0000 (18:53 -0400)
Change Ifdeff8ae: Support declare_topic_consumer in impl_zmq
Change Ifc132519: fanout subscriptions in impl_zmq

Change-Id: Ic51fd09c302e767316d72fb38892e3761ef36de9

quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/rpc/matchmaker.py

index f4fcce0389a2f38528bdde1295dbd925b7f8e406..bee1487fc21fed65012b0614a78d6f85a0e4d2b7 100644 (file)
@@ -58,6 +58,9 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_port', default=9501,
                help='ZeroMQ receiver listening port'),
 
+    cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+               help='ZeroMQ fanout publisher port'),
+
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),
 
@@ -206,7 +209,7 @@ class ZmqClient(object):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
     def cast(self, msg_id, topic, data):
-        self.outq.send([str(msg_id), str(topic), str('cast'),
+        self.outq.send([str(topic), str(msg_id), str('cast'),
                         _serialize(data)])
 
     def close(self):
@@ -299,6 +302,9 @@ class ConsumerBase(object):
         else:
             return [result]
 
+    def consume(self, sock):
+        raise NotImplementedError()
+
     def process(self, style, target, proxy, ctx, data):
         # Method starting with - are
         # processed internally. (non-valid method name)
@@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
                       zmq.PUB, bind=True)
         self.sockets.append(self.topic_proxy['zmq_replies'])
 
+        self.topic_proxy['fanout~'] = \
+            ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+                      CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+        self.sockets.append(self.topic_proxy['fanout~'])
+
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data
         topic = topic.split('.', 1)[0]
 
         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
         # Handle zmq_replies magic
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
+
+            # This doesn't change what is in the message,
+            # it only specifies that these messages go to
+            # the generic fanout topic.
+            topic = 'fanout~'
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
             inside = _deserialize(in_msg)
@@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor):
         else:
             sock_type = zmq.PUSH
 
-        if not topic in self.topic_proxy:
-            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
-                             sock_type, bind=True)
-            self.topic_proxy[topic] = outq
-            self.sockets.append(outq)
-            LOG.info(_("Created topic proxy: %s"), topic)
-
-            # It takes some time for a pub socket to open,
-            # before we can have any faith in doing a send() to it.
-            if sock_type == zmq.PUB:
-                eventlet.sleep(.5)
+            if not topic in self.topic_proxy:
+                outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+                                 sock_type, bind=True)
+                self.topic_proxy[topic] = outq
+                self.sockets.append(outq)
+                LOG.info(_("Created topic proxy: %s"), topic)
 
         LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
         self.topic_proxy[topic].send(data)
         LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
 
 
+class CallbackReactor(ZmqBaseReactor):
+    """
+    A consumer class passing messages to a callback
+    """
+
+    def __init__(self, conf, callback):
+        self._cb = callback
+        super(CallbackReactor, self).__init__(conf)
+
+    def consume(self, sock):
+        data = sock.recv()
+        self._cb(data[3])
+
+
 class ZmqReactor(ZmqBaseReactor):
     """
     A consumer class implementing a
@@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor):
             self.mapping[sock].send(data)
             return
 
-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data
 
         ctx, request = _deserialize(in_msg)
         ctx = RpcContext.unmarshal(ctx)
@@ -488,6 +513,26 @@ class Connection(rpc_common.Connection):
     def __init__(self, conf):
         self.reactor = ZmqReactor(conf)
 
+    def _consume_fanout(self, reactor, topic, proxy, bind=False):
+        for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+            inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+            reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
+    def declare_topic_consumer(self, topic, callback=None,
+                               queue_name=None):
+        """declare_topic_consumer is a private method, but
+           it is being used by Quantum (Folsom).
+           This has been added compatibility.
+        """
+        # Only consume on the base topic name.
+        topic = topic.split('.', 1)[0]
+
+        if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
+            return
+
+        reactor = CallbackReactor(CONF, callback)
+        self._consume_fanout(reactor, topic, None, bind=False)
+
     def create_consumer(self, topic, proxy, fanout=False):
         # Only consume on the base topic name.
         topic = topic.split('.', 1)[0]
@@ -495,22 +540,35 @@ class Connection(rpc_common.Connection):
         LOG.info(_("Create Consumer for topic (%(topic)s)") %
                  {'topic': topic})
 
-        # Subscription scenarios
+        # Consume direct-push fanout messages (relay to local consumers)
         if fanout:
-            subscribe = ('', fanout)[type(fanout) == str]
+            # If we're not in here, we can't receive direct fanout messages
+            if CONF.rpc_zmq_host in matchmaker.queues(topic):
+                # Consume from all remote publishers.
+                self._consume_fanout(self.reactor, topic, proxy)
+            else:
+                LOG.warn("This service cannot receive direct PUSH fanout "
+                         "messages without being known by the matchmaker.")
+                return
+
+            # Configure consumer for direct pushes.
+            subscribe = (topic, fanout)[type(fanout) == str]
             sock_type = zmq.SUB
             topic = 'fanout~' + topic
+
+            inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
         else:
             sock_type = zmq.PULL
             subscribe = None
 
-        # Receive messages from (local) proxy
-        inaddr = "ipc://%s/zmq_topic_%s" % \
-            (CONF.rpc_zmq_ipc_dir, topic)
+            # Receive messages from (local) proxy
+            inaddr = "ipc://%s/zmq_topic_%s" % \
+                (CONF.rpc_zmq_ipc_dir, topic)
 
         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
+        # Consume messages from local rpc-zmq-receiver daemon.
         self.reactor.register(proxy, inaddr, sock_type,
                               subscribe=subscribe, in_bind=False)
 
index ecb54eb8ce4dea9cf102bc8561d7dccb5c07612a..d5006c5a0e872fc42c94550b7e9411739693d580 100644 (file)
@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
         return False
 
 
+class PublisherBinding(Binding):
+    """Match on publishers keys, where key starts with 'publishers.' string."""
+    def test(self, key):
+        if key.startswith('publishers~'):
+            return True
+        return False
+
+
 class StubExchange(Exchange):
     """Exchange that does nothing."""
     def run(self, key):
@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
         return [(key + '.' + host, host)]
 
 
+class PublisherRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(PublisherRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "publishers~", strip it for lookup.
+        nkey = key.split('publishers~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
 class FanoutRingExchange(RingExchange):
     """Fanout Exchange based on a hashmap."""
     def __init__(self, ring=None):
@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
                   "see ringfile") % (nkey, )
             )
             return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+                   ['localhost'])
 
 
 class LocalhostExchange(Exchange):
@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
     """
     def __init__(self, ring=None):
         super(MatchMakerRing, self).__init__()
+        self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
         self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
     """
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(PublisherBinding(), LocalhostExchange())
         self.add_binding(FanoutBinding(), LocalhostExchange())
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), LocalhostExchange())
@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
 
+        self.add_binding(PublisherBinding(), StubExchange())
         self.add_binding(FanoutBinding(), StubExchange())
         self.add_binding(DirectBinding(), StubExchange())
         self.add_binding(TopicBinding(), StubExchange())