]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update oslo rpc libraries
authorMichael J Fork <mjfork@us.ibm.com>
Mon, 11 Mar 2013 13:26:59 +0000 (13:26 +0000)
committerMichael J Fork <mjfork@us.ibm.com>
Mon, 11 Mar 2013 22:20:35 +0000 (22:20 +0000)
Update oslo rpc libraries to capture changes, primarly motivated
by secret=True flag on password config options.

Added bin/quantum-rpc-zmq-receiver

Change-Id: I40d3ef1a85901e5b9eab803cc67cdad3cbb0719a

bin/quantum-rpc-zmq-receiver [new file with mode: 0755]
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/rpc/matchmaker.py
quantum/openstack/common/rpc/matchmaker_redis.py [new file with mode: 0644]

diff --git a/bin/quantum-rpc-zmq-receiver b/bin/quantum-rpc-zmq-receiver
new file mode 100755 (executable)
index 0000000..d79fdb5
--- /dev/null
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack LLC
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import os
+import sys
+
+# If ../quantum/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+                                   os.pardir,
+                                   os.pardir))
+if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'quantum', '__init__.py')):
+    sys.path.insert(0, POSSIBLE_TOPDIR)
+
+from oslo.config import cfg
+
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+    CONF(sys.argv[1:], project='quantum')
+    logging.setup("quantum")
+
+    with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+        reactor.consume_in_thread()
+        reactor.wait()
+
+if __name__ == '__main__':
+    main()
index a6d5002d57e8ca653b0aecf15be331f2aa3b0e66..d648c4fc3d679525c1ad82d95bc40e7c7e463c6e 100644 (file)
@@ -32,19 +32,20 @@ import uuid
 
 from eventlet import greenpool
 from eventlet import pools
-from eventlet import semaphore
 from eventlet import queue
-
+from eventlet import semaphore
 # TODO(pekowsk): Remove import cfg and below comment in Havana.
 # This import should no longer be needed when the amqp_rpc_single_reply_queue
 # option is removed.
 from oslo.config import cfg
+
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
 from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import common as rpc_common
 
+
 # TODO(pekowski): Remove this option in Havana.
 amqp_opts = [
     cfg.BoolOpt('amqp_rpc_single_reply_queue',
index 2a7f2dd8bee1a958dd7df7c499c42e41fde99c45..cbc92bdec30793f0f7fe0cb371cf11260abda0bd 100644 (file)
@@ -624,8 +624,8 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, socket.timeout):
-                LOG.exception(_('Timed out waiting for RPC response: %s') %
-                              str(exc))
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
index ba0920a93649e88f1d9b53fce88b3d9498145b28..db165bceb98ca0f3a21ce1f7a0a78abbc88ac09c 100644 (file)
@@ -415,8 +415,8 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
-                LOG.exception(_('Timed out waiting for RPC response: %s') %
-                              str(exc))
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
index 421457a42e1d4ab12e88772d7180ee56853c4752..dc8a5e8675d12997ec963ee67870b7e744dad5ed 100644 (file)
@@ -25,6 +25,7 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
+from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
@@ -91,8 +92,8 @@ def _serialize(data):
     try:
         return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
-        LOG.error(_("JSON serialization failed."))
-        raise
+        with excutils.save_and_reraise_exception():
+            LOG.error(_("JSON serialization failed."))
 
 
 def _deserialize(data):
@@ -511,9 +512,9 @@ class ZmqProxy(ZmqBaseReactor):
                               ipc_dir, run_as_root=True)
                 utils.execute('chmod', '750', ipc_dir, run_as_root=True)
             except utils.ProcessExecutionError:
-                LOG.error(_("Could not create IPC directory %s") %
-                          (ipc_dir, ))
-                raise
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Could not create IPC directory %s") %
+                              (ipc_dir, ))
 
         try:
             self.register(consumption_proxy,
@@ -521,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
                           zmq.PULL,
                           out_bind=True)
         except zmq.ZMQError:
-            LOG.error(_("Could not create ZeroMQ receiver daemon. "
-                        "Socket may already be in use."))
-            raise
+            with excutils.save_and_reraise_exception():
+                LOG.error(_("Could not create ZeroMQ receiver daemon. "
+                            "Socket may already be in use."))
 
         super(ZmqProxy, self).consume_in_thread()
 
@@ -594,6 +595,9 @@ class Connection(rpc_common.Connection):
         self.reactor = ZmqReactor(conf)
 
     def create_consumer(self, topic, proxy, fanout=False):
+        # Register with matchmaker.
+        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
         # Subscription scenarios
         if fanout:
             sock_type = zmq.SUB
@@ -620,6 +624,10 @@ class Connection(rpc_common.Connection):
         self.topics.append(topic)
 
     def close(self):
