]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Updated common module from oslo
authorguohliu <guohliu@cn.ibm.com>
Wed, 19 Jun 2013 12:18:39 +0000 (20:18 +0800)
committerguohliu <guohliu@cn.ibm.com>
Wed, 19 Jun 2013 12:18:39 +0000 (20:18 +0800)
This patch updated common module from oslo for known issue.

Fixed bug: #1175808
Fixed bug: #1072917

Change-Id: Ic1de6b786a9d67b39d186e31a37049729d367464

27 files changed:
heat/openstack/common/context.py
heat/openstack/common/exception.py
heat/openstack/common/gettextutils.py
heat/openstack/common/importutils.py
heat/openstack/common/log.py
heat/openstack/common/network_utils.py
heat/openstack/common/notifier/api.py
heat/openstack/common/notifier/log_notifier.py
heat/openstack/common/notifier/no_op_notifier.py
heat/openstack/common/notifier/rpc_notifier.py
heat/openstack/common/notifier/rpc_notifier2.py
heat/openstack/common/rpc/amqp.py
heat/openstack/common/rpc/common.py
heat/openstack/common/rpc/impl_fake.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/rpc/impl_zmq.py
heat/openstack/common/rpc/matchmaker.py
heat/openstack/common/rpc/matchmaker_redis.py
heat/openstack/common/rpc/matchmaker_ring.py
heat/openstack/common/rpc/proxy.py
heat/openstack/common/rpc/serializer.py
heat/openstack/common/rpc/service.py
heat/openstack/common/service.py
heat/openstack/common/threadgroup.py
heat/openstack/common/timeutils.py
tools/patch_tox_venv.py

index c45abfa7824be073e69959c562553b35af5c1a55..b125af71fd4d8cdcf17bfe5889c3d26ad5e3f5ad 100644 (file)
@@ -33,7 +33,8 @@ def generate_request_id():
 
 class RequestContext(object):
 
-    """
+    """Helper class to represent useful information about a request context.
+
     Stores information about the security context under which the user
     accesses the system, as well as additional request information.
     """
index cf69ae71913e926e25c062352dc19c35408554d4..ca1195a3e1278fa6b43b83425b5e4c900e2da12d 100644 (file)
@@ -110,8 +110,7 @@ def wrap_exception(f):
 
 
 class OpenstackException(Exception):
-    """
-    Base Exception
+    """Base Exception class.
 
     To correctly use this class, inherit from it and define
     a 'message' property. That message will get printf'd
index 0d3994ce6b42641fe35fea65facfaf5d843edc8a..a98dad9359c010ce9a60149e06c0a0edfb495aa9 100644 (file)
@@ -2,6 +2,7 @@
 
 # Copyright 2012 Red Hat, Inc.
 # All Rights Reserved.
+# Copyright 2013 IBM Corp.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -23,8 +24,11 @@ Usual usage in an openstack.common module:
     from heat.openstack.common.gettextutils import _
 """
 
+import copy
 import gettext
+import logging.handlers
 import os
+import UserString
 
 _localedir = os.environ.get('heat'.upper() + '_LOCALEDIR')
 _t = gettext.translation('heat', localedir=_localedir, fallback=True)
@@ -48,3 +52,175 @@ def install(domain):
     gettext.install(domain,
                     localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
                     unicode=True)
