def __init__(self, callbacks):
super(PluginRpcDispatcher, self).__init__(callbacks)
- def dispatch(self, rpc_ctxt, version, method, **kwargs):
+ def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
rpc_ctxt_dict = rpc_ctxt.to_dict()
user_id = rpc_ctxt_dict.pop('user_id', None)
if not user_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict)
return super(PluginRpcDispatcher, self).dispatch(
- quantum_ctxt, version, method, **kwargs)
+ quantum_ctxt, version, method, namespace, **kwargs)
"""
import itertools
-import uuid
+
+from quantum.openstack.common import uuidutils
def generate_request_id():
- return 'req-' + str(uuid.uuid4())
+ return 'req-%s' % uuidutils.generate_uuid()
class RequestContext(object):
"""
import gettext
+import os
-
-t = gettext.translation('openstack-common', 'locale', fallback=True)
+_localedir = os.environ.get('quantum'.upper() + '_LOCALEDIR')
+_t = gettext.translation('quantum', localedir=_localedir, fallback=True)
def _(msg):
- return t.ugettext(msg)
+ return _t.ugettext(msg)
+
+
+def install(domain):
+ """Install a _() function using the given translation domain.
+
+ Given a translation domain, install a _() function using gettext's
+ install() function.
+
+ The main difference from gettext.install() is that we allow
+ overriding the default localedir (e.g. /usr/share/locale) using
+ a translation-domain-specific environment variable (e.g.
+ NOVA_LOCALEDIR).
+ """
+ gettext.install(domain,
+ localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
+ unicode=True)
import inspect
import itertools
import json
+import types
import xmlrpclib
from quantum.openstack.common import timeutils
+_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+ inspect.isfunction, inspect.isgeneratorfunction,
+ inspect.isgenerator, inspect.istraceback, inspect.isframe,
+ inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+ inspect.isabstract]
+
+_simple_types = (types.NoneType, int, basestring, bool, float, long)
+
+
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives.
Therefore, convert_instances=True is lossy ... be aware.
"""
- nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
- inspect.isfunction, inspect.isgeneratorfunction,
- inspect.isgenerator, inspect.istraceback, inspect.isframe,
- inspect.iscode, inspect.isbuiltin, inspect.isroutine,
- inspect.isabstract]
- for test in nasty:
- if test(value):
- return unicode(value)
-
- # value of itertools.count doesn't get caught by inspects
- # above and results in infinite loop when list(value) is called.
+ # handle obvious types first - order of basic types determined by running
+ # full tests on nova project, resulting in the following counts:
+ # 572754 <type 'NoneType'>
+ # 460353 <type 'int'>
+ # 379632 <type 'unicode'>
+ # 274610 <type 'str'>
+ # 199918 <type 'dict'>
+ # 114200 <type 'datetime.datetime'>
+ # 51817 <type 'bool'>
+ # 26164 <type 'list'>
+ # 6491 <type 'float'>
+ # 283 <type 'tuple'>
+ # 19 <type 'long'>
+ if isinstance(value, _simple_types):
+ return value
+
+ if isinstance(value, datetime.datetime):
+ if convert_datetime:
+ return timeutils.strtime(value)
+ else:
+ return value
+
+ # value of itertools.count doesn't get caught by nasty_type_tests
+ # and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
return unicode(value)
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
+ if isinstance(value, dict):
+ return dict((k, recursive(v)) for k, v in value.iteritems())
+ elif isinstance(value, (list, tuple)):
+ return [recursive(lv) for lv in value]
+
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
- if isinstance(value, (list, tuple)):
- return [recursive(v) for v in value]
- elif isinstance(value, dict):
- return dict((k, recursive(v)) for k, v in value.iteritems())
- elif convert_datetime and isinstance(value, datetime.datetime):
+ if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
else:
+ if any(test(value) for test in _nasty_type_tests):
+ return unicode(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
- '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
- '%(message)s',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+ '%(instance)s%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
else:
log_root.setLevel(logging.WARNING)
- level = logging.NOTSET
for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod)
logger.setLevel(level)
- for handler in log_root.handlers:
- logger.addHandler(handler)
_loggers = {}
Network-related utilities and helper functions.
"""
-import logging
+from quantum.openstack.common import log as logging
+
LOG = logging.getLogger(__name__)
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
- deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',
# License for the specific language governing permissions and limitations
# under the License.
+import datetime
+import time
+from oslo.config import cfg
+
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
+
+
+periodic_opts = [
+ cfg.BoolOpt('run_external_periodic_tasks',
+ default=True,
+ help=('Some periodic tasks can be run in a separate process. '
+ 'Should we run them here?')),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
+DEFAULT_INTERVAL = 60.0
+
+
+class InvalidPeriodicTaskArg(Exception):
+ message = _("Unexpected argument for periodic task creation: %(arg)s.")
+
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
- 1. Without arguments '@periodic_task', this will be run on every tick
+ 1. Without arguments '@periodic_task', this will be run on every cycle
of the periodic scheduler.
- 2. With arguments, @periodic_task(ticks_between_runs=N), this will be
- run on every N ticks of the periodic scheduler.
+ 2. With arguments:
+ @periodic_task(spacing=N [, run_immediately=[True|False]])
+ this will be run on approximately every N seconds. If this number is
+ negative the periodic task will be disabled. If the run_immediately
+ argument is provided and has a value of 'True', the first run of the
+ task will be shortly after task scheduler starts. If
+ run_immediately is omitted or set to 'False', the first time the
+ task runs will be approximately N seconds after the task scheduler
+ starts.
"""
def decorator(f):
+ # Test for old style invocation
+ if 'ticks_between_runs' in kwargs:
+ raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
+
+ # Control if run at all
f._periodic_task = True
- f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+ f._periodic_external_ok = kwargs.pop('external_process_ok', False)
+ if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
+ f._periodic_enabled = False
+ else:
+ f._periodic_enabled = kwargs.pop('enabled', True)
+
+ # Control frequency
+ f._periodic_spacing = kwargs.pop('spacing', 0)
+ f._periodic_immediate = kwargs.pop('run_immediately', False)
+ if f._periodic_immediate:
+ f._periodic_last_run = None
+ else:
+ f._periodic_last_run = timeutils.utcnow()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
- # class, so, go ahead and initialize it. If the attribute is present,
+ # class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = []
try:
- cls._ticks_to_skip = cls._ticks_to_skip.copy()
+ cls._periodic_last_run = cls._periodic_last_run.copy()
except AttributeError:
- cls._ticks_to_skip = {}
+ cls._periodic_last_run = {}
+
+ try:
+ cls._periodic_spacing = cls._periodic_spacing.copy()
+ except AttributeError:
+ cls._periodic_spacing = {}
- # This uses __dict__ instead of
- # inspect.getmembers(cls, inspect.ismethod) so only the methods of the
- # current class are added when this class is scanned, and base classes
- # are not added redundantly.
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
+
+ if task._periodic_spacing < 0:
+ LOG.info(_('Skipping periodic task %(task)s because '
+ 'its interval is negative'),
+ {'task': name})
+ continue
+ if not task._periodic_enabled:
+ LOG.info(_('Skipping periodic task %(task)s because '
+ 'it is disabled'),
+ {'task': name})
+ continue
+
+ # A periodic spacing of zero indicates that this task should
+ # be run every pass
+ if task._periodic_spacing == 0:
+ task._periodic_spacing = None
+
cls._periodic_tasks.append((name, task))
- cls._ticks_to_skip[name] = task._ticks_between_runs
+ cls._periodic_spacing[name] = task._periodic_spacing
+ cls._periodic_last_run[name] = task._periodic_last_run
class PeriodicTasks(object):
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
+ idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
- ticks_to_skip = self._ticks_to_skip[task_name]
- if ticks_to_skip > 0:
- LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
- " ticks left until next run"),
- dict(full_task_name=full_task_name,
- ticks_to_skip=ticks_to_skip))
- self._ticks_to_skip[task_name] -= 1
- continue
+ now = timeutils.utcnow()
+ spacing = self._periodic_spacing[task_name]
+ last_run = self._periodic_last_run[task_name]
+
+ # If a periodic task is _nearly_ due, then we'll run it early
+ if spacing is not None and last_run is not None:
+ due = last_run + datetime.timedelta(seconds=spacing)
+ if not timeutils.is_soon(due, 0.2):
+ idle_for = min(idle_for, timeutils.delta_seconds(now, due))
+ continue
- self._ticks_to_skip[task_name] = task._ticks_between_runs
- LOG.debug(_("Running periodic task %(full_task_name)s"),
- dict(full_task_name=full_task_name))
+ if spacing is not None:
+ idle_for = min(idle_for, spacing)
+
+ LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+ self._periodic_last_run[task_name] = timeutils.utcnow()
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
- LOG.exception(_("Error during %(full_task_name)s:"
- " %(e)s"),
- dict(e=e, full_task_name=full_task_name))
+ LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+ locals())
+ time.sleep(0)
+
+ return idle_for
"""
import abc
-import logging
import re
import urllib
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
System-level utilities and helper functions.
"""
-import logging
import random
import shlex
from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
"""
import inspect
-import logging
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
- version = message_data.get('version', None)
+ version = message_data.get('version')
+ namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method,
+ namespace, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
"""
ctxt.update_store()
try:
- rval = self.proxy.dispatch(ctxt, version, method, **args)
+ rval = self.proxy.dispatch(ctxt, version, method, namespace,
+ **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
- failure = klass(**failure.get('kwargs', {}))
+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- def dispatch(self, ctxt, version, method, **kwargs):
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
+ :param namespace: The namespace for the requested method. If None,
+ the dispatcher will look for a method on a callback
+ object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
had_compatible = False
for proxyobj in self.callbacks:
- if hasattr(proxyobj, 'RPC_API_VERSION'):
+ # Check for namespace compatibility
+ try:
+ cb_namespace = proxyobj.RPC_API_NAMESPACE
+ except AttributeError:
+ cb_namespace = None
+
+ if namespace != cb_namespace:
+ continue
+
+ # Check for version compatibility
+ try:
rpc_api_version = proxyobj.RPC_API_VERSION
- else:
+ except AttributeError:
rpc_api_version = '1.0'
+
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
+
if not hasattr(proxyobj, method):
continue
if is_compatible:
self.topic = topic
self.proxy = proxy
- def call(self, context, version, method, args, timeout):
+ def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = self.proxy.dispatch(context, version, method, **args)
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, version, method, args, timeout)
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
def call(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, version, method, args, None)
+ consumer.call(context, version, method, namespace, args, None)
except Exception:
pass
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
- cfg.StrOpt('qpid_port',
- default='5672',
+ cfg.IntOpt('qpid_port',
+ default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.protocol = self.conf.qpid_protocol
+ self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):
try:
result = proxy.dispatch(
- ctx, data['version'], data['method'], **data['args'])
+ ctx, data['version'], data['method'],
+ data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
return
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
- default='300',
+ default=300,
help='Heartbeat frequency'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
- default='600',
+ default=600,
help='Heartbeat time-to-live.'),
]
"""Return the topic to use for a message."""
return topic if topic else self.topic
+ @staticmethod
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
@staticmethod
def make_msg(method, **kwargs):
- return {'method': method, 'args': kwargs}
+ return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+ CONF(sys.argv[1:], project='oslo')
+ logging.setup("oslo")
+
+ with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+ reactor.consume_in_thread()
+ reactor.wait()
self.needs_resync = False
self.sync_state()
- @periodic_task.periodic_task(ticks_between_runs=6)
+ @periodic_task.periodic_task(spacing=6)
def collect_stats(self, context):
for pool_id in self.cache.get_pool_ids():
try:
[call(None,
{'args':
{'devices': ['fake_device']},
- 'method':
- 'security_group_rules_for_devices'},
+ 'method': 'security_group_rules_for_devices',
+ 'namespace': None},
version=sg_rpc.SG_RPC_VERSION,
topic='fake_topic')])
[call(None,
{'args':
{'security_groups': ['fake_sgid']},
- 'method': 'security_groups_rule_updated'},
+ 'method': 'security_groups_rule_updated',
+ 'namespace': None},
version=sg_rpc.SG_RPC_VERSION,
topic='fake-security_group-update')])
[call(None,
{'args':
{'security_groups': ['fake_sgid']},
- 'method': 'security_groups_member_updated'},
+ 'method': 'security_groups_member_updated',
+ 'namespace': None},
version=sg_rpc.SG_RPC_VERSION,
topic='fake-security_group-update')])