+        _get_matchmaker().stop_heartbeat()
+        for topic in self.topics:
+            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
         self.reactor.close()
         self.topics = []
 
@@ -627,6 +635,7 @@ class Connection(rpc_common.Connection):
         self.reactor.wait()
 
     def consume_in_thread(self):
+        _get_matchmaker().start_heartbeat()
         self.reactor.consume_in_thread()
 
 
@@ -742,7 +751,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
         LOG.warn(_("No matchmaker results. Not casting."))
         # While not strictly a timeout, callers know how to handle
         # this exception and a timeout isn't too big a lie.
-        raise rpc_common.Timeout, "No match from matchmaker."
+        raise rpc_common.Timeout(_("No match from matchmaker."))
 
     # This supports brokerless fanout (addresses > 1)
     for queue in queues:
@@ -785,7 +794,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
     _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
 
 
-def notify(conf, context, topic, msg, **kwargs):
+def notify(conf, context, topic, msg, envelope):
     """
     Send notification event.
     Notifications are sent to topic-priority.
@@ -793,9 +802,8 @@ def notify(conf, context, topic, msg, **kwargs):
     """
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
-    topic.replace('.', '-')
-    kwargs['envelope'] = kwargs.get('envelope', True)
-    cast(conf, context, topic, msg, **kwargs)
+    topic = topic.replace('.', '-')
+    cast(conf, context, topic, msg, envelope=envelope)
 
 
 def cleanup():
index 874e022ab1e7e29cf32e808cd3f00109894b3512..4fa0d98a3e8e99f64f89226089dd69fcfe0214dd 100644 (file)
@@ -22,6 +22,7 @@ import contextlib
 import itertools
 import json
 
+import eventlet
 from oslo.config import cfg
 
 from quantum.openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
     cfg.StrOpt('matchmaker_ringfile',
                default='/etc/nova/matchmaker_ring.json',
                help='Matchmaker ring file (JSON)'),
+    cfg.IntOpt('matchmaker_heartbeat_freq',
+               default='300',
+               help='Heartbeat frequency'),
+    cfg.IntOpt('matchmaker_heartbeat_ttl',
+               default='600',
+               help='Heartbeat time-to-live.'),
 ]
 
 CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
 
 
 class MatchMakerBase(object):
-    """Match Maker Base Class."""
-
+    """
+    Match Maker Base Class.
+    Build off HeartbeatMatchMakerBase if building a
+    heartbeat-capable MatchMaker.
+    """
     def __init__(self):
         # Array of tuples. Index [2] toggles negation, [3] is last-if-true
         self.bindings = []
 
+        self.no_heartbeat_msg = _('Matchmaker does not implement '
+                                  'registration or heartbeat.')
+
+    def register(self, key, host):
+        """
+        Register a host on a backend.
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        pass
+
+    def ack_alive(self, key, host):
+        """
+        Acknowledge that a key.host is alive.
+        Used internally for updating heartbeats,
+        but may also be used publically to acknowledge
+        a system is alive (i.e. rpc message successfully
+        sent to host)
+        """
+        pass
+
+    def is_alive(self, topic, host):
+        """
+        Checks if a host is alive.
+        """
+        pass
+
+    def expire(self, topic, host):
+        """
+        Explicitly expire a host's registration.
+        """
+        pass
+
+    def send_heartbeats(self):
+        """
+        Send all heartbeats.
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        pass
+
+    def unregister(self, key, host):
+        """
+        Unregister a topic.
+        """
+        pass
+
+    def start_heartbeat(self):
+        """
+        Spawn heartbeat greenthread.
+        """
+        pass
+
+    def stop_heartbeat(self):
+        """
+        Destroys the heartbeat greenthread.
+        """
+        pass
+
     def add_binding(self, binding, rule, last=True):
         self.bindings.append((binding, rule, False, last))
 
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
         return workers
 
 