+
+
+"""
+Lazy gettext functionality.
+
+The following is an attempt to introduce a deferred way
+to do translations on messages in OpenStack. We attempt to
+override the standard _() function and % (format string) operation
+to build Message objects that can later be translated when we have
+more information. Also included is an example LogHandler that
+translates Messages to an associated locale, effectively allowing
+many logs, each with their own locale.
+"""
+
+
+def get_lazy_gettext(domain):
+    """Assemble and return a lazy gettext function for a given domain.
+
+    Factory method for a project/module to get a lazy gettext function
+    for its own translation domain (i.e. nova, glance, cinder, etc.)
+    """
+
+    def _lazy_gettext(msg):
+        """Create and return a Message object.
+
+        Message encapsulates a string so that we can translate it later when
+        needed.
+        """
+        return Message(msg, domain)
+
+    return _lazy_gettext
+
+
+class Message(UserString.UserString, object):
+    """Class used to encapsulate translatable messages."""
+    def __init__(self, msg, domain):
+        # _msg is the gettext msgid and should never change
+        self._msg = msg
+        self._left_extra_msg = ''
+        self._right_extra_msg = ''
+        self.params = None
+        self.locale = None
+        self.domain = domain
+
+    @property
+    def data(self):
+        # NOTE(mrodden): this should always resolve to a unicode string
+        # that best represents the state of the message currently
+
+        localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
+        if self.locale:
+            lang = gettext.translation(self.domain,
+                                       localedir=localedir,
+                                       languages=[self.locale],
+                                       fallback=True)
+        else:
+            # use system locale for translations
+            lang = gettext.translation(self.domain,
+                                       localedir=localedir,
+                                       fallback=True)
+
+        full_msg = (self._left_extra_msg +
+                    lang.ugettext(self._msg) +
+                    self._right_extra_msg)
+
+        if self.params is not None:
+            full_msg = full_msg % self.params
+
+        return unicode(full_msg)
+
+    def _save_parameters(self, other):
+        # we check for None later to see if
+        # we actually have parameters to inject,
+        # so encapsulate if our parameter is actually None
+        if other is None:
+            self.params = (other, )
+        else:
+            self.params = copy.deepcopy(other)
+
+        return self
+
+    # overrides to be more string-like
+    def __unicode__(self):
+        return self.data
+
+    def __str__(self):
+        return self.data.encode('utf-8')
+
+    def __getstate__(self):
+        to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
+                   'domain', 'params', 'locale']
+        new_dict = self.__dict__.fromkeys(to_copy)
+        for attr in to_copy:
+            new_dict[attr] = copy.deepcopy(self.__dict__[attr])
+
+        return new_dict
+
+    def __setstate__(self, state):
+        for (k, v) in state.items():
+            setattr(self, k, v)
+
+    # operator overloads
+    def __add__(self, other):
+        copied = copy.deepcopy(self)
+        copied._right_extra_msg += other.__str__()
+        return copied
+
+    def __radd__(self, other):
+        copied = copy.deepcopy(self)
+        copied._left_extra_msg += other.__str__()
+        return copied
+
+    def __mod__(self, other):
+        # do a format string to catch and raise
+        # any possible KeyErrors from missing parameters
+        self.data % other
+        copied = copy.deepcopy(self)
+        return copied._save_parameters(other)
+
+    def __mul__(self, other):
+        return self.data * other
+
+    def __rmul__(self, other):
+        return other * self.data
+
+    def __getitem__(self, key):
+        return self.data[key]
+
+    def __getslice__(self, start, end):
+        return self.data.__getslice__(start, end)
+
+    def __getattribute__(self, name):
+        # NOTE(mrodden): handle lossy operations that we can't deal with yet
+        # These override the UserString implementation, since UserString
+        # uses our __class__ attribute to try and build a new message
+        # after running the inner data string through the operation.
+        # At that point, we have lost the gettext message id and can just
+        # safely resolve to a string instead.
+        ops = ['capitalize', 'center', 'decode', 'encode',
+               'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
+               'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
+        if name in ops:
+            return getattr(self.data, name)
+        else:
+            return UserString.UserString.__getattribute__(self, name)
+
+
+class LocaleHandler(logging.Handler):
+    """Handler that can have a locale associated to translate Messages.
+
+    A quick example of how to utilize the Message class above.
+    LocaleHandler takes a locale and a target logging.Handler object
+    to forward LogRecord objects to after translating the internal Message.
+    """
+
+    def __init__(self, locale, target):
+        """Initialize a LocaleHandler
+
+        :param locale: locale to use for translating messages
+        :param target: logging.Handler object to forward
+                       LogRecord objects to after translation
+        """
+        logging.Handler.__init__(self)
+        self.locale = locale
+        self.target = target
+
+    def emit(self, record):
+        if isinstance(record.msg, Message):
+            # set the locale and resolve to a string
+            record.msg.locale = self.locale
+
+        self.target.emit(record)
index 3bd277f47e2d4fc503c4b17f9f9b6b6c18473d41..7a303f93f21d42c831883bbe4cf7c38aea842bf7 100644 (file)
@@ -24,7 +24,7 @@ import traceback
 
 
 def import_class(import_str):
-    """Returns a class from a string including module and class"""
+    """Returns a class from a string including module and class."""
     mod_str, _sep, class_str = import_str.rpartition('.')
     try:
         __import__(mod_str)
@@ -41,8 +41,9 @@ def import_object(import_str, *args, **kwargs):
 
 
 def import_object_ns(name_space, import_str, *args, **kwargs):
-    """
-    Import a class and return an instance of it, first by trying
+    """Tries to import object from default namespace.
+
+    Imports a class and return an instance of it, first by trying
     to find the class in a default namespace, then failing back to
     a full path if not found in the default namespace.
     """
index 42f93b08ceb64a3226b219bff836971f167fce1e..84113dc97544d020c5345df4bab28aa2b4459902 100644 (file)
@@ -459,10 +459,11 @@ def getLogger(name='unknown', version='unknown'):
 
 
 def getLazyLogger(name='unknown', version='unknown'):
-    """
-    create a pass-through logger that does not create the real logger
+    """Returns lazy logger.
+
+    Creates a pass-through logger that does not create the real logger
     until it is really needed and delegates all calls to the real logger
-    once it is created
+    once it is created.
     """
     return LazyAdapter(name, version)
 
index a242ee637795b4011f251ed7c84a690100e74401..d049d658ce89193646ab5b7f5b6380294f2cb7c1 100644 (file)
@@ -26,8 +26,8 @@ LOG = logging.getLogger(__name__)
 
 
 def parse_host_port(address, default_port=None):
-    """
-    Interpret a string as a host:port pair.
+    """Interpret a string as a host:port pair.
+
     An IPv6 address MUST be escaped if accompanied by a port,
     because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
     means both [2001:db8:85a3::8a2e:370:7334] and
index 3b110d4d79c4b5aa598cb00f47b5b09ea402f408..d5d2b36a271578a7183674dd1adf4e740e520a32 100644 (file)
@@ -56,7 +56,7 @@ class BadPriorityException(Exception):
 
 
 def notify_decorator(name, fn):
-    """ decorator for notify which is used from utils.monkey_patch()
+    """Decorator for notify which is used from utils.monkey_patch().
 
         :param name: name of the function
         :param function: - object of the function
index 938df94fe0bc89890779e3b948f0b09565ce27ee..f90ec80b958680d8ef547656795d68aa691f169f 100644 (file)
@@ -24,7 +24,9 @@ CONF = cfg.CONF
 
 def notify(_context, message):
     """Notifies the recipient of the desired event given the model.
-    Log notifications using openstack's default logging system"""
+
+    Log notifications using openstack's default logging system.
+    """
 
     priority = message.get('priority',
                            CONF.default_notification_level)
index bc7a56ca7ac0209c97dc9a1b83255b6850dcb77b..13d946e362d0fbb7c4134270eec63a3eb40fa87b 100644 (file)
@@ -15,5 +15,5 @@
 
 
 def notify(_context, message):
-    """Notifies the recipient of the desired event given the model"""
+    """Notifies the recipient of the desired event given the model."""
     pass
index 88bee890cfbbf434110267db8f9d133838b21f03..1b3c2597caca70de2b2856e34589c03fc167f9d2 100644 (file)
@@ -31,7 +31,7 @@ CONF.register_opt(notification_topic_opt)
 
 
 def notify(context, message):
-    """Sends a notification via RPC"""
+    """Sends a notification via RPC."""
     if not context:
         context = req_context.get_admin_context()
     priority = message.get('priority',
index 28817bb5da9b52b4dcf1b6276d69cfa5d16a742a..3688484c9f9dd6067a6abfb5a216c5488e6b3182 100644 (file)
@@ -37,7 +37,7 @@ CONF.register_opt(notification_topic_opt, opt_group)
 
 
 def notify(context, message):
-    """Sends a notification via RPC"""
+    """Sends a notification via RPC."""
     if not context:
         context = req_context.get_admin_context()
     priority = message.get('priority',
index 019d7d7cc65863fe30ee6df995126c6b3ee60171..ac73877482e1912faabf068a52231752cc610bb6 100644 (file)
@@ -34,10 +34,6 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import queue
 from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
-from oslo.config import cfg
 
 from heat.openstack.common import excutils
 from heat.openstack.common.gettextutils import _
@@ -46,16 +42,6 @@ from heat.openstack.common import log as logging
 from heat.openstack.common.rpc import common as rpc_common
 
 
-# TODO(pekowski): Remove this option in Havana.
-amqp_opts = [
-    cfg.BoolOpt('amqp_rpc_single_reply_queue',
-                default=False,
-                help='Enable a fast single reply queue if using AMQP based '
-                'RPC like RabbitMQ or Qpid.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
 UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
@@ -83,7 +69,7 @@ class Pool(pools.Pool):
         # is the above "while loop" gets all the cached connections from the
         # pool and closes them, but never returns them to the pool, a pool
         # leak. The unit tests hang waiting for an item to be returned to the
-        # pool. The unit tests get here via the teatDown() method. In the run
+        # pool. The unit tests get here via the tearDown() method. In the run
         # time code, it gets here via cleanup() and only appears in service.py
         # just before doing a sys.exit(), so cleanup() only happens once and
         # the leakage is not a problem.
@@ -102,19 +88,19 @@ def get_connection_pool(conf, connection_cls):
 
 
 class ConnectionContext(rpc_common.Connection):
-    """The class that is actually returned to the caller of
-    create_connection().  This is essentially a wrapper around
-    Connection that supports 'with'.  It can also return a new
-    Connection, or one from a pool.  The function will also catch
-    when an instance of this class is to be deleted.  With that
-    we can return Connections to the pool on exceptions and so
-    forth without making the caller be responsible for catching
-    them.  If possible the function makes sure to return a
-    connection to the pool.
+    """The class that is actually returned to the create_connection() caller.
+
+    This is essentially a wrapper around Connection that supports 'with'.
+    It can also return a new Connection, or one from a pool.
+
+    The function will also catch when an instance of this class is to be
+    deleted.  With that we can return Connections to the pool on exceptions
+    and so forth without making the caller be responsible for catching them.
+    If possible the function makes sure to return a connection to the pool.
     """
 
     def __init__(self, conf, connection_pool, pooled=True, server_params=None):
-        """Create a new connection, or get one from the pool"""
+        """Create a new connection, or get one from the pool."""
         self.connection = None
         self.conf = conf
         self.connection_pool = connection_pool
@@ -127,7 +113,7 @@ class ConnectionContext(rpc_common.Connection):
         self.pooled = pooled
 
     def __enter__(self):
-        """When with ConnectionContext() is used, return self"""
+        """When with ConnectionContext() is used, return self."""
         return self
 
     def _done(self):
@@ -175,7 +161,7 @@ class ConnectionContext(rpc_common.Connection):
         self.connection.consume_in_thread()
 
     def __getattr__(self, key):
-        """Proxy all other calls to the Connection instance"""
+        """Proxy all other calls to the Connection instance."""
         if self.connection:
             return getattr(self.connection, key)
         else:
@@ -183,7 +169,7 @@ class ConnectionContext(rpc_common.Connection):
 
 
 class ReplyProxy(ConnectionContext):
-    """ Connection class for RPC replies / callbacks """
+    """Connection class for RPC replies / callbacks."""
     def __init__(self, conf, connection_pool):
         self._call_waiters = {}
         self._num_call_waiters = 0
@@ -197,9 +183,10 @@ class ReplyProxy(ConnectionContext):
         msg_id = message_data.pop('_msg_id', None)
         waiter = self._call_waiters.get(msg_id)
         if not waiter:
-            LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+            LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
                        ', message : %(data)s'), {'msg_id': msg_id,
                                                  'data': message_data})
+            LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
         else:
             waiter.put(message_data)
 
@@ -252,7 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
 
 
 class RpcContext(rpc_common.CommonRpcContext):
-    """Context that supports replying to a rpc.call"""
+    """Context that supports replying to a rpc.call."""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
         self.reply_q = kwargs.pop('reply_q', None)
