review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update latest OSLO.
authorGary Kotton <gkotton@redhat.com>
Tue, 15 Jan 2013 13:00:03 +0000 (13:00 +0000)
committerGary Kotton <gkotton@redhat.com>
Tue, 15 Jan 2013 14:43:39 +0000 (14:43 +0000)
Changes include:
1. Removing the 'extras' in the pip-requires
2. Fixes for fake implementations for RPC calls
3. Version updates due to common version update

Change-Id: Iefd32b3f7d529943b078e6d927d06043286ff94e

quantum/openstack/common/cfg.py
quantum/openstack/common/importutils.py
quantum/openstack/common/iniparser.py
quantum/openstack/common/lockutils.py
quantum/openstack/common/log.py
quantum/openstack/common/policy.py
quantum/openstack/common/rpc/impl_fake.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/service.py
tools/pip-requires

index 10a91db31d74ebede1bb34bd46c78b789359552b..4aad78adc04dd01cd1b798067a68e87447866ffa 100644 (file)
@@ -1735,11 +1735,13 @@ class CommonConfigOpts(ConfigOpts):
         BoolOpt('debug',
                 short='d',
                 default=False,
-                help='Print debugging output'),
+                help='Print debugging output (set logging level to '
+                     'DEBUG instead of default WARNING level).'),
         BoolOpt('verbose',
                 short='v',
                 default=False,
-                help='Print more verbose output'),
+                help='Print more verbose output (set logging level to '
+                     'INFO instead of default WARNING level).'),
     ]
 
     logging_cli_opts = [
index 2a28b455e85c811abc3a509e0d3f9775a815cff4..9dec764fb4097bf5860c562a514baec88571fe79 100644 (file)
@@ -57,3 +57,11 @@ def import_module(import_str):
     """Import a module."""
     __import__(import_str)
     return sys.modules[import_str]
+
+
+def try_import(import_str, default=None):
+    """Try to import a module and if it fails return default."""
+    try:
+        return import_module(import_str)
+    except ImportError:
+        return default
index 241284449e32c0be0200df6eec92b46b632d4cca..9bf399f0c7c9f6098887715b8b7e8c360e7deecd 100644 (file)
@@ -54,7 +54,7 @@ class BaseParser(object):
 
         value = value.strip()
         if ((value and value[0] == value[-1]) and
-            (value[0] == "\"" or value[0] == "'")):
+                (value[0] == "\"" or value[0] == "'")):
             value = value[1:-1]
         return key.strip(), [value]
 
index ead7e13c7ffae4909833d388273a24c7abc33dc2..b9500f7bdbb7ad60a0a288455b285c37443d38c8 100644 (file)
@@ -220,6 +220,11 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
                                        'method': f.__name__})
                             retval = f(*args, **kwargs)
                     finally:
+                        LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
+                                    ' for method "%(method)s"...'),
+                                  {'lock': name,
+                                   'path': lock_file_path,
+                                   'method': f.__name__})
                         # NOTE(vish): This removes the tempdir if we needed
                         #             to create one. This is used to cleanup
                         #             the locks left behind by unit tests.
index 0640c307111bcfc28db56332ad50b5069302458e..9ef5441112600b7bced38b4606d94da5323849ff 100644 (file)
@@ -49,19 +49,19 @@ from quantum.openstack.common import notifier
 
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
-               default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
+               default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
                        '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
                        '%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
-               default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
+               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
                        '%(name)s [-] %(instance)s%(message)s',
                help='format string to use for log messages without context'),
     cfg.StrOpt('logging_debug_format_suffix',
                default='%(funcName)s %(pathname)s:%(lineno)d',
                help='data to append to log format when level is DEBUG'),
     cfg.StrOpt('logging_exception_prefix',
-               default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
+               default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
                '%(instance)s',
                help='prefix each line of exception output with this format'),
     cfg.ListOpt('default_log_levels',
@@ -259,7 +259,7 @@ class JSONFormatter(logging.Formatter):
 class PublishErrorsHandler(logging.Handler):
     def emit(self, record):
         if ('quantum.openstack.common.notifier.log_notifier' in
-            CONF.notification_driver):
+                CONF.notification_driver):
             return
         notifier.api.notify(None, 'error.publisher',
                             'error_notification',
@@ -361,10 +361,12 @@ def _setup_logging_from_conf(product_name):
                                                    datefmt=datefmt))
         handler.setFormatter(LegacyFormatter(datefmt=datefmt))
 
