review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Update common.
author     Michael Still <mikal@stillhq.com>
           Thu, 18 Oct 2012 18:43:12 +0000 (11:43 -0700)
committer  Michael Still <mikal@stillhq.com>
           Thu, 18 Oct 2012 20:42:36 +0000 (13:42 -0700)
policy.py is not updated because updating it causes test failures; I will
investigate that separately.

Change-Id: I9936137c1ab5b368ba6c3c35c423568b38bf913f

cinder/openstack/common/rpc/impl_zmq.py
cinder/openstack/common/rpc/matchmaker.py
cinder/openstack/common/rpc/service.py
cinder/openstack/common/setup.py
cinder/openstack/common/timeutils.py

cinder/openstack/common/rpc/impl_zmq.py
index 570431778a061a9393b8ac92aad36d3cfafe22b8..f3f3b9e90ce38e2f839f7d8993bccf2b3ce03ed2 100644
@@ -58,9 +58,6 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_port', default=9501,
                help='ZeroMQ receiver listening port'),
 
-    cfg.IntOpt('rpc_zmq_port_pub', default=9502,
-               help='ZeroMQ fanout publisher port'),
-
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),
 
@@ -209,7 +206,7 @@ class ZmqClient(object):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
     def cast(self, msg_id, topic, data):
-        self.outq.send([str(topic), str(msg_id), str('cast'),
+        self.outq.send([str(msg_id), str(topic), str('cast'),
                         _serialize(data)])
 
     def close(self):
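
This hunk (and the matching consume() hunks further down) moves the message id
in front of the topic in the multipart envelope. A minimal sketch of the
framing both ends now agree on, with json standing in for the module's
_serialize/_deserialize helpers and made-up ids, topics and payloads:

import json

def pack_cast(msg_id, topic, data):
    # Mirrors ZmqClient.cast() after this change: msg_id first, then topic.
    return [str(msg_id), str(topic), 'cast', json.dumps(data)]

def unpack(frames):
    # Mirrors the consume() side: both ends unpack in the same order,
    # msg_id before topic.
    msg_id, topic, style, in_msg = frames
    return msg_id, topic.split('.', 1)[0], style, json.loads(in_msg)

print(unpack(pack_cast('msg-1', 'cinder-volume.host1', {'method': 'ping'})))
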
@@ -302,9 +299,6 @@ class ConsumerBase(object):
         else:
             return [result]
 
-    def consume(self, sock):
-        raise NotImplementedError()
-
     def process(self, style, target, proxy, ctx, data):
         # Method starting with - are
         # processed internally. (non-valid method name)
@@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor):
                       zmq.PUB, bind=True)
         self.sockets.append(self.topic_proxy['zmq_replies'])
 
-        self.topic_proxy['fanout~'] = \
-            ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
-                      CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
-        self.sockets.append(self.topic_proxy['fanout~'])
-
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        topic, msg_id, style, in_msg = data
+        msg_id, topic, style, in_msg = data
         topic = topic.split('.', 1)[0]
 
         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor):
         # Handle zmq_replies magic
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
-
-            # This doesn't change what is in the message,
-            # it only specifies that these messages go to
-            # the generic fanout topic.
-            topic = 'fanout~'
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
             inside = _deserialize(in_msg)
@@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor):
         else:
             sock_type = zmq.PUSH
 
-            if not topic in self.topic_proxy:
-                outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
-                                 sock_type, bind=True)
-                self.topic_proxy[topic] = outq
-                self.sockets.append(outq)
-                LOG.info(_("Created topic proxy: %s"), topic)
+        if not topic in self.topic_proxy:
+            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+                             sock_type, bind=True)
+            self.topic_proxy[topic] = outq
+            self.sockets.append(outq)
+            LOG.info(_("Created topic proxy: %s"), topic)
+
+            # It takes some time for a pub socket to open,
+            # before we can have any faith in doing a send() to it.
+            if sock_type == zmq.PUB:
+                eventlet.sleep(.5)
 
         LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
         self.topic_proxy[topic].send(data)
         LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
 
 
-class CallbackReactor(ZmqBaseReactor):
-    """
-    A consumer class passing messages to a callback
-    """
-
-    def __init__(self, conf, callback):
-        self._cb = callback
-        super(CallbackReactor, self).__init__(conf)
-
-    def consume(self, sock):
-        data = sock.recv()
-        self._cb(data[3])
-
-
 class ZmqReactor(ZmqBaseReactor):
     """
     A consumer class implementing a
@@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor):
             self.mapping[sock].send(data)
             return
 
-        topic, msg_id, style, in_msg = data
+        msg_id, topic, style, in_msg = data
 
         ctx, request = _deserialize(in_msg)
         ctx = RpcContext.unmarshal(ctx)
@@ -513,26 +488,6 @@ class Connection(rpc_common.Connection):
     def __init__(self, conf):
         self.reactor = ZmqReactor(conf)
 
-    def _consume_fanout(self, reactor, topic, proxy, bind=False):
-        for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
-            inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
-            reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
-
-    def declare_topic_consumer(self, topic, callback=None,
-                               queue_name=None):
-        """declare_topic_consumer is a private method, but
-           it is being used by Quantum (Folsom).
-           This has been added compatibility.
-        """
-        # Only consume on the base topic name.
-        topic = topic.split('.', 1)[0]
-
-        if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
-            return
-
-        reactor = CallbackReactor(CONF, callback)
-        self._consume_fanout(reactor, topic, None, bind=False)
-
     def create_consumer(self, topic, proxy, fanout=False):
         # Only consume on the base topic name.
         topic = topic.split('.', 1)[0]
@@ -540,35 +495,22 @@ class Connection(rpc_common.Connection):
         LOG.info(_("Create Consumer for topic (%(topic)s)") %
                  {'topic': topic})
 
-        # Consume direct-push fanout messages (relay to local consumers)
+        # Subscription scenarios
         if fanout:
-            # If we're not in here, we can't receive direct fanout messages
-            if CONF.rpc_zmq_host in matchmaker.queues(topic):
-                # Consume from all remote publishers.
-                self._consume_fanout(self.reactor, topic, proxy)
-            else:
-                LOG.warn("This service cannot receive direct PUSH fanout "
-                         "messages without being known by the matchmaker.")
-                return
-
-            # Configure consumer for direct pushes.
-            subscribe = (topic, fanout)[type(fanout) == str]
+            subscribe = ('', fanout)[type(fanout) == str]
             sock_type = zmq.SUB
             topic = 'fanout~' + topic
-
-            inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
         else:
             sock_type = zmq.PULL
             subscribe = None
 
-            # Receive messages from (local) proxy
-            inaddr = "ipc://%s/zmq_topic_%s" % \
-                (CONF.rpc_zmq_ipc_dir, topic)
+        # Receive messages from (local) proxy
+        inaddr = "ipc://%s/zmq_topic_%s" % \
+            (CONF.rpc_zmq_ipc_dir, topic)
 
         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
-        # Consume messages from local rpc-zmq-receiver daemon.
         self.reactor.register(proxy, inaddr, sock_type,
                               subscribe=subscribe, in_bind=False)
 
cinder/openstack/common/rpc/matchmaker.py
index ffe4870aa4a0c12df6e7336fe076d2d3af53e1e6..2c0aac5bb6b5b3ae534dd54e2c996b0b4f0b54eb 100644
@@ -132,14 +132,6 @@ class FanoutBinding(Binding):
         return False
 
 
-class PublisherBinding(Binding):
-    """Match on publishers keys, where key starts with 'publishers.' string."""
-    def test(self, key):
-        if key.startswith('publishers~'):
-            return True
-        return False
-
-
 class StubExchange(Exchange):
     """Exchange that does nothing."""
     def run(self, key):
@@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange):
         return [(key + '.' + host, host)]
 
 
-class PublisherRingExchange(RingExchange):
-    """Fanout Exchange based on a hashmap."""
-    def __init__(self, ring=None):
-        super(PublisherRingExchange, self).__init__(ring)
-
-    def run(self, key):
-        # Assume starts with "publishers~", strip it for lookup.
-        nkey = key.split('publishers~')[1:][0]
-        if not self._ring_has(nkey):
-            LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (nkey, )
-            )
-            return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
 class FanoutRingExchange(RingExchange):
     """Fanout Exchange based on a hashmap."""
     def __init__(self, ring=None):
@@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange):
                   "see ringfile") % (nkey, )
             )
             return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
-                   ['localhost'])
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
 
 
 class LocalhostExchange(Exchange):
@@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase):
     """
     def __init__(self, ring=None):
         super(MatchMakerRing, self).__init__()
-        self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
         self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase):
     """
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
-        self.add_binding(PublisherBinding(), LocalhostExchange())
         self.add_binding(FanoutBinding(), LocalhostExchange())
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), LocalhostExchange())
@@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase):
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
 
-        self.add_binding(PublisherBinding(), StubExchange())
         self.add_binding(FanoutBinding(), StubExchange())
         self.add_binding(DirectBinding(), StubExchange())
         self.add_binding(TopicBinding(), StubExchange())
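
With PublisherBinding/PublisherRingExchange removed and 'localhost' no longer
appended, a fanout~ key now resolves purely against the ring file. A small
sketch of that lookup, mirroring FanoutRingExchange.run() with a made-up ring
mapping:

ring = {'cinder-volume': ['host1', 'host2']}   # illustrative ring contents

def fanout_run(key):
    # Strip the 'fanout~' prefix before the ring lookup, as
    # FanoutRingExchange.run() does; no implicit 'localhost' any more.
    nkey = key.split('fanout~')[1:][0]
    return [(key + '.' + host, host) for host in ring.get(nkey, [])]

print(fanout_run('fanout~cinder-volume'))
# [('fanout~cinder-volume.host1', 'host1'), ('fanout~cinder-volume.host2', 'host2')]
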
cinder/openstack/common/rpc/service.py
index a35fd6ad44790555446f60e7f98a394741e36aac..1738b3d77d50790e7ec9593d7206bba60fae0e2c 100644
@@ -20,6 +20,7 @@
 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher
 from cinder.openstack.common import service
 
 
@@ -46,15 +47,15 @@ class Service(service.Service):
         LOG.debug(_("Creating Consumer connection for Service %s") %
                   self.topic)
 
-        rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
+        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
 
         # Share this same connection for these Consumers
-        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
+        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
 
         node_topic = '%s.%s' % (self.topic, self.host)
-        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
+        self.conn.create_consumer(node_topic, dispatcher, fanout=False)
 
-        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
+        self.conn.create_consumer(self.topic, dispatcher, fanout=True)
 
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
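
The service change imports the dispatcher module under an alias so the local
variable no longer shadows rpc.dispatcher. A sketch of the resulting consumer
wiring, assuming the cinder tree is importable and with conn, manager, topic
and host supplied by the caller:

from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher

def start_consumers(conn, manager, topic, host):
    dispatcher = rpc_dispatcher.RpcDispatcher([manager])
    # Same shared connection as Service.start(): topic, topic.host, fanout.
    conn.create_consumer(topic, dispatcher, fanout=False)
    conn.create_consumer('%s.%s' % (topic, host), dispatcher, fanout=False)
    conn.create_consumer(topic, dispatcher, fanout=True)
    conn.consume_in_thread()
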
cinder/openstack/common/setup.py
index 628f5e3c9b2f68e2b930982b470786dbdb4c6340..4e2a57717fba1d0cc554bbc2608a4e6ad49e3e4f 100644
@@ -31,13 +31,13 @@ from setuptools.command import sdist
 def parse_mailmap(mailmap='.mailmap'):
     mapping = {}
     if os.path.exists(mailmap):
-        fp = open(mailmap, 'r')
-        for l in fp:
-            l = l.strip()
-            if not l.startswith('#') and ' ' in l:
-                canonical_email, alias = [x for x in l.split(' ')
-                                          if x.startswith('<')]
-                mapping[alias] = canonical_email
+        with open(mailmap, 'r') as fp:
+            for l in fp:
+                l = l.strip()
+                if not l.startswith('#') and ' ' in l:
+                    canonical_email, alias = [x for x in l.split(' ')
+                                              if x.startswith('<')]
+                    mapping[alias] = canonical_email
     return mapping
 
 
@@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping):
 def get_reqs_from_files(requirements_files):
     for requirements_file in requirements_files:
         if os.path.exists(requirements_file):
-            return open(requirements_file, 'r').read().split('\n')
+            with open(requirements_file, 'r') as fil:
+                return fil.read().split('\n')
     return []
 
 
@@ -191,14 +192,14 @@ def write_git_changelog():
 
 def generate_authors():
     """Create AUTHORS file using git commits."""
-    jenkins_email = 'jenkins@review.openstack.org'
+    jenkins_email = 'jenkins@review.(openstack|stackforge).org'
     old_authors = 'AUTHORS.in'
     new_authors = 'AUTHORS'
     if not os.getenv('SKIP_GENERATE_AUTHORS'):
         if os.path.isdir('.git'):
             # don't include jenkins email address in AUTHORS file
             git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
-                           "grep -v " + jenkins_email)
+                           "egrep -v '" + jenkins_email + "'")
             changelog = _run_shell_command(git_log_cmd)
             mailmap = parse_mailmap()
             with open(new_authors, 'w') as new_authors_fh:
@@ -236,7 +237,8 @@ def read_versioninfo(project):
 
 def write_versioninfo(project, version):
     """Write a simple file containing the version of the package."""
-    open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version)
+    with open(os.path.join(project, 'versioninfo'), 'w') as fil:
+        fil.write("%s\n" % version)
 
 
 def get_cmdclass():
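
The AUTHORS filter now treats the jenkins address as a regex (hence the switch
to egrep and the quoting), so committers from both review.openstack.org and
review.stackforge.org are skipped. A quick check of that pattern in Python,
with made-up sample addresses:

import re

jenkins_email = r'jenkins@review.(openstack|stackforge).org'
for addr in ('jenkins@review.openstack.org',
             'jenkins@review.stackforge.org',
             'someone@example.com'):
    # The first two match and would be filtered out; the last one is kept.
    print(addr, bool(re.search(jenkins_email, addr)))
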
cinder/openstack/common/timeutils.py
index 93b34fc5b1d4b424a8503ac35f0f63b1109b1b0c..86004391de06a7144b6dce653344880bc93ccbf0 100644
@@ -74,6 +74,11 @@ def is_older_than(before, seconds):
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
+def is_newer_than(after, seconds):
+    """Return True if after is newer than seconds."""
+    return after - utcnow() > datetime.timedelta(seconds=seconds)
+
+
 def utcnow_ts():
     """Timestamp version of our utcnow function."""
     return calendar.timegm(utcnow().timetuple())
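
is_newer_than() mirrors is_older_than(): it returns True when `after` lies more
than `seconds` in the future. A quick, deterministic check with a frozen clock
standing in for timeutils.utcnow():

import datetime

def utcnow():
    # Frozen clock so the example is deterministic.
    return datetime.datetime(2012, 10, 18, 12, 0, 0)

def is_newer_than(after, seconds):
    return after - utcnow() > datetime.timedelta(seconds=seconds)

print(is_newer_than(utcnow() + datetime.timedelta(seconds=90), 60))  # True
print(is_newer_than(utcnow() + datetime.timedelta(seconds=30), 60))  # False
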