@@ -339,8 +326,9 @@ def _add_unique_id(msg):
 
 
 class _ThreadPoolWithWait(object):
-    """Base class for a delayed invocation manager used by
-    the Connection class to start up green threads
+    """Base class for a delayed invocation manager.
+
+    Used by the Connection class to start up green threads
     to handle incoming messages.
     """
 
@@ -355,12 +343,14 @@ class _ThreadPoolWithWait(object):
 
 
 class CallbackWrapper(_ThreadPoolWithWait):
-    """Wraps a straight callback to allow it to be invoked in a green
-    thread.
+    """Wraps a straight callback.
+
+    Allows it to be invoked in a green thread.
     """
 
     def __init__(self, conf, callback, connection_pool):
-        """
+        """Initiates CallbackWrapper object.
+
         :param conf: cfg.CONF instance
         :param callback: a callable (probably a function)
         :param connection_pool: connection pool as returned by
@@ -491,7 +481,7 @@ class MulticallProxyWaiter(object):
         return result
 
     def __iter__(self):
-        """Return a result until we get a reply with an 'ending" flag"""
+        """Return a result until we get a reply with an 'ending' flag."""
         if self._done:
             raise StopIteration
         while True:
@@ -513,61 +503,8 @@ class MulticallProxyWaiter(object):
             yield result
 
 
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
-    def __init__(self, conf, connection, timeout):
-        self._connection = connection
-        self._iterator = connection.iterconsume(timeout=timeout or
-                                                conf.rpc_response_timeout)
-        self._result = None
-        self._done = False
-        self._got_ending = False
-        self._conf = conf
-        self.msg_id_cache = _MsgIdCache()
-
-    def done(self):
-        if self._done:
-            return
-        self._done = True
-        self._iterator.close()
-        self._iterator = None
-        self._connection.close()
-
-    def __call__(self, data):
-        """The consume() callback will call this.  Store the result."""
-        self.msg_id_cache.check_duplicate_message(data)
-        if data['failure']:
-            failure = data['failure']
-            self._result = rpc_common.deserialize_remote_exception(self._conf,
-                                                                   failure)
-
-        elif data.get('ending', False):
-            self._got_ending = True
-        else:
-            self._result = data['result']
-
-    def __iter__(self):
-        """Return a result until we get a 'None' response from consumer"""
-        if self._done:
-            raise StopIteration
-        while True:
-            try:
-                self._iterator.next()
-            except Exception:
-                with excutils.save_and_reraise_exception():
-                    self.done()
-            if self._got_ending:
-                self.done()
-                raise StopIteration
-            result = self._result
-            if isinstance(result, Exception):
-                self.done()
-                raise result
-            yield result
-
-
 def create_connection(conf, new, connection_pool):
-    """Create a connection"""
+    """Create a connection."""
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
@@ -576,14 +513,6 @@ _reply_proxy_create_sem = semaphore.Semaphore()
 
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
-    # TODO(pekowski): Remove all these comments in Havana.
-    # For amqp_rpc_single_reply_queue = False,
-    # Can't use 'with' for multicall, as it returns an iterator
-    # that will continue to use the connection.  When it's done,
-    # connection.close() will get called which will put it back into
-    # the pool
-    # For amqp_rpc_single_reply_queue = True,
-    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
@@ -591,21 +520,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     _add_unique_id(msg)
     pack_context(msg, context)
 
-    # TODO(pekowski): Remove this flag and the code under the if clause
-    #                 in Havana.
-    if not conf.amqp_rpc_single_reply_queue:
-        conn = ConnectionContext(conf, connection_pool)
-        wait_msg = MulticallWaiter(conf, conn, timeout)
-        conn.declare_direct_consumer(msg_id, wait_msg)
+    with _reply_proxy_create_sem:
+        if not connection_pool.reply_proxy:
+            connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+    msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+    wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+    with ConnectionContext(conf, connection_pool) as conn:
         conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
-    else:
-        with _reply_proxy_create_sem:
-            if not connection_pool.reply_proxy:
-                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
-        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
-        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
-        with ConnectionContext(conf, connection_pool) as conn:
-            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
 
 
index 672c818c53c43cce130e4eb360d28b552e78f555..31ecbdf3d2edd83eaf88dfcbd71e7be436c40536 100644 (file)
@@ -70,6 +70,8 @@ _RPC_ENVELOPE_VERSION = '2.0'
 _VERSION_KEY = 'oslo.version'
 _MESSAGE_KEY = 'oslo.message'
 
+_REMOTE_POSTFIX = '_Remote'
+
 
 class RPCException(Exception):
     message = _("An unknown RPC related exception occurred.")
@@ -124,7 +126,8 @@ class Timeout(RPCException):
                 'info: "%(info)s"')
 
     def __init__(self, info=None, topic=None, method=None):
-        """
+        """Initiates Timeout object.
+
         :param info: Extra info to convey to the user
         :param topic: The topic that the rpc call was sent to
         :param rpc_method_name: The name of the rpc method being
@@ -221,9 +224,9 @@ class Connection(object):
         raise NotImplementedError()
 
     def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
-        """Register as a member of a group of consumers for a given topic from
-        the specified exchange.
+        """Register as a member of a group of consumers.
 
+        Uses given topic from the specified exchange.
         Exactly one member of a given pool will receive each message.
 
         A message will be delivered to multiple pools, if more than
@@ -312,9 +315,18 @@ def serialize_remote_exception(failure_info, log_failure=True):
     if hasattr(failure, 'kwargs'):
         kwargs = failure.kwargs
 
+    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
+    # exceptions. Lets turn it back into the original exception type.
+    cls_name = str(failure.__class__.__name__)
+    mod_name = str(failure.__class__.__module__)
+    if (cls_name.endswith(_REMOTE_POSTFIX) and
+            mod_name.endswith(_REMOTE_POSTFIX)):
+        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
+        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
+
     data = {
-        'class': str(failure.__class__.__name__),
-        'module': str(failure.__class__.__module__),
+        'class': cls_name,
+        'module': mod_name,
         'message': six.text_type(failure),
         'tb': tb,
         'args': failure.args,
@@ -351,8 +363,9 @@ def deserialize_remote_exception(conf, data):
 
     ex_type = type(failure)
     str_override = lambda self: message
-    new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                        {'__str__': str_override, '__unicode__': str_override})
+    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
     try:
         # NOTE(ameade): Dynamically create a new exception type and swap it in
         # as the new type for the exception. This only works on user defined
@@ -414,10 +427,11 @@ class CommonRpcContext(object):
 
 
 class ClientException(Exception):
-    """This encapsulates some actual exception that is expected to be
-    hit by an RPC proxy object. Merely instantiating it records the
-    current exception information, which will be passed back to the
-    RPC client without exceptional logging."""
+    """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+    Merely instantiating it records the current exception information, which
+    will be passed back to the RPC client without exceptional logging.
+    """
     def __init__(self):
         self._exc_info = sys.exc_info()
 
@@ -434,11 +448,13 @@ def catch_client_exception(exceptions, func, *args, **kwargs):
 
 def client_exceptions(*exceptions):
     """Decorator for manager methods that raise expected exceptions.
+
     Marking a Manager method with this decorator allows the declaration
     of expected exceptions that the RPC layer should not consider fatal,
     and not log as if they were generated in a real error scenario. Note
     that this will cause listed exceptions to be wrapped in a
-    ClientException, which is used internally by the RPC layer."""
+    ClientException, which is used internally by the RPC layer.
+    """
     def outer(func):
         def inner(*args, **kwargs):
             return catch_client_exception(exceptions, func, *args, **kwargs)
index 4cc9b7c7e3b490161c80e524daf0a1c48db55aa1..a48d4cdbf14f1f87b1fdb5c97426ed7729388baa 100644 (file)
@@ -122,7 +122,7 @@ class Connection(object):
 
 
 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return Connection()
 
 
@@ -179,7 +179,7 @@ def cleanup():
 
 
 def fanout_cast(conf, context, topic, msg):
-    """Cast to all consumers of a topic"""
+    """Cast to all consumers of a topic."""
     check_serialize(msg)
     method = msg.get('method')
     if not method:
index ab7db713e3240b16874ac6185da706efec1b3a96..c29007dbd12f989f6dda34ce1255c72b4018b1c2 100644 (file)
@@ -132,7 +132,7 @@ class ConsumerBase(object):
         self.reconnect(channel)
 
     def reconnect(self, channel):
-        """Re-declare the queue after a rabbit reconnect"""
+        """Re-declare the queue after a rabbit reconnect."""
         self.channel = channel
         self.kwargs['channel'] = channel
         self.queue = kombu.entity.Queue(**self.kwargs)
@@ -173,7 +173,7 @@ class ConsumerBase(object):
         self.queue.consume(*args, callback=_callback, **options)
 
     def cancel(self):
-        """Cancel the consuming from the queue, if it has started"""
+        """Cancel the consuming from the queue, if it has started."""
         try:
             self.queue.cancel(self.tag)
         except KeyError as e:
@@ -184,7 +184,7 @@ class ConsumerBase(object):
 
 
 class DirectConsumer(ConsumerBase):
-    """Queue/consumer class for 'direct'"""
+    """Queue/consumer class for 'direct'."""
 
     def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
         """Init a 'direct' queue.