-    if CONF.verbose or CONF.debug:
+    if CONF.debug:
         log_root.setLevel(logging.DEBUG)
-    else:
+    elif CONF.verbose:
         log_root.setLevel(logging.INFO)
+    else:
+        log_root.setLevel(logging.WARNING)
 
     level = logging.NOTSET
     for pair in CONF.default_log_levels:
@@ -425,7 +427,7 @@ class LegacyFormatter(logging.Formatter):
             self._fmt = CONF.logging_default_format_string
 
         if (record.levelno == logging.DEBUG and
-            CONF.logging_debug_format_suffix):
+                CONF.logging_debug_format_suffix):
             self._fmt += " " + CONF.logging_debug_format_suffix
 
         # Cache this on the record, Logger will respect our formated copy
index 2593d20b3db26c488923fd98a7a99a814f8da56d..2b6703dab4057e157250a74670ad31fc5da68349 100644 (file)
@@ -574,19 +574,19 @@ class ParseState(object):
 
         for reduction, methname in self.reducers:
             if (len(self.tokens) >= len(reduction) and
-                self.tokens[-len(reduction):] == reduction):
-                    # Get the reduction method
-                    meth = getattr(self, methname)
+                    self.tokens[-len(reduction):] == reduction):
+                # Get the reduction method
+                meth = getattr(self, methname)
 
-                    # Reduce the token stream
-                    results = meth(*self.values[-len(reduction):])
+                # Reduce the token stream
+                results = meth(*self.values[-len(reduction):])
 
-                    # Update the tokens and values
-                    self.tokens[-len(reduction):] = [r[0] for r in results]
-                    self.values[-len(reduction):] = [r[1] for r in results]
+                # Update the tokens and values
+                self.tokens[-len(reduction):] = [r[0] for r in results]
+                self.values[-len(reduction):] = [r[1] for r in results]
 
-                    # Check for any more reductions
-                    return self.reduce()
+                # Check for any more reductions
+                return self.reduce()
 
     def shift(self, tok, value):
         """Adds one more token to the state.  Calls reduce()."""
index af1406615accb74f872911bf195fc05507496e1f..779d2445293806fa67e0b938aea3bad7984962a7 100644 (file)
@@ -167,7 +167,7 @@ def cast(conf, context, topic, msg):
         pass
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     check_serialize(msg)
 
 
index efee8461cb8f947a7ed7312e278befeee7ecab74..eef873a43734ad8d03227e8be5d6499fb0840066 100644 (file)
@@ -15,6 +15,7 @@
 #    under the License.
 
 import pprint
+import os
 import socket
 import string
 import sys
@@ -29,6 +30,7 @@ from quantum.openstack.common import cfg
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
+from quantum.openstack.common import processutils as utils
 from quantum.openstack.common.rpc import common as rpc_common
 
 
@@ -61,6 +63,10 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),
 
+    cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+               help='Maximum number of ingress messages to locally buffer '
+                    'per topic. Default is unlimited.'),
+
     cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
                help='Directory for holding IPC sockets'),
 
@@ -413,12 +419,6 @@ class ZmqProxy(ZmqBaseReactor):
         super(ZmqProxy, self).__init__(conf)
 
         self.topic_proxy = {}
-        ipc_dir = CONF.rpc_zmq_ipc_dir
-
-        self.topic_proxy['zmq_replies'] = \
-            ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
-                      zmq.PUB, bind=True)
-        self.sockets.append(self.topic_proxy['zmq_replies'])
 
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
@@ -444,20 +444,81 @@ class ZmqProxy(ZmqBaseReactor):
             sock_type = zmq.PUSH
 
         if not topic in self.topic_proxy:
-            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
-                             sock_type, bind=True)
-            self.topic_proxy[topic] = outq
-            self.sockets.append(outq)
-            LOG.info(_("Created topic proxy: %s"), topic)
+            def publisher(waiter):
+                LOG.info(_("Creating proxy for topic: %s"), topic)
+
+                try:
+                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+                                         (ipc_dir, topic),
+                                         sock_type, bind=True)
+                except RPCException:
+                    waiter.send_exception(*sys.exc_info())
+                    return
+
+                self.topic_proxy[topic] = eventlet.queue.LightQueue(
+                    CONF.rpc_zmq_topic_backlog)
+                self.sockets.append(out_sock)
+
+                # It takes some time for a pub socket to open,
+                # before we can have any faith in doing a send() to it.
+                if sock_type == zmq.PUB:
+                    eventlet.sleep(.5)
+
+                waiter.send(True)
+
+                while(True):
+                    data = self.topic_proxy[topic].get()
+                    out_sock.send(data)
+                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+                              {'data': data})
+
+            wait_sock_creation = eventlet.event.Event()
+            eventlet.spawn(publisher, wait_sock_creation)
+
+            try:
+                wait_sock_creation.wait()
+            except RPCException:
+                LOG.error(_("Topic socket file creation failed."))
+                return
+
+        try:
+            self.topic_proxy[topic].put_nowait(data)
+            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+                      {'data': data})
+        except eventlet.queue.Full:
+            LOG.error(_("Local per-topic backlog buffer full for topic "
+                        "%(topic)s. Dropping message.") % {'topic': topic})
+
+    def consume_in_thread(self):
+        """Runs the ZmqProxy service"""
+        ipc_dir = CONF.rpc_zmq_ipc_dir
+        consume_in = "tcp://%s:%s" % \
+            (CONF.rpc_zmq_bind_address,
+             CONF.rpc_zmq_port)
+        consumption_proxy = InternalContext(None)
+
+        if not os.path.isdir(ipc_dir):
+            try:
+                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+                              ipc_dir, run_as_root=True)
+                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+            except utils.ProcessExecutionError:
+                LOG.error(_("Could not create IPC directory %s") %
+                          (ipc_dir, ))
+                raise
 
-            # It takes some time for a pub socket to open,
-            # before we can have any faith in doing a send() to it.
-            if sock_type == zmq.PUB:
-                eventlet.sleep(.5)
+        try:
+            self.register(consumption_proxy,
+                          consume_in,
+                          zmq.PULL,
+                          out_bind=True)
+        except zmq.ZMQError:
+            LOG.error(_("Could not create ZeroMQ receiver daemon. "
+                        "Socket may already be in use."))
+            raise
 
-        LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
-        self.topic_proxy[topic].send(data)
-        LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+        super(ZmqProxy, self).consume_in_thread()
 
 
 class ZmqReactor(ZmqBaseReactor):
@@ -551,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
                 conn.close()
 
 
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+          serialize=True, force_envelope=False):
     # timeout_response is how long we wait for a response
     timeout = timeout or CONF.rpc_response_timeout
 
@@ -586,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
             )
 
             LOG.debug(_("Sending cast"))
-            _cast(addr, context, msg_id, topic, payload)
+            _cast(addr, context, msg_id, topic, payload,
+                  serialize=serialize, force_envelope=force_envelope)
 
             LOG.debug(_("Cast sent; Waiting reply"))
             # Blocks until receives reply
@@ -642,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
                              _topic, _topic, msg, timeout, serialize,
                              force_envelope)
             return
-        return method(_addr, context, _topic, _topic, msg, timeout)
+        return method(_addr, context, _topic, _topic, msg, timeout,
+                      serialize, force_envelope)
 
 
 def create_connection(conf, new=True):
index 9e34d9ff12594b5159cdb626c19b38c0e018ec55..e51b8f7d2701279b7253d20d3819533b6caed7c1 100644 (file)
@@ -27,17 +27,17 @@ import sys
 import time
 
 import eventlet
-import extras
 import logging as std_logging
 
 from quantum.openstack.common import cfg
 from quantum.openstack.common import eventlet_backdoor
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import threadgroup
 
 
-rpc = extras.try_import('quantum.openstack.common.rpc')
+rpc = importutils.try_import('quantum.openstack.common.rpc')
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
index 331b5beb9b060bbe2c2f7189a21c4cd35eff83ec..ff31a51410f12631fdaae2c7806811ef7b8dedc7 100644 (file)
@@ -5,7 +5,6 @@ amqplib==0.6.1
 anyjson>=0.2.4
 argparse
 eventlet>=0.9.17
-extras
 greenlet>=0.3.1
 httplib2
 iso8601>=0.1.4