]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Latest common updates
authorGary Kotton <gkotton@redhat.com>
Mon, 25 Feb 2013 17:55:58 +0000 (17:55 +0000)
committerGary Kotton <gkotton@redhat.com>
Mon, 25 Feb 2013 17:59:05 +0000 (17:59 +0000)
Fixes bug 1132908

Change-Id: I726bc5b3199d2fa606832418db9b6e20ffbc3879

quantum/openstack/common/eventlet_backdoor.py
quantum/openstack/common/log.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/setup.py

index 61ceded438e009914a523b6607ac6c7277878a48..8b81ebf8ebfce14b2c531ba4ccffb60b11425550 100644 (file)
@@ -51,12 +51,20 @@ def _print_greenthreads():
         print
 
 
+def _print_nativethreads():
+    for threadId, stack in sys._current_frames().items():
+        print threadId
+        traceback.print_stack(stack)
+        print
+
+
 def initialize_if_enabled():
     backdoor_locals = {
         'exit': _dont_use_this,      # So we don't exit the entire process
         'quit': _dont_use_this,      # So we don't exit the entire process
         'fo': _find_objects,
         'pgt': _print_greenthreads,
+        'pnt': _print_nativethreads,
     }
 
     if CONF.backdoor_port is None:
index ef7e27e8ad19b0f46a3634e29f7657ff7cb0c136..37245d4da56b1e4128821f92f6533718d116c240 100644 (file)
@@ -325,16 +325,11 @@ def _create_logging_excepthook(product_name):
 
 def setup(product_name):
     """Setup logging."""
-    sys.excepthook = _create_logging_excepthook(product_name)
-
     if CONF.log_config:
-        try:
-            logging.config.fileConfig(CONF.log_config)
-        except Exception:
-            traceback.print_exc()
-            raise
+        logging.config.fileConfig(CONF.log_config)
     else:
         _setup_logging_from_conf(product_name)
+    sys.excepthook = _create_logging_excepthook(product_name)
 
 
 def set_defaults(logging_context_format_string):
index 4b7b99dc2e23c3cd7c6d390cbdfde68d061e1e51..a6d5002d57e8ca653b0aecf15be331f2aa3b0e66 100644 (file)
@@ -25,6 +25,7 @@ Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
 AMQP, but is deprecated and predates this code.
 """
 
+import collections
 import inspect
 import sys
 import uuid
@@ -54,6 +55,7 @@ amqp_opts = [
 
 cfg.CONF.register_opts(amqp_opts)
 
+UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
 
@@ -236,6 +238,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
+        _add_unique_id(msg)
         # If a reply_q exists, add the msg_id to the reply and pass the
         # reply_q to direct_send() to use it as the response queue.
         # Otherwise use the msg_id for backward compatibilty.
@@ -302,6 +305,37 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
+class _MsgIdCache(object):
+    """This class checks any duplicate messages."""
+
+    # NOTE: This value is considered can be a configuration item, but
+    #       it is not necessary to change its value in most cases,
+    #       so let this value as static for now.
+    DUP_MSG_CHECK_SIZE = 16
+
+    def __init__(self, **kwargs):
+        self.prev_msgids = collections.deque([],
+                                             maxlen=self.DUP_MSG_CHECK_SIZE)
+
+    def check_duplicate_message(self, message_data):
+        """AMQP consumers may read same message twice when exceptions occur
+           before ack is returned. This method prevents doing it.
+        """
+        if UNIQUE_ID in message_data:
+            msg_id = message_data[UNIQUE_ID]
+            if msg_id not in self.prev_msgids:
+                self.prev_msgids.append(msg_id)
+            else:
+                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+    """Add unique_id for checking duplicate messages."""
+    unique_id = uuid.uuid4().hex
+    msg.update({UNIQUE_ID: unique_id})
+    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
 class _ThreadPoolWithWait(object):
     """Base class for a delayed invocation manager used by
     the Connection class to start up green threads
@@ -349,6 +383,7 @@ class ProxyCallback(_ThreadPoolWithWait):
             connection_pool=connection_pool,
         )
         self.proxy = proxy