@@ -216,7 +216,7 @@ class DirectConsumer(ConsumerBase):
 
 
 class TopicConsumer(ConsumerBase):
-    """Consumer class for 'topic'"""
+    """Consumer class for 'topic'."""
 
     def __init__(self, conf, channel, topic, callback, tag, name=None,
                  exchange_name=None, **kwargs):
@@ -253,7 +253,7 @@ class TopicConsumer(ConsumerBase):
 
 
 class FanoutConsumer(ConsumerBase):
-    """Consumer class for 'fanout'"""
+    """Consumer class for 'fanout'."""
 
     def __init__(self, conf, channel, topic, callback, tag, **kwargs):
         """Init a 'fanout' queue.
@@ -286,7 +286,7 @@ class FanoutConsumer(ConsumerBase):
 
 
 class Publisher(object):
-    """Base Publisher class"""
+    """Base Publisher class."""
 
     def __init__(self, channel, exchange_name, routing_key, **kwargs):
         """Init the Publisher class with the exchange_name, routing_key,
@@ -298,7 +298,7 @@ class Publisher(object):
         self.reconnect(channel)
 
     def reconnect(self, channel):
-        """Re-establish the Producer after a rabbit reconnection"""
+        """Re-establish the Producer after a rabbit reconnection."""
         self.exchange = kombu.entity.Exchange(name=self.exchange_name,
                                               **self.kwargs)
         self.producer = kombu.messaging.Producer(exchange=self.exchange,
@@ -306,7 +306,7 @@ class Publisher(object):
                                                  routing_key=self.routing_key)
 
     def send(self, msg, timeout=None):
-        """Send a message"""
+        """Send a message."""
         if timeout:
             #
             # AMQP TTL is in milliseconds when set in the header.
@@ -317,7 +317,7 @@ class Publisher(object):
 
 
 class DirectPublisher(Publisher):
-    """Publisher class for 'direct'"""
+    """Publisher class for 'direct'."""
     def __init__(self, conf, channel, msg_id, **kwargs):
         """init a 'direct' publisher.
 
@@ -333,7 +333,7 @@ class DirectPublisher(Publisher):
 
 
 class TopicPublisher(Publisher):
-    """Publisher class for 'topic'"""
+    """Publisher class for 'topic'."""
     def __init__(self, conf, channel, topic, **kwargs):
         """init a 'topic' publisher.
 
@@ -352,7 +352,7 @@ class TopicPublisher(Publisher):
 
 
 class FanoutPublisher(Publisher):
-    """Publisher class for 'fanout'"""
+    """Publisher class for 'fanout'."""
     def __init__(self, conf, channel, topic, **kwargs):
         """init a 'fanout' publisher.
 
@@ -367,7 +367,7 @@ class FanoutPublisher(Publisher):
 
 
 class NotifyPublisher(TopicPublisher):
-    """Publisher class for 'notify'"""
+    """Publisher class for 'notify'."""
 
     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
@@ -447,8 +447,9 @@ class Connection(object):
         self.reconnect()
 
     def _fetch_ssl_params(self):
-        """Handles fetching what ssl params
-        should be used for the connection (if any)"""
+        """Handles fetching what ssl params should be used for the connection
+        (if any).
+        """
         ssl_params = dict()
 
         # http://docs.python.org/library/ssl.html - ssl.wrap_socket
@@ -578,18 +579,18 @@ class Connection(object):
             self.reconnect()
 
     def get_channel(self):
-        """Convenience call for bin/clear_rabbit_queues"""
+        """Convenience call for bin/clear_rabbit_queues."""
         return self.channel
 
     def close(self):
-        """Close/release this connection"""
+        """Close/release this connection."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.connection.release()
         self.connection = None
 
     def reset(self):
-        """Reset a connection so it can be used again"""
+        """Reset a connection so it can be used again."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.channel.close()
@@ -618,7 +619,7 @@ class Connection(object):
         return self.ensure(_connect_error, _declare_consumer)
 
     def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers"""
+        """Return an iterator that will consume from all queues/consumers."""
 
         info = {'do_consume': True}
 
@@ -648,7 +649,7 @@ class Connection(object):
             yield self.ensure(_error_callback, _consume)
 
     def cancel_consumer_thread(self):
-        """Cancel a consumer thread"""
+        """Cancel a consumer thread."""
         if self.consumer_thread is not None:
             self.consumer_thread.kill()
             try:
@@ -663,7 +664,7 @@ class Connection(object):
             proxy_cb.wait()
 
     def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
-        """Send to a publisher based on the publisher class"""
+        """Send to a publisher based on the publisher class."""
 
         def _error_callback(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
@@ -693,27 +694,27 @@ class Connection(object):
                               topic, callback)
 
     def declare_fanout_consumer(self, topic, callback):
-        """Create a 'fanout' consumer"""
+        """Create a 'fanout' consumer."""
         self.declare_consumer(FanoutConsumer, topic, callback)
 
     def direct_send(self, msg_id, msg):
-        """Send a 'direct' message"""
+        """Send a 'direct' message."""
         self.publisher_send(DirectPublisher, msg_id, msg)
 
     def topic_send(self, topic, msg, timeout=None):
-        """Send a 'topic' message"""
+        """Send a 'topic' message."""
         self.publisher_send(TopicPublisher, topic, msg, timeout)
 
     def fanout_send(self, topic, msg):
-        """Send a 'fanout' message"""
+        """Send a 'fanout' message."""
         self.publisher_send(FanoutPublisher, topic, msg)
 
     def notify_send(self, topic, msg, **kwargs):
-        """Send a notify message on a topic"""
+        """Send a notify message on a topic."""
         self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
 
     def consume(self, limit=None):
-        """Consume from all queues/consumers"""
+        """Consume from all queues/consumers."""
         it = self.iterconsume(limit=limit)
         while True:
             try:
@@ -722,7 +723,7 @@ class Connection(object):
                 return
 
     def consume_in_thread(self):
-        """Consumer from all queues/consumers in a greenthread"""
+        """Consumer from all queues/consumers in a greenthread."""
         def _consumer_thread():
             try:
                 self.consume()
@@ -733,7 +734,7 @@ class Connection(object):
         return self.consumer_thread
 
     def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object"""
+        """Create a consumer that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -745,7 +746,7 @@ class Connection(object):
             self.declare_topic_consumer(topic, proxy_cb)
 
     def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object"""
+        """Create a worker that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -778,7 +779,7 @@ class Connection(object):
 
 
 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return rpc_amqp.create_connection(
         conf, new,
         rpc_amqp.get_connection_pool(conf, Connection))
index 152dfd09f33d13d899ac24276ac8bd2087b8c51a..b98d84bd2d2afc3f360470a1d36fa696ecb83873 100644 (file)
@@ -31,6 +31,7 @@ from heat.openstack.common import log as logging
 from heat.openstack.common.rpc import amqp as rpc_amqp
 from heat.openstack.common.rpc import common as rpc_common
 
+qpid_codec = importutils.try_import("qpid.codec010")
 qpid_messaging = importutils.try_import("qpid.messaging")
 qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
 
