From: Gary Kotton Date: Tue, 15 Jan 2013 13:00:03 +0000 (+0000) Subject: Update latest OSLO. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=0748f92e44461ef3b3c46a89426692d2643f85fe;p=openstack-build%2Fneutron-build.git Update latest OSLO. Changes include: 1. Removing the 'extras' in the pip-requires 2. Fixes for fake implementations for RPC calls 3. Version updates due to common version update Change-Id: Iefd32b3f7d529943b078e6d927d06043286ff94e --- diff --git a/quantum/openstack/common/cfg.py b/quantum/openstack/common/cfg.py index 10a91db31..4aad78adc 100644 --- a/quantum/openstack/common/cfg.py +++ b/quantum/openstack/common/cfg.py @@ -1735,11 +1735,13 @@ class CommonConfigOpts(ConfigOpts): BoolOpt('debug', short='d', default=False, - help='Print debugging output'), + help='Print debugging output (set logging level to ' + 'DEBUG instead of default WARNING level).'), BoolOpt('verbose', short='v', default=False, - help='Print more verbose output'), + help='Print more verbose output (set logging level to ' + 'INFO instead of default WARNING level).'), ] logging_cli_opts = [ diff --git a/quantum/openstack/common/importutils.py b/quantum/openstack/common/importutils.py index 2a28b455e..9dec764fb 100644 --- a/quantum/openstack/common/importutils.py +++ b/quantum/openstack/common/importutils.py @@ -57,3 +57,11 @@ def import_module(import_str): """Import a module.""" __import__(import_str) return sys.modules[import_str] + + +def try_import(import_str, default=None): + """Try to import a module and if it fails return default.""" + try: + return import_module(import_str) + except ImportError: + return default diff --git a/quantum/openstack/common/iniparser.py b/quantum/openstack/common/iniparser.py index 241284449..9bf399f0c 100644 --- a/quantum/openstack/common/iniparser.py +++ b/quantum/openstack/common/iniparser.py @@ -54,7 +54,7 @@ class BaseParser(object): value = value.strip() if ((value and value[0] == value[-1]) and - (value[0] == "\"" or value[0] == "'")): + (value[0] == "\"" or value[0] == "'")): value = value[1:-1] return key.strip(), [value] diff --git a/quantum/openstack/common/lockutils.py b/quantum/openstack/common/lockutils.py index ead7e13c7..b9500f7bd 100644 --- a/quantum/openstack/common/lockutils.py +++ b/quantum/openstack/common/lockutils.py @@ -220,6 +220,11 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): 'method': f.__name__}) retval = f(*args, **kwargs) finally: + LOG.debug(_('Released file lock "%(lock)s" at %(path)s' + ' for method "%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) # NOTE(vish): This removes the tempdir if we needed # to create one. This is used to cleanup # the locks left behind by unit tests. diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index 0640c3071..9ef544111 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -49,19 +49,19 @@ from quantum.openstack.common import notifier log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s.%(msecs)d %(levelname)s %(name)s ' + default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s ' '[%(request_id)s %(user)s %(tenant)s] %(instance)s' '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s.%(msecs)d %(process)d %(levelname)s ' + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' '%(name)s [-] %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s ' + default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' '%(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', @@ -259,7 +259,7 @@ class JSONFormatter(logging.Formatter): class PublishErrorsHandler(logging.Handler): def emit(self, record): if ('quantum.openstack.common.notifier.log_notifier' in - CONF.notification_driver): + CONF.notification_driver): return notifier.api.notify(None, 'error.publisher', 'error_notification', @@ -361,10 +361,12 @@ def _setup_logging_from_conf(product_name): datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt)) - if CONF.verbose or CONF.debug: + if CONF.debug: log_root.setLevel(logging.DEBUG) - else: + elif CONF.verbose: log_root.setLevel(logging.INFO) + else: + log_root.setLevel(logging.WARNING) level = logging.NOTSET for pair in CONF.default_log_levels: @@ -425,7 +427,7 @@ class LegacyFormatter(logging.Formatter): self._fmt = CONF.logging_default_format_string if (record.levelno == logging.DEBUG and - CONF.logging_debug_format_suffix): + CONF.logging_debug_format_suffix): self._fmt += " " + CONF.logging_debug_format_suffix # Cache this on the record, Logger will respect our formated copy diff --git a/quantum/openstack/common/policy.py b/quantum/openstack/common/policy.py index 2593d20b3..2b6703dab 100644 --- a/quantum/openstack/common/policy.py +++ b/quantum/openstack/common/policy.py @@ -574,19 +574,19 @@ class ParseState(object): for reduction, methname in self.reducers: if (len(self.tokens) >= len(reduction) and - self.tokens[-len(reduction):] == reduction): - # Get the reduction method - meth = getattr(self, methname) + self.tokens[-len(reduction):] == reduction): + # Get the reduction method + meth = getattr(self, methname) - # Reduce the token stream - results = meth(*self.values[-len(reduction):]) + # Reduce the token stream + results = meth(*self.values[-len(reduction):]) - # Update the tokens and values - self.tokens[-len(reduction):] = [r[0] for r in results] - self.values[-len(reduction):] = [r[1] for r in results] + # Update the tokens and values + self.tokens[-len(reduction):] = [r[0] for r in results] + self.values[-len(reduction):] = [r[1] for r in results] - # Check for any more reductions - return self.reduce() + # Check for any more reductions + return self.reduce() def shift(self, tok, value): """Adds one more token to the state. Calls reduce().""" diff --git a/quantum/openstack/common/rpc/impl_fake.py b/quantum/openstack/common/rpc/impl_fake.py index af1406615..779d24452 100644 --- a/quantum/openstack/common/rpc/impl_fake.py +++ b/quantum/openstack/common/rpc/impl_fake.py @@ -167,7 +167,7 @@ def cast(conf, context, topic, msg): pass -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): check_serialize(msg) diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index efee8461c..eef873a43 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from quantum.openstack.common import cfg from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import importutils from quantum.openstack.common import jsonutils +from quantum.openstack.common import processutils as utils from quantum.openstack.common.rpc import common as rpc_common @@ -61,6 +63,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -413,12 +419,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -444,20 +444,81 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUSH if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) + + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) + + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + super(ZmqProxy, self).consume_in_thread() class ZmqReactor(ZmqBaseReactor): @@ -551,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None): +def _call(addr, context, msg_id, topic, msg, timeout=None, + serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -586,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload) + _cast(addr, context, msg_id, topic, payload, + serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply @@ -642,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, _topic, _topic, msg, timeout, serialize, force_envelope) return - return method(_addr, context, _topic, _topic, msg, timeout) + return method(_addr, context, _topic, _topic, msg, timeout, + serialize, force_envelope) def create_connection(conf, new=True): diff --git a/quantum/openstack/common/service.py b/quantum/openstack/common/service.py index 9e34d9ff1..e51b8f7d2 100644 --- a/quantum/openstack/common/service.py +++ b/quantum/openstack/common/service.py @@ -27,17 +27,17 @@ import sys import time import eventlet -import extras import logging as std_logging from quantum.openstack.common import cfg from quantum.openstack.common import eventlet_backdoor from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import importutils from quantum.openstack.common import log as logging from quantum.openstack.common import threadgroup -rpc = extras.try_import('quantum.openstack.common.rpc') +rpc = importutils.try_import('quantum.openstack.common.rpc') CONF = cfg.CONF LOG = logging.getLogger(__name__) diff --git a/tools/pip-requires b/tools/pip-requires index 331b5beb9..ff31a5141 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -5,7 +5,6 @@ amqplib==0.6.1 anyjson>=0.2.4 argparse eventlet>=0.9.17 -extras greenlet>=0.3.1 httplib2 iso8601>=0.1.4