From: Gary Kotton Date: Sat, 20 Apr 2013 12:19:26 +0000 (+0000) Subject: Update latest OSLO code X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=15a144558388e9a620c8e6751d8b067daee29c3f;p=openstack-build%2Fneutron-build.git Update latest OSLO code Change-Id: I804d1eae92e89740339546f0d0f490a3e4f21204 --- diff --git a/quantum/common/rpc.py b/quantum/common/rpc.py index 06803873c..0c19d4424 100644 --- a/quantum/common/rpc.py +++ b/quantum/common/rpc.py @@ -31,7 +31,7 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher): def __init__(self, callbacks): super(PluginRpcDispatcher, self).__init__(callbacks) - def dispatch(self, rpc_ctxt, version, method, **kwargs): + def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs): rpc_ctxt_dict = rpc_ctxt.to_dict() user_id = rpc_ctxt_dict.pop('user_id', None) if not user_id: @@ -41,4 +41,4 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher): tenant_id = rpc_ctxt_dict.pop('project_id', None) quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict) return super(PluginRpcDispatcher, self).dispatch( - quantum_ctxt, version, method, **kwargs) + quantum_ctxt, version, method, namespace, **kwargs) diff --git a/quantum/openstack/common/context.py b/quantum/openstack/common/context.py index e9cfd73cc..44cdbf1ea 100644 --- a/quantum/openstack/common/context.py +++ b/quantum/openstack/common/context.py @@ -23,11 +23,12 @@ context or provide additional information in their specific WSGI pipeline. """ import itertools -import uuid + +from quantum.openstack.common import uuidutils def generate_request_id(): - return 'req-' + str(uuid.uuid4()) + return 'req-%s' % uuidutils.generate_uuid() class RequestContext(object): diff --git a/quantum/openstack/common/gettextutils.py b/quantum/openstack/common/gettextutils.py index 81076c1f3..967d93a21 100644 --- a/quantum/openstack/common/gettextutils.py +++ b/quantum/openstack/common/gettextutils.py @@ -24,10 +24,27 @@ Usual usage in an openstack.common module: """ import gettext +import os - -t = gettext.translation('openstack-common', 'locale', fallback=True) +_localedir = os.environ.get('quantum'.upper() + '_LOCALEDIR') +_t = gettext.translation('quantum', localedir=_localedir, fallback=True) def _(msg): - return t.ugettext(msg) + return _t.ugettext(msg) + + +def install(domain): + """Install a _() function using the given translation domain. + + Given a translation domain, install a _() function using gettext's + install() function. + + The main difference from gettext.install() is that we allow + overriding the default localedir (e.g. /usr/share/locale) using + a translation-domain-specific environment variable (e.g. + NOVA_LOCALEDIR). + """ + gettext.install(domain, + localedir=os.environ.get(domain.upper() + '_LOCALEDIR'), + unicode=True) diff --git a/quantum/openstack/common/jsonutils.py b/quantum/openstack/common/jsonutils.py index ed98df3c1..ae7feee23 100644 --- a/quantum/openstack/common/jsonutils.py +++ b/quantum/openstack/common/jsonutils.py @@ -38,11 +38,21 @@ import functools import inspect import itertools import json +import types import xmlrpclib from quantum.openstack.common import timeutils +_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, + inspect.isfunction, inspect.isgeneratorfunction, + inspect.isgenerator, inspect.istraceback, inspect.isframe, + inspect.iscode, inspect.isbuiltin, inspect.isroutine, + inspect.isabstract] + +_simple_types = (types.NoneType, int, basestring, bool, float, long) + + def to_primitive(value, convert_instances=False, convert_datetime=True, level=0, max_depth=3): """Convert a complex object into primitives. @@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, Therefore, convert_instances=True is lossy ... be aware. """ - nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod, - inspect.isfunction, inspect.isgeneratorfunction, - inspect.isgenerator, inspect.istraceback, inspect.isframe, - inspect.iscode, inspect.isbuiltin, inspect.isroutine, - inspect.isabstract] - for test in nasty: - if test(value): - return unicode(value) - - # value of itertools.count doesn't get caught by inspects - # above and results in infinite loop when list(value) is called. + # handle obvious types first - order of basic types determined by running + # full tests on nova project, resulting in the following counts: + # 572754 + # 460353 + # 379632 + # 274610 + # 199918 + # 114200 + # 51817 + # 26164 + # 6491 + # 283 + # 19 + if isinstance(value, _simple_types): + return value + + if isinstance(value, datetime.datetime): + if convert_datetime: + return timeutils.strtime(value) + else: + return value + + # value of itertools.count doesn't get caught by nasty_type_tests + # and results in infinite loop when list(value) is called. if type(value) == itertools.count: return unicode(value) @@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, convert_datetime=convert_datetime, level=level, max_depth=max_depth) + if isinstance(value, dict): + return dict((k, recursive(v)) for k, v in value.iteritems()) + elif isinstance(value, (list, tuple)): + return [recursive(lv) for lv in value] + # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled if isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) - if isinstance(value, (list, tuple)): - return [recursive(v) for v in value] - elif isinstance(value, dict): - return dict((k, recursive(v)) for k, v in value.iteritems()) - elif convert_datetime and isinstance(value, datetime.datetime): + if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) @@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Ignore class member vars. return recursive(value.__dict__, level=level + 1) else: + if any(test(value) for test in _nasty_type_tests): + return unicode(value) return value except TypeError: # Class objects are tricky since they may define something like diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index a5ba8c22c..31265236a 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -112,9 +112,9 @@ generic_log_opts = [ log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s ' - '[%(request_id)s %(user)s %(tenant)s] %(instance)s' - '%(message)s', + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [%(request_id)s %(user)s %(tenant)s] ' + '%(instance)s%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' @@ -432,14 +432,11 @@ def _setup_logging_from_conf(): else: log_root.setLevel(logging.WARNING) - level = logging.NOTSET for pair in CONF.default_log_levels: mod, _sep, level_name = pair.partition('=') level = logging.getLevelName(level_name) logger = logging.getLogger(mod) logger.setLevel(level) - for handler in log_root.handlers: - logger.addHandler(handler) _loggers = {} diff --git a/quantum/openstack/common/network_utils.py b/quantum/openstack/common/network_utils.py index 5224e01aa..0f243fd8a 100644 --- a/quantum/openstack/common/network_utils.py +++ b/quantum/openstack/common/network_utils.py @@ -19,7 +19,8 @@ Network-related utilities and helper functions. """ -import logging +from quantum.openstack.common import log as logging + LOG = logging.getLogger(__name__) diff --git a/quantum/openstack/common/notifier/api.py b/quantum/openstack/common/notifier/api.py index 5af9f9a2d..4a390e501 100644 --- a/quantum/openstack/common/notifier/api.py +++ b/quantum/openstack/common/notifier/api.py @@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__) notifier_opts = [ cfg.MultiStrOpt('notification_driver', default=[], - deprecated_name='list_notifier_drivers', help='Driver or drivers to handle sending notifications'), cfg.StrOpt('default_notification_level', default='INFO', diff --git a/quantum/openstack/common/periodic_task.py b/quantum/openstack/common/periodic_task.py index 43d125b50..b0017c284 100644 --- a/quantum/openstack/common/periodic_task.py +++ b/quantum/openstack/common/periodic_task.py @@ -13,26 +13,72 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime +import time +from oslo.config import cfg + from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging +from quantum.openstack.common import timeutils + + +periodic_opts = [ + cfg.BoolOpt('run_external_periodic_tasks', + default=True, + help=('Some periodic tasks can be run in a separate process. ' + 'Should we run them here?')), +] + +CONF = cfg.CONF +CONF.register_opts(periodic_opts) LOG = logging.getLogger(__name__) +DEFAULT_INTERVAL = 60.0 + + +class InvalidPeriodicTaskArg(Exception): + message = _("Unexpected argument for periodic task creation: %(arg)s.") + def periodic_task(*args, **kwargs): """Decorator to indicate that a method is a periodic task. This decorator can be used in two ways: - 1. Without arguments '@periodic_task', this will be run on every tick + 1. Without arguments '@periodic_task', this will be run on every cycle of the periodic scheduler. - 2. With arguments, @periodic_task(ticks_between_runs=N), this will be - run on every N ticks of the periodic scheduler. + 2. With arguments: + @periodic_task(spacing=N [, run_immediately=[True|False]]) + this will be run on approximately every N seconds. If this number is + negative the periodic task will be disabled. If the run_immediately + argument is provided and has a value of 'True', the first run of the + task will be shortly after task scheduler starts. If + run_immediately is omitted or set to 'False', the first time the + task runs will be approximately N seconds after the task scheduler + starts. """ def decorator(f): + # Test for old style invocation + if 'ticks_between_runs' in kwargs: + raise InvalidPeriodicTaskArg(arg='ticks_between_runs') + + # Control if run at all f._periodic_task = True - f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + f._periodic_external_ok = kwargs.pop('external_process_ok', False) + if f._periodic_external_ok and not CONF.run_external_periodic_tasks: + f._periodic_enabled = False + else: + f._periodic_enabled = kwargs.pop('enabled', True) + + # Control frequency + f._periodic_spacing = kwargs.pop('spacing', 0) + f._periodic_immediate = kwargs.pop('run_immediately', False) + if f._periodic_immediate: + f._periodic_last_run = None + else: + f._periodic_last_run = timeutils.utcnow() return f # NOTE(sirp): The `if` is necessary to allow the decorator to be used with @@ -59,7 +105,7 @@ class _PeriodicTasksMeta(type): super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) # NOTE(sirp): if the attribute is not present then we must be the base - # class, so, go ahead and initialize it. If the attribute is present, + # class, so, go ahead an initialize it. If the attribute is present, # then we're a subclass so make a copy of it so we don't step on our # parent's toes. try: @@ -68,20 +114,39 @@ class _PeriodicTasksMeta(type): cls._periodic_tasks = [] try: - cls._ticks_to_skip = cls._ticks_to_skip.copy() + cls._periodic_last_run = cls._periodic_last_run.copy() except AttributeError: - cls._ticks_to_skip = {} + cls._periodic_last_run = {} + + try: + cls._periodic_spacing = cls._periodic_spacing.copy() + except AttributeError: + cls._periodic_spacing = {} - # This uses __dict__ instead of - # inspect.getmembers(cls, inspect.ismethod) so only the methods of the - # current class are added when this class is scanned, and base classes - # are not added redundantly. for value in cls.__dict__.values(): if getattr(value, '_periodic_task', False): task = value name = task.__name__ + + if task._periodic_spacing < 0: + LOG.info(_('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + continue + if not task._periodic_enabled: + LOG.info(_('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + continue + + # A periodic spacing of zero indicates that this task should + # be run every pass + if task._periodic_spacing == 0: + task._periodic_spacing = None + cls._periodic_tasks.append((name, task)) - cls._ticks_to_skip[name] = task._ticks_between_runs + cls._periodic_spacing[name] = task._periodic_spacing + cls._periodic_last_run[name] = task._periodic_last_run class PeriodicTasks(object): @@ -89,27 +154,34 @@ class PeriodicTasks(object): def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" + idle_for = DEFAULT_INTERVAL for task_name, task in self._periodic_tasks: full_task_name = '.'.join([self.__class__.__name__, task_name]) - ticks_to_skip = self._ticks_to_skip[task_name] - if ticks_to_skip > 0: - LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" - " ticks left until next run"), - dict(full_task_name=full_task_name, - ticks_to_skip=ticks_to_skip)) - self._ticks_to_skip[task_name] -= 1 - continue + now = timeutils.utcnow() + spacing = self._periodic_spacing[task_name] + last_run = self._periodic_last_run[task_name] + + # If a periodic task is _nearly_ due, then we'll run it early + if spacing is not None and last_run is not None: + due = last_run + datetime.timedelta(seconds=spacing) + if not timeutils.is_soon(due, 0.2): + idle_for = min(idle_for, timeutils.delta_seconds(now, due)) + continue - self._ticks_to_skip[task_name] = task._ticks_between_runs - LOG.debug(_("Running periodic task %(full_task_name)s"), - dict(full_task_name=full_task_name)) + if spacing is not None: + idle_for = min(idle_for, spacing) + + LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + self._periodic_last_run[task_name] = timeutils.utcnow() try: task(self, context) except Exception as e: if raise_on_error: raise - LOG.exception(_("Error during %(full_task_name)s:" - " %(e)s"), - dict(e=e, full_task_name=full_task_name)) + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) + time.sleep(0) + + return idle_for diff --git a/quantum/openstack/common/policy.py b/quantum/openstack/common/policy.py index 95b37f7de..88789ea90 100644 --- a/quantum/openstack/common/policy.py +++ b/quantum/openstack/common/policy.py @@ -57,7 +57,6 @@ as it allows particular rules to be explicitly disabled. """ import abc -import logging import re import urllib @@ -65,6 +64,7 @@ import urllib2 from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/quantum/openstack/common/processutils.py b/quantum/openstack/common/processutils.py index 9287cad94..4b806acbf 100644 --- a/quantum/openstack/common/processutils.py +++ b/quantum/openstack/common/processutils.py @@ -19,7 +19,6 @@ System-level utilities and helper functions. """ -import logging import random import shlex @@ -27,6 +26,7 @@ from eventlet.green import subprocess from eventlet import greenthread from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py index bd10bb777..cef3c349e 100644 --- a/quantum/openstack/common/rpc/__init__.py +++ b/quantum/openstack/common/rpc/__init__.py @@ -26,13 +26,13 @@ For some wrappers that add message versioning to rpc, see: """ import inspect -import logging from oslo.config import cfg from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import importutils from quantum.openstack.common import local +from quantum.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py index f2bb8f5a7..c3994f1b0 100644 --- a/quantum/openstack/common/rpc/amqp.py +++ b/quantum/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py index 7b428d9e0..76bb85a76 100644 --- a/quantum/openstack/common/rpc/common.py +++ b/quantum/openstack/common/rpc/common.py @@ -339,7 +339,7 @@ def deserialize_remote_exception(conf, data): if not issubclass(klass, Exception): raise TypeError("Can only deserialize Exceptions") - failure = klass(**failure.get('kwargs', {})) + failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) except (AttributeError, TypeError, ImportError): return RemoteError(name, failure.get('message'), trace) diff --git a/quantum/openstack/common/rpc/dispatcher.py b/quantum/openstack/common/rpc/dispatcher.py index de212227c..16487a7a7 100644 --- a/quantum/openstack/common/rpc/dispatcher.py +++ b/quantum/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/quantum/openstack/common/rpc/impl_fake.py b/quantum/openstack/common/rpc/impl_fake.py index 727480281..ef4092856 100644 --- a/quantum/openstack/common/rpc/impl_fake.py +++ b/quantum/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index 7d3ed441a..f56e14b03 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -40,8 +40,8 @@ qpid_opts = [ cfg.StrOpt('qpid_hostname', default='localhost', help='Qpid broker hostname'), - cfg.StrOpt('qpid_port', - default='5672', + cfg.IntOpt('qpid_port', + default=5672, help='Qpid broker port'), cfg.ListOpt('qpid_hosts', default=['$qpid_hostname:$qpid_port'], @@ -320,7 +320,7 @@ class Connection(object): # Reconnection is done by self.reconnect() self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat - self.connection.protocol = self.conf.qpid_protocol + self.connection.transport = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay def _register_consumer(self, consumer): diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index 867a23ced..d69abe4ee 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py index 4fa0d98a3..78650d1f2 100644 --- a/quantum/openstack/common/rpc/matchmaker.py +++ b/quantum/openstack/common/rpc/matchmaker.py @@ -35,10 +35,10 @@ matchmaker_opts = [ default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), cfg.IntOpt('matchmaker_heartbeat_freq', - default='300', + default=300, help='Heartbeat frequency'), cfg.IntOpt('matchmaker_heartbeat_ttl', - default='600', + default=600, help='Heartbeat time-to-live.'), ] diff --git a/quantum/openstack/common/rpc/proxy.py b/quantum/openstack/common/rpc/proxy.py index ed56d47f6..4946831f4 100644 --- a/quantum/openstack/common/rpc/proxy.py +++ b/quantum/openstack/common/rpc/proxy.py @@ -58,9 +58,13 @@ class RpcProxy(object): """Return the topic to use for a message.""" return topic if topic else self.topic + @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. diff --git a/quantum/openstack/common/rpc/zmq_receiver.py b/quantum/openstack/common/rpc/zmq_receiver.py new file mode 100755 index 000000000..57c32d710 --- /dev/null +++ b/quantum/openstack/common/rpc/zmq_receiver.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import sys + +from oslo.config import cfg + +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.setup("oslo") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/agent/manager.py index 8d49cf7de..d84bdfc62 100644 --- a/quantum/plugins/services/agent_loadbalancer/agent/manager.py +++ b/quantum/plugins/services/agent_loadbalancer/agent/manager.py @@ -145,7 +145,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): self.needs_resync = False self.sync_state() - @periodic_task.periodic_task(ticks_between_runs=6) + @periodic_task.periodic_task(spacing=6) def collect_stats(self, context): for pool_id in self.cache.get_pool_ids(): try: diff --git a/quantum/tests/unit/test_security_groups_rpc.py b/quantum/tests/unit/test_security_groups_rpc.py index f99d0c52b..a6b7bfd78 100644 --- a/quantum/tests/unit/test_security_groups_rpc.py +++ b/quantum/tests/unit/test_security_groups_rpc.py @@ -519,8 +519,8 @@ class SecurityGroupServerRpcApiTestCase(base.BaseTestCase): [call(None, {'args': {'devices': ['fake_device']}, - 'method': - 'security_group_rules_for_devices'}, + 'method': 'security_group_rules_for_devices', + 'namespace': None}, version=sg_rpc.SG_RPC_VERSION, topic='fake_topic')]) @@ -544,7 +544,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase): [call(None, {'args': {'security_groups': ['fake_sgid']}, - 'method': 'security_groups_rule_updated'}, + 'method': 'security_groups_rule_updated', + 'namespace': None}, version=sg_rpc.SG_RPC_VERSION, topic='fake-security_group-update')]) @@ -555,7 +556,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase): [call(None, {'args': {'security_groups': ['fake_sgid']}, - 'method': 'security_groups_member_updated'}, + 'method': 'security_groups_member_updated', + 'namespace': None}, version=sg_rpc.SG_RPC_VERSION, topic='fake-security_group-update')])