@@ -69,6 +70,8 @@ qpid_opts = [
 
 cfg.CONF.register_opts(qpid_opts)
 
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
 
 class ConsumerBase(object):
     """Consumer base class."""
@@ -118,15 +121,32 @@ class ConsumerBase(object):
         self.reconnect(session)
 
     def reconnect(self, session):
-        """Re-declare the receiver after a qpid reconnect"""
+        """Re-declare the receiver after a qpid reconnect."""
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
 
+    def _unpack_json_msg(self, msg):
+        """Load the JSON data in msg if msg.content_type indicates that it
+           is necessary.  Put the loaded data back into msg.content and
+           update msg.content_type appropriately.
+
+        A Qpid Message containing a dict will have a content_type of
+        'amqp/map', whereas one containing a string that needs to be converted
+        back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+        :param msg: a Qpid Message object
+        :returns: None
+        """
+        if msg.content_type == JSON_CONTENT_TYPE:
+            msg.content = jsonutils.loads(msg.content)
+            msg.content_type = 'amqp/map'
+
     def consume(self):
-        """Fetch the message and pass it to the callback object"""
+        """Fetch the message and pass it to the callback object."""
         message = self.receiver.fetch()
         try:
+            self._unpack_json_msg(message)
             msg = rpc_common.deserialize_msg(message.content)
             self.callback(msg)
         except Exception:
@@ -139,7 +159,7 @@ class ConsumerBase(object):
 
 
 class DirectConsumer(ConsumerBase):
-    """Queue/consumer class for 'direct'"""
+    """Queue/consumer class for 'direct'."""
 
     def __init__(self, conf, session, msg_id, callback):
         """Init a 'direct' queue.
@@ -157,7 +177,7 @@ class DirectConsumer(ConsumerBase):
 
 
 class TopicConsumer(ConsumerBase):
-    """Consumer class for 'topic'"""
+    """Consumer class for 'topic'."""
 
     def __init__(self, conf, session, topic, callback, name=None,
                  exchange_name=None):
@@ -177,7 +197,7 @@ class TopicConsumer(ConsumerBase):
 
 
 class FanoutConsumer(ConsumerBase):
-    """Consumer class for 'fanout'"""
+    """Consumer class for 'fanout'."""
 
     def __init__(self, conf, session, topic, callback):
         """Init a 'fanout' queue.
@@ -196,7 +216,7 @@ class FanoutConsumer(ConsumerBase):
 
 
 class Publisher(object):
-    """Base Publisher class"""
+    """Base Publisher class."""
 
     def __init__(self, session, node_name, node_opts=None):
         """Init the Publisher class with the exchange_name, routing_key,
@@ -225,16 +245,43 @@ class Publisher(object):
         self.reconnect(session)
 
     def reconnect(self, session):
-        """Re-establish the Sender after a reconnection"""
+        """Re-establish the Sender after a reconnection."""
         self.sender = session.sender(self.address)
 
+    def _pack_json_msg(self, msg):
+        """Qpid cannot serialize dicts containing strings longer than 65535
+           characters.  This function dumps the message content to a JSON
+           string, which Qpid is able to handle.
+
+        :param msg: May be either a Qpid Message object or a bare dict.
+        :returns: A Qpid Message with its content field JSON encoded.
+        """
+        try:
+            msg.content = jsonutils.dumps(msg.content)
+        except AttributeError:
+            # Need to have a Qpid message so we can set the content_type.
+            msg = qpid_messaging.Message(jsonutils.dumps(msg))
+        msg.content_type = JSON_CONTENT_TYPE
+        return msg
+
     def send(self, msg):
-        """Send a message"""
+        """Send a message."""
+        try:
+            # Check if Qpid can encode the message
+            check_msg = msg
+            if not hasattr(check_msg, 'content_type'):
+                check_msg = qpid_messaging.Message(msg)
+            content_type = check_msg.content_type
+            enc, dec = qpid_messaging.message.get_codec(content_type)
+            enc(check_msg.content)
+        except qpid_codec.CodecException:
+            # This means the message couldn't be serialized as a dict.
+            msg = self._pack_json_msg(msg)
         self.sender.send(msg)
 
 
 class DirectPublisher(Publisher):
-    """Publisher class for 'direct'"""
+    """Publisher class for 'direct'."""
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
         super(DirectPublisher, self).__init__(session, msg_id,
@@ -242,7 +289,7 @@ class DirectPublisher(Publisher):
 
 
 class TopicPublisher(Publisher):
-    """Publisher class for 'topic'"""
+    """Publisher class for 'topic'."""
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
@@ -252,7 +299,7 @@ class TopicPublisher(Publisher):
 
 
 class FanoutPublisher(Publisher):
-    """Publisher class for 'fanout'"""
+    """Publisher class for 'fanout'."""
     def __init__(self, conf, session, topic):
         """init a 'fanout' publisher.
         """
@@ -262,7 +309,7 @@ class FanoutPublisher(Publisher):
 
 
 class NotifyPublisher(Publisher):
-    """Publisher class for notifications"""
+    """Publisher class for notifications."""
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
@@ -330,7 +377,7 @@ class Connection(object):
         return self.consumers[str(receiver)]
 
     def reconnect(self):
-        """Handles reconnecting and re-establishing sessions and queues"""
+        """Handles reconnecting and re-establishing sessions and queues."""
         attempt = 0
         delay = 1
         while True:
@@ -381,14 +428,20 @@ class Connection(object):
                 self.reconnect()
 
     def close(self):
-        """Close/release this connection"""
+        """Close/release this connection."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
-        self.connection.close()
+        try:
+            self.connection.close()
+        except Exception:
+            # NOTE(dripton) Logging exceptions that happen during cleanup just
+            # causes confusion; there's really nothing useful we can do with
+            # them.
+            pass
         self.connection = None
 
     def reset(self):
-        """Reset a connection so it can be used again"""
+        """Reset a connection so it can be used again."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.session.close()
@@ -412,7 +465,7 @@ class Connection(object):
         return self.ensure(_connect_error, _declare_consumer)
 
     def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers"""
+        """Return an iterator that will consume from all queues/consumers."""
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
@@ -436,7 +489,7 @@ class Connection(object):
             yield self.ensure(_error_callback, _consume)
 
     def cancel_consumer_thread(self):
-        """Cancel a consumer thread"""
+        """Cancel a consumer thread."""
         if self.consumer_thread is not None:
             self.consumer_thread.kill()
             try:
@@ -451,7 +504,7 @@ class Connection(object):
             proxy_cb.wait()
 
     def publisher_send(self, cls, topic, msg):
-        """Send to a publisher based on the publisher class"""
+        """Send to a publisher based on the publisher class."""
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
@@ -481,15 +534,15 @@ class Connection(object):
                               topic, callback)
 
     def declare_fanout_consumer(self, topic, callback):
-        """Create a 'fanout' consumer"""
+        """Create a 'fanout' consumer."""
         self.declare_consumer(FanoutConsumer, topic, callback)
 
     def direct_send(self, msg_id, msg):
-        """Send a 'direct' message"""
+        """Send a 'direct' message."""
         self.publisher_send(DirectPublisher, msg_id, msg)
 
     def topic_send(self, topic, msg, timeout=None):
-        """Send a 'topic' message"""
+        """Send a 'topic' message."""
         #
         # We want to create a message with attributes, e.g. a TTL. We
         # don't really need to keep 'msg' in its JSON format any longer
@@ -504,15 +557,15 @@ class Connection(object):
         self.publisher_send(TopicPublisher, topic, qpid_message)
 
     def fanout_send(self, topic, msg):
-        """Send a 'fanout' message"""
+        """Send a 'fanout' message."""
         self.publisher_send(FanoutPublisher, topic, msg)
 
     def notify_send(self, topic, msg, **kwargs):
-        """Send a notify message on a topic"""
+        """Send a notify message on a topic."""
         self.publisher_send(NotifyPublisher, topic, msg)
 
     def consume(self, limit=None):
-        """Consume from all queues/consumers"""
+        """Consume from all queues/consumers."""
         it = self.iterconsume(limit=limit)
         while True:
             try:
@@ -521,7 +574,7 @@ class Connection(object):
                 return
 
     def consume_in_thread(self):