+        self.msg_id_cache = _MsgIdCache()
 
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
@@ -368,6 +403,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         if hasattr(local.store, 'context'):
             del local.store.context
         rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        self.msg_id_cache.check_duplicate_message(message_data)
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
@@ -422,6 +458,7 @@ class MulticallProxyWaiter(object):
         self._dataqueue = queue.LightQueue()
         # Add this caller to the reply proxy's call_waiters
         self._reply_proxy.add_call_waiter(self, self._msg_id)
+        self.msg_id_cache = _MsgIdCache()
 
     def put(self, data):
         self._dataqueue.put(data)
@@ -435,6 +472,7 @@ class MulticallProxyWaiter(object):
 
     def _process_data(self, data):
         result = None
+        self.msg_id_cache.check_duplicate_message(data)
         if data['failure']:
             failure = data['failure']
             result = rpc_common.deserialize_remote_exception(self._conf,
@@ -479,6 +517,7 @@ class MulticallWaiter(object):
         self._done = False
         self._got_ending = False
         self._conf = conf
+        self.msg_id_cache = _MsgIdCache()
 
     def done(self):
         if self._done:
@@ -490,6 +529,7 @@ class MulticallWaiter(object):
 
     def __call__(self, data):
         """The consume() callback will call this.  Store the result."""
+        self.msg_id_cache.check_duplicate_message(data)
         if data['failure']:
             failure = data['failure']
             self._result = rpc_common.deserialize_remote_exception(self._conf,
@@ -542,6 +582,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
+    _add_unique_id(msg)
     pack_context(msg, context)
 
     # TODO(pekowski): Remove this flag and the code under the if clause
@@ -575,6 +616,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
 def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         conn.topic_send(topic, rpc_common.serialize_msg(msg))
@@ -583,6 +625,7 @@ def cast(conf, context, topic, msg, connection_pool):
 def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
     LOG.debug(_('Making asynchronous fanout cast...'))
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@@ -590,6 +633,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
 
 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     """Sends a message on a topic to a specific server."""
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
@@ -599,6 +643,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
 def fanout_cast_to_server(conf, context, server_params, topic, msg,
                           connection_pool):
     """Sends a message on a fanout exchange to a specific server."""
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
@@ -610,6 +655,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
     LOG.debug(_('Sending %(event_type)s on %(topic)s'),
               dict(event_type=msg.get('event_type'),
                    topic=topic))
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         if envelope:
index 4421e7d951c2450b9ab2cf2fe5e712a53eea0c37..67a9ebc3b2878a12310a93bbd6dc7f762c168279 100644 (file)
@@ -49,8 +49,8 @@ deserialize_msg().
 The current message format (version 2.0) is very simple.  It is:
 
     {
-        'quantum.version': <RPC Envelope Version as a String>,
-        'quantum.message': <Application Message Payload, JSON encoded>
+        'oslo.version': <RPC Envelope Version as a String>,
+        'oslo.message': <Application Message Payload, JSON encoded>
     }
 
 Message format version '1.0' is just considered to be the messages we sent
@@ -66,8 +66,8 @@ to the messaging libraries as a dict.
 '''
 _RPC_ENVELOPE_VERSION = '2.0'
 
-_VERSION_KEY = 'quantum.version'
-_MESSAGE_KEY = 'quantum.message'
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
 
 
 # TODO(russellb) Turn this on after Grizzly.
@@ -125,6 +125,10 @@ class Timeout(RPCException):
     message = _("Timeout while waiting on RPC response.")
 
 
+class DuplicateMessageError(RPCException):
+    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
 class InvalidRPCConnectionReuse(RPCException):
     message = _("Invalid reuse of an RPC connection.")
 
index dd4d47e2185021cbd1c368183e51a26300f88432..2a7f2dd8bee1a958dd7df7c499c42e41fde99c45 100644 (file)
@@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': True,
                    'exclusive': False}
         options.update(kwargs)
index caa89c88ea7d6377f4b5a64358b84406b542a5cd..421457a42e1d4ab12e88772d7180ee56853c4752 100644 (file)
@@ -216,12 +216,18 @@ class ZmqClient(object):
             socket_type = zmq.PUSH
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
-    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+    def cast(self, msg_id, topic, data, envelope=False):
         msg_id = msg_id or 0
 
-        if serialize:
-            data = rpc_common.serialize_msg(data, force_envelope)
-        self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+        if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+            self.outq.send(map(bytes,
+                           (msg_id, topic, 'cast', _serialize(data))))
+            return
+
+        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+        self.outq.send(map(bytes,
+                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
 
     def close(self):
         self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
         else:
             return [result]
 
-    def process(self, style, target, proxy, ctx, data):
+    def process(self, proxy, ctx, data):
         data.setdefault('version', None)
         data.setdefault('args', {})
 
@@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor):
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        msg_id, topic, style, in_msg = data
-        topic = topic.split('.', 1)[0]
+        topic = data[1]
 
         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
 
-        if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+        if topic.startswith('fanout~'):
+            sock_type = zmq.PUB
+            topic = topic.split('.', 1)[0]
+        elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
         else:
             sock_type = zmq.PUSH
@@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor):
         super(ZmqProxy, self).consume_in_thread()
 
 
+def unflatten_envelope(packenv):
+    """Unflattens the RPC envelope.
+       Takes a list and returns a dictionary.
+       i.e. [1,2,3,4] => {1: 2, 3: 4}
+    """
+    i = iter(packenv)
+    h = {}
+    try:
+        while True:
+            k = i.next()
+            h[k] = i.next()
+    except StopIteration:
+        return h
+
+
 class ZmqReactor(ZmqBaseReactor):
     """
     A consumer class implementing a
@@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor):
             self.mapping[sock].send(data)
             return
 
-        msg_id, topic, style, in_msg = data
+        proxy = self.proxies[sock]
 
-        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
-        ctx = RpcContext.unmarshal(ctx)
+        if data[2] == 'cast':  # Legacy protocol
+            packenv = data[3]
 
-        proxy = self.proxies[sock]
+            ctx, msg = _deserialize(packenv)
+            request = rpc_common.deserialize_msg(msg)
+            ctx = RpcContext.unmarshal(ctx)
+        elif data[2] == 'impl_zmq_v2':
+            packenv = data[4:]
+
+            msg = unflatten_envelope(packenv)
+            request = rpc_common.deserialize_msg(msg)
+
+            # Unmarshal only after verifying the message.
+            ctx = RpcContext.unmarshal(data[3])
+        else:
+            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+            return
 
-        self.pool.spawn_n(self.process, style, topic,
-                          proxy, ctx, request)
+        self.pool.spawn_n(self.process, proxy, ctx, request)
 
 
 class Connection(rpc_common.Connection):
     """Manages connections and threads."""
 
     def __init__(self, conf):
+        self.topics = []
         self.reactor = ZmqReactor(conf)
 
     def create_consumer(self, topic, proxy, fanout=False):
-        # Only consume on the base topic name.
-        topic = topic.split('.', 1)[0]
-
-        LOG.info(_("Create Consumer for topic (%(topic)s)") %
-                 {'topic': topic})
-
         # Subscription scenarios
         if fanout:
-            subscribe = ('', fanout)[type(fanout) == str]
             sock_type = zmq.SUB
-            topic = 'fanout~' + topic
+            subscribe = ('', fanout)[type(fanout) == str]
+            topic = 'fanout~' + topic.split('.', 1)[0]
         else:
             sock_type = zmq.PULL
             subscribe = None
+            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+        if topic in self.topics:
+            LOG.info(_("Skipping topic registration. Already registered."))
+            return
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -582,9 +617,11 @@ class Connection(rpc_common.Connection):
 
         self.reactor.register(proxy, inaddr, sock_type,
                               subscribe=subscribe, in_bind=False)
+        self.topics.append(topic)
 
     def close(self):
         self.reactor.close()
+        self.topics = []
 
     def wait(self):
         self.reactor.wait()
@@ -593,8 +630,8 @@ class Connection(rpc_common.Connection):
         self.reactor.consume_in_thread()
 
 
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
-          force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+          _msg_id=None):
     timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
@@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
             conn = ZmqClient(addr)
 
             # assumes cast can't return an exception
-            conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+            conn.cast(_msg_id, topic, payload, envelope)
         except zmq.ZMQError:
             raise RPCException("Cast failed. ZMQ Socket Exception")
         finally:
@@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
 
 
 def _call(addr, context, topic, msg, timeout=None,
-          serialize=True, force_envelope=False):
+          envelope=False):
     # timeout_response is how long we wait for a response
     timeout = timeout or CONF.rpc_response_timeout
 
@@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None,
     with Timeout(timeout, exception=rpc_common.Timeout):
         try:
             msg_waiter = ZmqSocket(
-                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+                "ipc://%s/zmq_topic_zmq_replies.%s" %
+                (CONF.rpc_zmq_ipc_dir,
+                 CONF.rpc_zmq_host),
                 zmq.SUB, subscribe=msg_id, bind=False
             )
 
             LOG.debug(_("Sending cast"))
-            _cast(addr, context, topic, payload,
-                  serialize=serialize, force_envelope=force_envelope)
+            _cast(addr, context, topic, payload, envelope)
 
             LOG.debug(_("Cast sent; Waiting reply"))
             # Blocks until receives reply
             msg = msg_waiter.recv()
             LOG.debug(_("Received message: %s"), msg)
             LOG.debug(_("Unpacking response"))
-            responses = _deserialize(msg[-1])[-1]['args']['response']
+
+            if msg[2] == 'cast':  # Legacy version
+                raw_msg = _deserialize(msg[-1])[-1]
+            elif msg[2] == 'impl_zmq_v2':
+                rpc_envelope = unflatten_envelope(msg[4:])
+                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+            else:
+                raise rpc_common.UnsupportedRpcEnvelopeVersion(
+                    _("Unsupported or unknown ZMQ envelope returned."))
+
+            responses = raw_msg['args']['response']
         # ZMQError trumps the Timeout error.
         except zmq.ZMQError:
             raise RPCException("ZMQ Socket Error")
@@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None,
     return responses[-1]
 
 
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
-                force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+                envelope=False, _msg_id=None):
     """
     Wraps the sending of messages,
     dispatches to the matchmaker and sends
@@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
 
         if method.__name__ == '_cast':
             eventlet.spawn_n(method, _addr, context,
-                             _topic, msg, timeout, serialize,
-                             force_envelope, _msg_id)
+                             _topic, msg, timeout, envelope,
+                             _msg_id)
             return
         return method(_addr, context, _topic, msg, timeout,
-                      serialize, force_envelope)
+                      envelope)
 
 
 def create_connection(conf, new=True):
@@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs):
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
     topic.replace('.', '-')
-    kwargs['serialize'] = kwargs.pop('envelope')
-    kwargs['force_envelope'] = True
+    kwargs['envelope'] = kwargs.get('envelope', True)
     cast(conf, context, topic, msg, **kwargs)
 
 
index 35680b30485b9d3c74fef593b759e796fca1f19d..22f864d50e530bb1afdd351a3f2c6342cea99257 100644 (file)
@@ -117,9 +117,9 @@ def _run_shell_command(cmd, throw_on_error=False):
         output = subprocess.Popen(["/bin/sh", "-c", cmd],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
+    out = output.communicate()
     if output.returncode and throw_on_error:
         raise Exception("%s returned %d" % cmd, output.returncode)
-    out = output.communicate()
     if len(out) == 0:
         return None
     if len(out[0].strip()) == 0:
@@ -131,7 +131,7 @@ def write_git_changelog():
     """Write a changelog based on the git changelog."""
     new_changelog = 'ChangeLog'
     if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
-        if os.path.isdir('.git'):
+        if os.path.exists('.git'):
             git_log_cmd = 'git log --stat'
             changelog = _run_shell_command(git_log_cmd)
             mailmap = parse_mailmap()
@@ -147,7 +147,7 @@ def generate_authors():
     old_authors = 'AUTHORS.in'
     new_authors = 'AUTHORS'
     if not os.getenv('SKIP_GENERATE_AUTHORS'):
-        if os.path.isdir('.git'):
+        if os.path.exists('.git'):
             # don't include jenkins email address in AUTHORS file
             git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
                            "egrep -v '" + jenkins_email + "'")
@@ -279,7 +279,7 @@ def _get_version_from_git(pre_version):
     revision if there is one, or tag plus number of additional revisions
     if the current revision has no tag."""
 
-    if os.path.isdir('.git'):
+    if os.path.exists('.git'):
         if pre_version:
             try:
                 return _run_shell_command(