+class HeartbeatMatchMakerBase(MatchMakerBase):
+    """
+    Base for a heart-beat capable MatchMaker.
+    Provides common methods for registering,
+    unregistering, and maintaining heartbeats.
+    """
+    def __init__(self):
+        self.hosts = set()
+        self._heart = None
+        self.host_topic = {}
+
+        super(HeartbeatMatchMakerBase, self).__init__()
+
+    def send_heartbeats(self):
+        """
+        Send all heartbeats.
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        for key, host in self.host_topic:
+            self.ack_alive(key, host)
+
+    def ack_alive(self, key, host):
+        """
+        Acknowledge that a host.topic is alive.
+        Used internally for updating heartbeats,
+        but may also be used publically to acknowledge
+        a system is alive (i.e. rpc message successfully
+        sent to host)
+        """
+        raise NotImplementedError("Must implement ack_alive")
+
+    def backend_register(self, key, host):
+        """
+        Implements registration logic.
+        Called by register(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_register")
+
+    def backend_unregister(self, key, key_host):
+        """
+        Implements de-registration logic.
+        Called by unregister(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_unregister")
+
+    def register(self, key, host):
+        """
+        Register a host on a backend.
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        self.hosts.add(host)
+        self.host_topic[(key, host)] = host
+        key_host = '.'.join((key, host))
+
+        self.backend_register(key, key_host)
+
+        self.ack_alive(key, host)
+
+    def unregister(self, key, host):
+        """
+        Unregister a topic.
+        """
+        if (key, host) in self.host_topic:
+            del self.host_topic[(key, host)]
+
+        self.hosts.discard(host)
+        self.backend_unregister(key, '.'.join((key, host)))
+
+        LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+    def start_heartbeat(self):
+        """
+        Implementation of MatchMakerBase.start_heartbeat
+        Launches greenthread looping send_heartbeats(),
+        yielding for CONF.matchmaker_heartbeat_freq seconds
+        between iterations.
+        """
+        if len(self.hosts) == 0:
+            raise MatchMakerException(
+                _("Register before starting heartbeat."))
+
+        def do_heartbeat():
+            while True:
+                self.send_heartbeats()
+                eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+        self._heart = eventlet.spawn(do_heartbeat)
+
+    def stop_heartbeat(self):
+        """
+        Destroys the heartbeat greenthread.
+        """
+        if self._heart:
+            self._heart.kill()
+
+
 class DirectBinding(Binding):
     """
     Specifies a host in the key via a '.' character
diff --git a/quantum/openstack/common/rpc/matchmaker_redis.py b/quantum/openstack/common/rpc/matchmaker_redis.py
new file mode 100644 (file)
index 0000000..942e1a2
--- /dev/null
@@ -0,0 +1,149 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2013 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from quantum.openstack.common import importutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+    cfg.StrOpt('host',
+               default='127.0.0.1',
+               help='Host to locate redis'),
+    cfg.IntOpt('port',
+               default=6379,
+               help='Use this port to connect to redis host.'),
+    cfg.StrOpt('password',
+               default=None,
+               help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+                         title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+    def __init__(self, matchmaker):
+        self.matchmaker = matchmaker
+        self.redis = matchmaker.redis
+        super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+    """
+    Exchange where all topic keys are split, sending to second half.
+    i.e. "compute.host" sends a message to "compute" running on "host"
+    """
+    def run(self, topic):
+        while True:
+            member_name = self.redis.srandmember(topic)
+
+            if not member_name:
+                # If this happens, there are no
+                # longer any members.
+                break
+
+            if not self.matchmaker.is_alive(topic, member_name):
+                continue
+
+            host = member_name.split('.', 1)[1]
+            return [(member_name, host)]
+        return []
+
+
+class RedisFanoutExchange(RedisExchange):
+    """
+    Return a list of all hosts.
+    """
+    def run(self, topic):
+        topic = topic.split('~', 1)[1]
+        hosts = self.redis.smembers(topic)
+        good_hosts = filter(
+            lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+        return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+    """
+    MatchMaker registering and looking-up hosts with a Redis server.
+    """
+    def __init__(self):
+        super(MatchMakerRedis, self).__init__()
+
+        if not redis:
+            raise ImportError("Failed to import module redis.")
+
+        self.redis = redis.StrictRedis(
+            host=CONF.matchmaker_redis.host,
+            port=CONF.matchmaker_redis.port,
+            password=CONF.matchmaker_redis.password)
+
+        self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+        self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+        self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+    def ack_alive(self, key, host):
+        topic = "%s.%s" % (key, host)
+        if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+            # If we could not update the expiration, the key
+            # might have been pruned. Re-register, creating a new
+            # key in Redis.
+            self.register(self.topic_host[host], host)
+
+    def is_alive(self, topic, host):
+        if self.redis.ttl(host) == -1:
+            self.expire(topic, host)
+            return False
+        return True
+
+    def expire(self, topic, host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.delete(host)
+            pipe.srem(topic, host)
+            pipe.execute()
+
+    def backend_register(self, key, key_host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.sadd(key, key_host)
+
+            # No value is needed, we just
+            # care if it exists. Sets aren't viable
+            # because only keys can expire.
+            pipe.set(key_host, '')
+
+            pipe.execute()
+
+    def backend_unregister(self, key, key_host):
+        with self.redis.pipeline() as pipe:
+            pipe.multi()
+            pipe.srem(key, key_host)
+            pipe.delete(key_host)
+            pipe.execute()