-        """Consumer from all queues/consumers in a greenthread"""
+        """Consumer from all queues/consumers in a greenthread."""
         def _consumer_thread():
             try:
                 self.consume()
@@ -532,7 +585,7 @@ class Connection(object):
         return self.consumer_thread
 
     def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object"""
+        """Create a consumer that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -548,7 +601,7 @@ class Connection(object):
         return consumer
 
     def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object"""
+        """Create a worker that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -591,7 +644,7 @@ class Connection(object):
 
 
 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return rpc_amqp.create_connection(
         conf, new,
         rpc_amqp.get_connection_pool(conf, Connection))
index 97a924baae5620b5ecc1a4d5e25238553a1265f3..1a2f4b3ed8bea71b62d033946f480b34d24a661b 100644 (file)
@@ -30,7 +30,6 @@ from heat.openstack.common import excutils
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
-from heat.openstack.common import processutils as utils
 from heat.openstack.common.rpc import common as rpc_common
 
 zmq = importutils.try_import('eventlet.green.zmq')
@@ -85,8 +84,8 @@ matchmaker = None  # memoized matchmaker object
 
 
 def _serialize(data):
-    """
-    Serialization wrapper
+    """Serialization wrapper.
+
     We prefer using JSON, but it cannot encode all types.
     Error if a developer passes us bad data.
     """
@@ -98,18 +97,15 @@ def _serialize(data):
 
 
 def _deserialize(data):
-    """
-    Deserialization wrapper
-    """
+    """Deserialization wrapper."""
     LOG.debug(_("Deserializing: %s"), data)
     return jsonutils.loads(data)
 
 
 class ZmqSocket(object):
-    """
-    A tiny wrapper around ZeroMQ to simplify the send/recv protocol
-    and connection management.
+    """A tiny wrapper around ZeroMQ.
 
+    Simplifies the send/recv protocol and connection management.
     Can be used as a Context (supports the 'with' statement).
     """
 
@@ -199,26 +195,24 @@ class ZmqSocket(object):
             LOG.error("ZeroMQ socket could not be closed.")
         self.sock = None
 
-    def recv(self):
+    def recv(self, **kwargs):
         if not self.can_recv:
             raise RPCException(_("You cannot recv on this socket."))
-        return self.sock.recv_multipart()
+        return self.sock.recv_multipart(**kwargs)
 
-    def send(self, data):
+    def send(self, data, **kwargs):
         if not self.can_send:
             raise RPCException(_("You cannot send on this socket."))
-        self.sock.send_multipart(data)
+        self.sock.send_multipart(data, **kwargs)
 
 
 class ZmqClient(object):
     """Client for ZMQ sockets."""
 
-    def __init__(self, addr, socket_type=None, bind=False):
-        if socket_type is None:
-            socket_type = zmq.PUSH
-        self.outq = ZmqSocket(addr, socket_type, bind=bind)
+    def __init__(self, addr):
+        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
 
-    def cast(self, msg_id, topic, data, envelope=False):
+    def cast(self, msg_id, topic, data, envelope):
         msg_id = msg_id or 0
 
         if not envelope:
@@ -356,10 +350,9 @@ class ConsumerBase(object):
 
 
 class ZmqBaseReactor(ConsumerBase):
-    """
-    A consumer class implementing a
-    centralized casting broker (PULL-PUSH)
-    for RoundRobin requests.
+    """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+    Used for RoundRobin requests.
     """
 
     def __init__(self, conf):
@@ -430,10 +423,9 @@ class ZmqBaseReactor(ConsumerBase):
 
 
 class ZmqProxy(ZmqBaseReactor):
-    """
-    A consumer class implementing a
-    topic-based proxy, forwarding to
-    IPC sockets.
+    """A consumer class implementing a topic-based proxy.
+
+    Forwards to IPC sockets.
     """
 
     def __init__(self, conf):
@@ -446,11 +438,8 @@ class ZmqProxy(ZmqBaseReactor):
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
 
-        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
-        data = sock.recv()
-        topic = data[1]
-
-        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+        data = sock.recv(copy=False)
+        topic = data[1].bytes
 
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
@@ -492,9 +481,7 @@ class ZmqProxy(ZmqBaseReactor):
 
                 while(True):
                     data = self.topic_proxy[topic].get()
-                    out_sock.send(data)
-                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
-                              {'data': data})
+                    out_sock.send(data, copy=False)
 
             wait_sock_creation = eventlet.event.Event()
             eventlet.spawn(publisher, wait_sock_creation)
@@ -507,37 +494,35 @@ class ZmqProxy(ZmqBaseReactor):
 
         try:
             self.topic_proxy[topic].put_nowait(data)
-            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
-                      {'data': data})
         except eventlet.queue.Full:
             LOG.error(_("Local per-topic backlog buffer full for topic "
                         "%(topic)s. Dropping message.") % {'topic': topic})
 
     def consume_in_thread(self):
-        """Runs the ZmqProxy service"""
+        """Runs the ZmqProxy service."""
         ipc_dir = CONF.rpc_zmq_ipc_dir
         consume_in = "tcp://%s:%s" % \
             (CONF.rpc_zmq_bind_address,
              CONF.rpc_zmq_port)
         consumption_proxy = InternalContext(None)
 
-        if not os.path.isdir(ipc_dir):
-            try:
-                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
-                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
-                              ipc_dir, run_as_root=True)
-                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
-            except utils.ProcessExecutionError:
+        try:
+            os.makedirs(ipc_dir)
+        except os.error:
+            if not os.path.isdir(ipc_dir):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Could not create IPC directory %s") %
-                              (ipc_dir, ))
-
+                    LOG.error(_("Required IPC directory does not exist at"
+                                " %s") % (ipc_dir, ))
         try:
             self.register(consumption_proxy,
                           consume_in,
                           zmq.PULL,
                           out_bind=True)
         except zmq.ZMQError:
+            if os.access(ipc_dir, os.X_OK):
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Permission denied to IPC directory at"
+                                " %s") % (ipc_dir, ))
             with excutils.save_and_reraise_exception():
                 LOG.error(_("Could not create ZeroMQ receiver daemon. "
                             "Socket may already be in use."))
@@ -547,8 +532,9 @@ class ZmqProxy(ZmqBaseReactor):
 
 def unflatten_envelope(packenv):
     """Unflattens the RPC envelope.
-       Takes a list and returns a dictionary.
-       i.e. [1,2,3,4] => {1: 2, 3: 4}
+
+    Takes a list and returns a dictionary.
+    i.e. [1,2,3,4] => {1: 2, 3: 4}
     """
     i = iter(packenv)
     h = {}
@@ -561,10 +547,9 @@ def unflatten_envelope(packenv):
 
 
 class ZmqReactor(ZmqBaseReactor):
-    """
-    A consumer class implementing a
-    consumer for messages. Can also be
-    used as a 1:1 proxy
+    """A consumer class implementing a consumer for messages.
+
+    Can also be used as a 1:1 proxy
     """
 
     def __init__(self, conf):
@@ -751,10 +736,9 @@ def _call(addr, context, topic, msg, timeout=None,
 
 def _multi_send(method, context, topic, msg, timeout=None,
                 envelope=False, _msg_id=None):
-    """
-    Wraps the sending of messages,
-    dispatches to the matchmaker and sends
-    message to all relevant hosts.
+    """Wraps the sending of messages.
+
+    Dispatches to the matchmaker and sends message to all relevant hosts.
     """
     conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
@@ -811,8 +795,8 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
 
 
 def notify(conf, context, topic, msg, envelope):
-    """
-    Send notification event.
+    """Send notification event.
+
     Notifications are sent to topic-priority.
     This differs from the AMQP drivers which send to topic.priority.
     """
index 2e4f63c218845076ad821f1db313779297c59e38..f822c715cec28cb1f69e471ef789d2ff29af2ba6 100644 (file)
@@ -48,8 +48,8 @@ class MatchMakerException(Exception):
 
 
 class Exchange(object):
-    """
-    Implements lookups.
+    """Implements lookups.
+
     Subclass this to support hashtables, dns, etc.
     """
     def __init__(self):
@@ -60,9 +60,7 @@ class Exchange(object):
 
 
 class Binding(object):
-    """
-    A binding on which to perform a lookup.
-    """
+    """A binding on which to perform a lookup."""
     def __init__(self):
         pass
 
@@ -71,10 +69,10 @@ class Binding(object):
 
 
 class MatchMakerBase(object):
-    """
-    Match Maker Base Class.
-    Build off HeartbeatMatchMakerBase if building a
-    heartbeat-capable MatchMaker.
+    """Match Maker Base Class.
+
+    Build off HeartbeatMatchMakerBase if building a heartbeat-capable
+    MatchMaker.
     """
     def __init__(self):
         # Array of tuples. Index [2] toggles negation, [3] is last-if-true
@@ -84,58 +82,47 @@ class MatchMakerBase(object):
                                   'registration or heartbeat.')
 
     def register(self, key, host):
-        """
-        Register a host on a backend.
+        """Register a host on a backend.
+
         Heartbeats, if applicable, may keepalive registration.
         """
         pass
 
     def ack_alive(self, key, host):
-        """
-        Acknowledge that a key.host is alive.
-        Used internally for updating heartbeats,
-        but may also be used publically to acknowledge
-        a system is alive (i.e. rpc message successfully
-        sent to host)
+        """Acknowledge that a key.host is alive.
+
+        Used internally for updating heartbeats, but may also be used
+        publically to acknowledge a system is alive (i.e. rpc message
+        successfully sent to host)
         """
         pass
 
     def is_alive(self, topic, host):
-        """
-        Checks if a host is alive.
-        """
+        """Checks if a host is alive."""
         pass
 
     def expire(self, topic, host):
-        """
-        Explicitly expire a host's registration.
-        """
+        """Explicitly expire a host's registration."""
         pass
 
     def send_heartbeats(self):
-        """
-        Send all heartbeats.
+        """Send all heartbeats.
+
         Use start_heartbeat to spawn a heartbeat greenthread,
         which loops this method.
         """
         pass
 
     def unregister(self, key, host):
-        """
-        Unregister a topic.
-        """
+        """Unregister a topic."""
         pass
 
     def start_heartbeat(self):
-        """
-        Spawn heartbeat greenthread.
-        """
+        """Spawn heartbeat greenthread."""
         pass
 
     def stop_heartbeat(self):
-        """
-        Destroys the heartbeat greenthread.
-        """
+        """Destroys the heartbeat greenthread."""
         pass
 
     def add_binding(self, binding, rule, last=True):
@@ -162,10 +149,10 @@ class MatchMakerBase(object):
 
 
 class HeartbeatMatchMakerBase(MatchMakerBase):
-    """
-    Base for a heart-beat capable MatchMaker.
-    Provides common methods for registering,
-    unregistering, and maintaining heartbeats.
+    """Base for a heart-beat capable MatchMaker.
+
+    Provides common methods for registering, unregistering, and maintaining
+    heartbeats.
     """
     def __init__(self):
         self.hosts = set()
@@ -175,8 +162,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         super(HeartbeatMatchMakerBase, self).__init__()
 
     def send_heartbeats(self):
-        """
-        Send all heartbeats.
+        """Send all heartbeats.
+
         Use start_heartbeat to spawn a heartbeat greenthread,
         which loops this method.
         """
@@ -184,32 +171,31 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
             self.ack_alive(key, host)
 
     def ack_alive(self, key, host):
-        """
-        Acknowledge that a host.topic is alive.
-        Used internally for updating heartbeats,
-        but may also be used publically to acknowledge
-        a system is alive (i.e. rpc message successfully
-        sent to host)
+        """Acknowledge that a host.topic is alive.
+
+        Used internally for updating heartbeats, but may also be used
+        publically to acknowledge a system is alive (i.e. rpc message
+        successfully sent to host)
         """
         raise NotImplementedError("Must implement ack_alive")
 
     def backend_register(self, key, host):
-        """
-        Implements registration logic.
+        """Implements registration logic.
+
         Called by register(self,key,host)
         """
         raise NotImplementedError("Must implement backend_register")
 
     def backend_unregister(self, key, key_host):
-        """
-        Implements de-registration logic.
+        """Implements de-registration logic.
+
         Called by unregister(self,key,host)
         """
         raise NotImplementedError("Must implement backend_unregister")
 
     def register(self, key, host):
-        """
-        Register a host on a backend.
+        """Register a host on a backend.
+
         Heartbeats, if applicable, may keepalive registration.
         """
         self.hosts.add(host)
@@ -221,9 +207,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         self.ack_alive(key, host)
 
     def unregister(self, key, host):
-        """
-        Unregister a topic.
-        """
+        """Unregister a topic."""
         if (key, host) in self.host_topic:
             del self.host_topic[(key, host)]
 
@@ -234,8 +218,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
                  {'key': key, 'host': host})
 
     def start_heartbeat(self):
-        """
-        Implementation of MatchMakerBase.start_heartbeat
+        """Implementation of MatchMakerBase.start_heartbeat.
+
         Launches greenthread looping send_heartbeats(),
         yielding for CONF.matchmaker_heartbeat_freq seconds
         between iterations.
@@ -252,16 +236,14 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         self._heart = eventlet.spawn(do_heartbeat)
 
     def stop_heartbeat(self):
-        """
-        Destroys the heartbeat greenthread.
-        """
+        """Destroys the heartbeat greenthread."""
         if self._heart:
             self._heart.kill()
 
 
 class DirectBinding(Binding):
-    """
-    Specifies a host in the key via a '.' character
+    """Specifies a host in the key via a '.' character.
+
     Although dots are used in the key, the behavior here is
     that it maps directly to a host, thus direct.
     """
@@ -272,8 +254,8 @@ class DirectBinding(Binding):
 
 
 class TopicBinding(Binding):
-    """
-    Where a 'bare' key without dots.
+    """Where a 'bare' key without dots.
+
     AMQP generally considers topic exchanges to be those *with* dots,
     but we deviate here in terminology as the behavior here matches
     that of a topic exchange (whereas where there are dots, behavior
@@ -310,8 +292,8 @@ class LocalhostExchange(Exchange):
 
 
 class DirectExchange(Exchange):
-    """
-    Exchange where all topic keys are split, sending to second half.
+    """Exchange where all topic keys are split, sending to second half.
+
     i.e. "compute.host" sends a message to "compute.host" running on "host"
     """
     def __init__(self):
@@ -323,8 +305,8 @@ class DirectExchange(Exchange):
 
 
 class MatchMakerLocalhost(MatchMakerBase):
-    """
-    Match Maker where all bare topics resolve to localhost.
+    """Match Maker where all bare topics resolve to localhost.
+
     Useful for testing.
     """
     def __init__(self, host='localhost'):
@@ -335,13 +317,13 @@ class MatchMakerLocalhost(MatchMakerBase):
 
 
 class MatchMakerStub(MatchMakerBase):
-    """
-    Match Maker where topics are untouched.
+    """Match Maker where topics are untouched.
+
     Useful for testing, or for AMQP/brokered queues.
     Will not work where knowledge of hosts is known (i.e. zeromq)
     """
     def __init__(self):
-        super(MatchMakerLocalhost, self).__init__()
+        super(MatchMakerStub, self).__init__()
 
         self.add_binding(FanoutBinding(), StubExchange())
         self.add_binding(DirectBinding(), StubExchange())
index 7f064d898b87c1af57535f7d7b620a48718040c6..18a8f607a297fb265c674fe83ea381aa47633545 100644 (file)
@@ -55,8 +55,8 @@ class RedisExchange(mm_common.Exchange):
 
 
 class RedisTopicExchange(RedisExchange):
-    """
-    Exchange where all topic keys are split, sending to second half.
+    """Exchange where all topic keys are split, sending to second half.
+
     i.e. "compute.host" sends a message to "compute" running on "host"
     """
     def run(self, topic):
@@ -77,9 +77,7 @@ class RedisTopicExchange(RedisExchange):
 
 
 class RedisFanoutExchange(RedisExchange):
-    """
-    Return a list of all hosts.
-    """
+    """Return a list of all hosts."""
     def run(self, topic):
         topic = topic.split('~', 1)[1]
         hosts = self.redis.smembers(topic)
@@ -90,9 +88,7 @@ class RedisFanoutExchange(RedisExchange):
 
 
 class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
-    """
-    MatchMaker registering and looking-up hosts with a Redis server.
-    """
+    """MatchMaker registering and looking-up hosts with a Redis server."""
     def __init__(self):
         super(MatchMakerRedis, self).__init__()
 
index 736ca9566e3c78a1580d36c8b8d88c216a71eab9..1b623b2a11a691815e269db6e77027e5f96311be 100644 (file)
@@ -43,9 +43,7 @@ LOG = logging.getLogger(__name__)
 
 
 class RingExchange(mm.Exchange):
-    """
-    Match Maker where hosts are loaded from a static file containing
-    a hashmap (JSON formatted).
+    """Match Maker where hosts are loaded from a static JSON formatted file.
 
     __init__ takes optional ring dictionary argument, otherwise
     loads the ringfile from CONF.mathcmaker_ringfile.
@@ -104,9 +102,7 @@ class FanoutRingExchange(RingExchange):
 
 
 class MatchMakerRing(mm.MatchMakerBase):
-    """
-    Match Maker where hosts are loaded from a static hashmap.
-    """
+    """Match Maker where hosts are loaded from a static hashmap."""
     def __init__(self, ring=None):
         super(MatchMakerRing, self).__init__()
         self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
index b22f7e776576482a200af736da33bb0354732136..178f40e827a97185ffff5448b4e6f83f4e825474 100644 (file)
@@ -76,6 +76,11 @@ class RpcProxy(object):
         """Return the topic to use for a message."""
         return topic if topic else self.topic
 
+    def can_send_version(self, version):
+        """Check to see if a version is compatible with the version cap."""
+        return (not self.version_cap or
+                rpc_common.version_is_compatible(self.version_cap, version))
+
     @staticmethod
     def make_namespaced_msg(method, namespace, **kwargs):
         return {'method': method, 'namespace': namespace, 'args': kwargs}
index 0a2c9c4f113c3504b10cd6107f2c6f64bd937b96..76c68310331b59c757a135fb974c129e13226f90 100644 (file)
@@ -18,7 +18,7 @@ import abc
 
 
 class Serializer(object):
-    """Generic (de-)serialization definition base class"""
+    """Generic (de-)serialization definition base class."""
     __metaclass__ = abc.ABCMeta
 
     @abc.abstractmethod
@@ -43,7 +43,7 @@ class Serializer(object):
 
 
 class NoOpSerializer(Serializer):
-    """A serializer that does nothing"""
+    """A serializer that does nothing."""
 
     def serialize_entity(self, context, entity):
         return entity
index b292a2476dd80a77a700150a4f498efe8340a4a4..7a02bedeefe88c1bc18014e0f2e1e37cd29afaf7 100644 (file)
@@ -30,7 +30,8 @@ LOG = logging.getLogger(__name__)
 class Service(service.Service):
     """Service object for binaries running on hosts.
 
-    A service enables rpc by listening to queues based on topic and host."""
+    A service enables rpc by listening to queues based on topic and host.
+    """
     def __init__(self, host, topic, manager=None):
         super(Service, self).__init__()
         self.host = host
index c4d16fec668aa765d6f6dd640b3f7008c6098c2d..0f61a68da13a8bc1f079ff594fdecfaaac5bbfef 100644 (file)
@@ -271,7 +271,7 @@ class ProcessLauncher(object):
         return wrap
 
     def wait(self):
-        """Loop waiting on children to die and respawning as necessary"""
+        """Loop waiting on children to die and respawning as necessary."""
 
         LOG.debug(_('Full set of CONF:'))
         CONF.log_opt_values(LOG, std_logging.DEBUG)
index c12064311a21024906a94fc222292c03bc2442a5..9387894546d20c803fae6f20b597b965f83344f3 100644 (file)
@@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__)
 
 
 def _thread_done(gt, *args, **kwargs):
-    """ Callback function to be passed to GreenThread.link() when we spawn()
+    """Callback function to be passed to GreenThread.link() when we spawn()
     Calls the :class:`ThreadGroup` to notify if.
 
     """
@@ -34,7 +34,7 @@ def _thread_done(gt, *args, **kwargs):
 
 
 class Thread(object):
-    """ Wrapper around a greenthread, that holds a reference to the
+    """Wrapper around a greenthread, that holds a reference to the
     :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
     it has done so it can be removed from the threads list.
     """
@@ -50,7 +50,7 @@ class Thread(object):
 
 
 class ThreadGroup(object):
-    """ The point of the ThreadGroup classis to:
+    """The point of the ThreadGroup classis to:
 
     * keep track of timers and greenthreads (making it easier to stop them
       when need be).
index 60943659076765f0f4838f476239262041a87b22..ac2441bcb41cea1faeb2f93133f68fd07b848e59 100644 (file)
@@ -32,7 +32,7 @@ PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
 
 
 def isotime(at=None, subsecond=False):
-    """Stringify time in ISO 8601 format"""
+    """Stringify time in ISO 8601 format."""
     if not at:
         at = utcnow()
     st = at.strftime(_ISO8601_TIME_FORMAT
@@ -44,7 +44,7 @@ def isotime(at=None, subsecond=False):
 
 
 def parse_isotime(timestr):
-    """Parse time from ISO 8601 format"""
+    """Parse time from ISO 8601 format."""
     try:
         return iso8601.parse_date(timestr)
     except iso8601.ParseError as e:
@@ -66,7 +66,7 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
 
 
 def normalize_time(timestamp):
-    """Normalize time in arbitrary timezone to UTC naive object"""
+    """Normalize time in arbitrary timezone to UTC naive object."""
     offset = timestamp.utcoffset()
     if offset is None:
         return timestamp
@@ -103,7 +103,7 @@ def utcnow():
 
 
 def iso8601_from_timestamp(timestamp):
-    """Returns a iso8601 formated date from timestamp"""
+    """Returns a iso8601 formated date from timestamp."""
     return isotime(datetime.datetime.utcfromtimestamp(timestamp))
 
 
@@ -111,9 +111,9 @@ utcnow.override_time = None
 
 
 def set_time_override(override_time=datetime.datetime.utcnow()):
-    """
-    Override utils.utcnow to return a constant time or a list thereof,
-    one at a time.
+    """Overrides utils.utcnow.
+
+    Make it return a constant time or a list thereof, one at a time.
     """
     utcnow.override_time = override_time
 
@@ -141,7 +141,8 @@ def clear_time_override():
 def marshall_now(now=None):
     """Make an rpc-safe datetime with microseconds.
 
-    Note: tzinfo is stripped, but not required for relative times."""
+    Note: tzinfo is stripped, but not required for relative times.
+    """
     if not now:
         now = utcnow()
     return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
@@ -161,7 +162,8 @@ def unmarshall_time(tyme):
 
 
 def delta_seconds(before, after):
-    """
+    """Return the difference between two timing objects.
+
     Compute the difference in seconds between two date, time, or
     datetime objects (as a float, to microsecond resolution).
     """
@@ -174,8 +176,7 @@ def delta_seconds(before, after):
 
 
 def is_soon(dt, window):
-    """
-    Determines if time is going to happen in the next window seconds.
+    """Determines if time is going to happen in the next window seconds.
 
     :params dt: the time
     :params window: minimum seconds to remain to consider the time not soon
index 6941af99870a57e22716139349e5959cdab52b32..ff8b5645bbfb598c58493150076c4fec0f0d8e9a 100644 (file)
@@ -20,13 +20,25 @@ import sys
 import install_venv_common as install_venv
 
 
+def first_file(file_list):
+    for candidate in file_list:
+        if os.path.exists(candidate):
+            return candidate
+
+
 def main(argv):
     root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
 
     venv = os.environ['VIRTUAL_ENV']
 
-    pip_requires = os.path.join(root, 'tools', 'pip-requires')
-    test_requires = os.path.join(root, 'tools', 'test-requires')
+    pip_requires = first_file([
+        os.path.join(root, 'requirements.txt'),
+        os.path.join(root, 'tools', 'pip-requires'),
+    ])
+    test_requires = first_file([
+        os.path.join(root, 'test-requirements.txt'),
+        os.path.join(root, 'tools', 'test-requires'),
+    ])
     py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
     project = 'heat'
     install = install_venv.InstallVenv(root, venv, pip_requires, test_requires,