review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update latest OSLO code
author    Gary Kotton <gkotton@redhat.com>
Sat, 20 Apr 2013 12:19:26 +0000 (12:19 +0000)
committer Gary Kotton <gkotton@redhat.com>
Sun, 21 Apr 2013 09:42:59 +0000 (09:42 +0000)
Change-Id: I804d1eae92e89740339546f0d0f490a3e4f21204

22 files changed:
quantum/common/rpc.py
quantum/openstack/common/context.py
quantum/openstack/common/gettextutils.py
quantum/openstack/common/jsonutils.py
quantum/openstack/common/log.py
quantum/openstack/common/network_utils.py
quantum/openstack/common/notifier/api.py
quantum/openstack/common/periodic_task.py
quantum/openstack/common/policy.py
quantum/openstack/common/processutils.py
quantum/openstack/common/rpc/__init__.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/dispatcher.py
quantum/openstack/common/rpc/impl_fake.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/rpc/matchmaker.py
quantum/openstack/common/rpc/proxy.py
quantum/openstack/common/rpc/zmq_receiver.py [new file with mode: 0755]
quantum/plugins/services/agent_loadbalancer/agent/manager.py
quantum/tests/unit/test_security_groups_rpc.py

diff --git a/quantum/common/rpc.py b/quantum/common/rpc.py
index 06803873c92806461bf910e55a92ba873ba69b0d..0c19d4424e8e14e8f0695526d917c4ff15822400 100644 (file)
@@ -31,7 +31,7 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher):
     def __init__(self, callbacks):
         super(PluginRpcDispatcher, self).__init__(callbacks)
 
-    def dispatch(self, rpc_ctxt, version, method, **kwargs):
+    def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
         rpc_ctxt_dict = rpc_ctxt.to_dict()
         user_id = rpc_ctxt_dict.pop('user_id', None)
         if not user_id:
@@ -41,4 +41,4 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher):
             tenant_id = rpc_ctxt_dict.pop('project_id', None)
         quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict)
         return super(PluginRpcDispatcher, self).dispatch(
-            quantum_ctxt, version, method, **kwargs)
+            quantum_ctxt, version, method, namespace, **kwargs)
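For orientation, a sketch of the new call shape through this wrapper (values are illustrative; the namespace comes from the incoming message and is forwarded unchanged):

    # disp = PluginRpcDispatcher([plugin_callbacks])   # callbacks are plugin-specific
    # disp.dispatch(rpc_ctxt, '1.1', 'update_device_up',
    #               None, device='tap0')               # namespace precedes **kwargs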
diff --git a/quantum/openstack/common/context.py b/quantum/openstack/common/context.py
index e9cfd73cc110a1d67ff18cb6e1cf12f481c3f063..44cdbf1ea87e03c46aff0563c5de2d9ffd75bc16 100644 (file)
@@ -23,11 +23,12 @@ context or provide additional information in their specific WSGI pipeline.
 """
 
 import itertools
-import uuid
+
+from quantum.openstack.common import uuidutils
 
 
 def generate_request_id():
-    return 'req-' + str(uuid.uuid4())
+    return 'req-%s' % uuidutils.generate_uuid()
 
 
 class RequestContext(object):
diff --git a/quantum/openstack/common/gettextutils.py b/quantum/openstack/common/gettextutils.py
index 81076c1f3bf22bb45dbbb35225a33d29431ccedc..967d93a219a0c4c11116cfcdb9bd5f50c30feb70 100644 (file)
@@ -24,10 +24,27 @@ Usual usage in an openstack.common module:
 """
 
 import gettext
+import os
 
-
-t = gettext.translation('openstack-common', 'locale', fallback=True)
+_localedir = os.environ.get('quantum'.upper() + '_LOCALEDIR')
+_t = gettext.translation('quantum', localedir=_localedir, fallback=True)
 
 
 def _(msg):
-    return t.ugettext(msg)
+    return _t.ugettext(msg)
+
+
+def install(domain):
+    """Install a _() function using the given translation domain.
+
+    Given a translation domain, install a _() function using gettext's
+    install() function.
+
+    The main difference from gettext.install() is that we allow
+    overriding the default localedir (e.g. /usr/share/locale) using
+    a translation-domain-specific environment variable (e.g.
+    NOVA_LOCALEDIR).
+    """
+    gettext.install(domain,
+                    localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
+                    unicode=True)
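A minimal usage sketch of the new install() helper; the locale directory below is only an example, and the environment variable name follows the DOMAIN_LOCALEDIR convention from the docstring:

    import os
    os.environ.setdefault('QUANTUM_LOCALEDIR', '/usr/share/locale')  # example override
    from quantum.openstack.common import gettextutils
    gettextutils.install('quantum')   # puts _() into __builtin__ via gettext.install()
    print _('Networks')               # translated if a 'quantum' catalog is found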
diff --git a/quantum/openstack/common/jsonutils.py b/quantum/openstack/common/jsonutils.py
index ed98df3c138f7f3fc2cfab7585872ef929682661..ae7feee23dc20328423191121835b447fadf0d17 100644 (file)
@@ -38,11 +38,21 @@ import functools
 import inspect
 import itertools
 import json
+import types
 import xmlrpclib
 
 from quantum.openstack.common import timeutils
 
 
+_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+                     inspect.isfunction, inspect.isgeneratorfunction,
+                     inspect.isgenerator, inspect.istraceback, inspect.isframe,
+                     inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+                     inspect.isabstract]
+
+_simple_types = (types.NoneType, int, basestring, bool, float, long)
+
+
 def to_primitive(value, convert_instances=False, convert_datetime=True,
                  level=0, max_depth=3):
     """Convert a complex object into primitives.
@@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
     Therefore, convert_instances=True is lossy ... be aware.
 
     """
-    nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
-             inspect.isfunction, inspect.isgeneratorfunction,
-             inspect.isgenerator, inspect.istraceback, inspect.isframe,
-             inspect.iscode, inspect.isbuiltin, inspect.isroutine,
-             inspect.isabstract]
-    for test in nasty:
-        if test(value):
-            return unicode(value)
-
-    # value of itertools.count doesn't get caught by inspects
-    # above and results in infinite loop when list(value) is called.
+    # handle obvious types first - order of basic types determined by running
+    # full tests on nova project, resulting in the following counts:
+    # 572754 <type 'NoneType'>
+    # 460353 <type 'int'>
+    # 379632 <type 'unicode'>
+    # 274610 <type 'str'>
+    # 199918 <type 'dict'>
+    # 114200 <type 'datetime.datetime'>
+    #  51817 <type 'bool'>
+    #  26164 <type 'list'>
+    #   6491 <type 'float'>
+    #    283 <type 'tuple'>
+    #     19 <type 'long'>
+    if isinstance(value, _simple_types):
+        return value
+
+    if isinstance(value, datetime.datetime):
+        if convert_datetime:
+            return timeutils.strtime(value)
+        else:
+            return value
+
+    # value of itertools.count doesn't get caught by nasty_type_tests
+    # and results in infinite loop when list(value) is called.
     if type(value) == itertools.count:
         return unicode(value)
 
@@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
                                       convert_datetime=convert_datetime,
                                       level=level,
                                       max_depth=max_depth)
+        if isinstance(value, dict):
+            return dict((k, recursive(v)) for k, v in value.iteritems())
+        elif isinstance(value, (list, tuple)):
+            return [recursive(lv) for lv in value]
+
         # It's not clear why xmlrpclib created their own DateTime type, but
         # for our purposes, make it a datetime type which is explicitly
         # handled
         if isinstance(value, xmlrpclib.DateTime):
             value = datetime.datetime(*tuple(value.timetuple())[:6])
 
-        if isinstance(value, (list, tuple)):
-            return [recursive(v) for v in value]
-        elif isinstance(value, dict):
-            return dict((k, recursive(v)) for k, v in value.iteritems())
-        elif convert_datetime and isinstance(value, datetime.datetime):
+        if convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
         elif hasattr(value, 'iteritems'):
             return recursive(dict(value.iteritems()), level=level + 1)
@@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             # Ignore class member vars.
             return recursive(value.__dict__, level=level + 1)
         else:
+            if any(test(value) for test in _nasty_type_tests):
+                return unicode(value)
             return value
     except TypeError:
         # Class objects are tricky since they may define something like
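Illustrative behaviour of the reworked to_primitive(): simple types now return immediately, datetimes are stringified, and containers are still walked recursively (values below are examples, not part of the commit):

    import datetime
    from quantum.openstack.common import jsonutils

    jsonutils.to_primitive(42)                 # -> 42, via the _simple_types fast path
    jsonutils.to_primitive(datetime.datetime(2013, 4, 20))
                                               # -> a string from timeutils.strtime(),
                                               #    e.g. '2013-04-20T00:00:00.000000'
    jsonutils.to_primitive({'a': (1, 2)})      # -> {'a': [1, 2]}, tuples become lists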
diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py
index a5ba8c22c80b9a42647f1ad5910b23c09f0eb898..31265236a75ee388a0767c61a560ede8e948d794 100644 (file)
@@ -112,9 +112,9 @@ generic_log_opts = [
 
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
-               default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
-                       '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
-                       '%(message)s',
+               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+                       '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+                       '%(instance)s%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
@@ -432,14 +432,11 @@ def _setup_logging_from_conf():
     else:
         log_root.setLevel(logging.WARNING)
 
-    level = logging.NOTSET
     for pair in CONF.default_log_levels:
         mod, _sep, level_name = pair.partition('=')
         level = logging.getLevelName(level_name)
         logger = logging.getLogger(mod)
         logger.setLevel(level)
-        for handler in log_root.handlers:
-            logger.addHandler(handler)
 
 _loggers = {}
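With the new logging_context_format_string a context log record also carries the process id; a made-up example under the new default format:

    2013-04-21 09:42:59.123 12345 INFO quantum.agent.dhcp_agent [req-0123 admin demo] Synchronizing state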
 
diff --git a/quantum/openstack/common/network_utils.py b/quantum/openstack/common/network_utils.py
index 5224e01aa9495faac90a3074b1677a4ebb430f79..0f243fd8ae94021f563a7ea5b601391849d34b09 100644 (file)
@@ -19,7 +19,8 @@
 Network-related utilities and helper functions.
 """
 
-import logging
+from quantum.openstack.common import log as logging
+
 
 LOG = logging.getLogger(__name__)
 
diff --git a/quantum/openstack/common/notifier/api.py b/quantum/openstack/common/notifier/api.py
index 5af9f9a2d2969e0bbe0a6458040e5c47cc5a645f..4a390e5014e4daee3294aa05872fd63ebcbff859 100644 (file)
@@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
 notifier_opts = [
     cfg.MultiStrOpt('notification_driver',
                     default=[],
-                    deprecated_name='list_notifier_drivers',
                     help='Driver or drivers to handle sending notifications'),
     cfg.StrOpt('default_notification_level',
                default='INFO',
diff --git a/quantum/openstack/common/periodic_task.py b/quantum/openstack/common/periodic_task.py
index 43d125b50d3eda1aae8624f15e55621039b3b128..b0017c284b2a7443443dddee0b5866d692b33e63 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import datetime
+import time
+from oslo.config import cfg
+
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
+
+
+periodic_opts = [
+    cfg.BoolOpt('run_external_periodic_tasks',
+                default=True,
+                help=('Some periodic tasks can be run in a separate process. '
+                      'Should we run them here?')),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(periodic_opts)
 
 LOG = logging.getLogger(__name__)
 
+DEFAULT_INTERVAL = 60.0
+
+
+class InvalidPeriodicTaskArg(Exception):
+    message = _("Unexpected argument for periodic task creation: %(arg)s.")
+
 
 def periodic_task(*args, **kwargs):
     """Decorator to indicate that a method is a periodic task.
 
     This decorator can be used in two ways:
 
-        1. Without arguments '@periodic_task', this will be run on every tick
+        1. Without arguments '@periodic_task', this will be run on every cycle
            of the periodic scheduler.
 
-        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
-           run on every N ticks of the periodic scheduler.
+        2. With arguments:
+           @periodic_task(spacing=N [, run_immediately=[True|False]])
+           this will be run on approximately every N seconds. If this number is
+           negative the periodic task will be disabled. If the run_immediately
+           argument is provided and has a value of 'True', the first run of the
+           task will be shortly after task scheduler starts.  If
+           run_immediately is omitted or set to 'False', the first time the
+           task runs will be approximately N seconds after the task scheduler
+           starts.
     """
     def decorator(f):
+        # Test for old style invocation
+        if 'ticks_between_runs' in kwargs:
+            raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
+
+        # Control if run at all
         f._periodic_task = True
-        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+        f._periodic_external_ok = kwargs.pop('external_process_ok', False)
+        if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
+            f._periodic_enabled = False
+        else:
+            f._periodic_enabled = kwargs.pop('enabled', True)
+
+        # Control frequency
+        f._periodic_spacing = kwargs.pop('spacing', 0)
+        f._periodic_immediate = kwargs.pop('run_immediately', False)
+        if f._periodic_immediate:
+            f._periodic_last_run = None
+        else:
+            f._periodic_last_run = timeutils.utcnow()
         return f
 
     # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
@@ -59,7 +105,7 @@ class _PeriodicTasksMeta(type):
         super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
 
         # NOTE(sirp): if the attribute is not present then we must be the base
-        # class, so, go ahead and initialize it. If the attribute is present,
+        # class, so, go ahead an initialize it. If the attribute is present,
         # then we're a subclass so make a copy of it so we don't step on our
         # parent's toes.
         try:
@@ -68,20 +114,39 @@ class _PeriodicTasksMeta(type):
             cls._periodic_tasks = []
 
         try:
-            cls._ticks_to_skip = cls._ticks_to_skip.copy()
+            cls._periodic_last_run = cls._periodic_last_run.copy()
         except AttributeError:
-            cls._ticks_to_skip = {}
+            cls._periodic_last_run = {}
+
+        try:
+            cls._periodic_spacing = cls._periodic_spacing.copy()
+        except AttributeError:
+            cls._periodic_spacing = {}
 
-        # This uses __dict__ instead of
-        # inspect.getmembers(cls, inspect.ismethod) so only the methods of the
-        # current class are added when this class is scanned, and base classes
-        # are not added redundantly.
         for value in cls.__dict__.values():
             if getattr(value, '_periodic_task', False):
                 task = value
                 name = task.__name__
+
+                if task._periodic_spacing < 0:
+                    LOG.info(_('Skipping periodic task %(task)s because '
+                               'its interval is negative'),
+                             {'task': name})
+                    continue
+                if not task._periodic_enabled:
+                    LOG.info(_('Skipping periodic task %(task)s because '
+                               'it is disabled'),
+                             {'task': name})
+                    continue
+
+                # A periodic spacing of zero indicates that this task should
+                # be run every pass
+                if task._periodic_spacing == 0:
+                    task._periodic_spacing = None
+
                 cls._periodic_tasks.append((name, task))
-                cls._ticks_to_skip[name] = task._ticks_between_runs
+                cls._periodic_spacing[name] = task._periodic_spacing
+                cls._periodic_last_run[name] = task._periodic_last_run
 
 
 class PeriodicTasks(object):
@@ -89,27 +154,34 @@ class PeriodicTasks(object):
 
     def run_periodic_tasks(self, context, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
+        idle_for = DEFAULT_INTERVAL
         for task_name, task in self._periodic_tasks:
             full_task_name = '.'.join([self.__class__.__name__, task_name])
 
-            ticks_to_skip = self._ticks_to_skip[task_name]
-            if ticks_to_skip > 0:
-                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
-                            " ticks left until next run"),
-                          dict(full_task_name=full_task_name,
-                               ticks_to_skip=ticks_to_skip))
-                self._ticks_to_skip[task_name] -= 1
-                continue
+            now = timeutils.utcnow()
+            spacing = self._periodic_spacing[task_name]
+            last_run = self._periodic_last_run[task_name]
+
+            # If a periodic task is _nearly_ due, then we'll run it early
+            if spacing is not None and last_run is not None:
+                due = last_run + datetime.timedelta(seconds=spacing)
+                if not timeutils.is_soon(due, 0.2):
+                    idle_for = min(idle_for, timeutils.delta_seconds(now, due))
+                    continue
 
-            self._ticks_to_skip[task_name] = task._ticks_between_runs
-            LOG.debug(_("Running periodic task %(full_task_name)s"),
-                      dict(full_task_name=full_task_name))
+            if spacing is not None:
+                idle_for = min(idle_for, spacing)
+
+            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+            self._periodic_last_run[task_name] = timeutils.utcnow()
 
             try:
                 task(self, context)
             except Exception as e:
                 if raise_on_error:
                     raise
-                LOG.exception(_("Error during %(full_task_name)s:"
-                                " %(e)s"),
-                              dict(e=e, full_task_name=full_task_name))
+                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+                              locals())
+            time.sleep(0)
+
+        return idle_for
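A minimal usage sketch under the new decorator arguments (class and task names are illustrative; the loadbalancer agent change further down switches to spacing= in the same way):

    from quantum.openstack.common import periodic_task

    class ExampleManager(periodic_task.PeriodicTasks):

        @periodic_task.periodic_task
        def report_state(self, context):
            """Runs on every pass of run_periodic_tasks()."""

        @periodic_task.periodic_task(spacing=6, run_immediately=True)
        def collect_stats(self, context):
            """Runs roughly every 6 seconds, first run shortly after start."""

    # idle_for = ExampleManager().run_periodic_tasks(context)
    # The return value is how long the caller may sleep before the next task
    # is due, capped at DEFAULT_INTERVAL (60 seconds).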
diff --git a/quantum/openstack/common/policy.py b/quantum/openstack/common/policy.py
index 95b37f7de128244c18d872834ff394110fad778f..88789ea904c926d581d4f928bdacc0029124e838 100644 (file)
@@ -57,7 +57,6 @@ as it allows particular rules to be explicitly disabled.
 """
 
 import abc
-import logging
 import re
 import urllib
 
@@ -65,6 +64,7 @@ import urllib2
 
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
 
 
 LOG = logging.getLogger(__name__)
diff --git a/quantum/openstack/common/processutils.py b/quantum/openstack/common/processutils.py
index 9287cad94fb16826354c28246821b92786e87653..4b806acbf62ad24aafd7d2ed073ec005e05a9576 100644 (file)
@@ -19,7 +19,6 @@
 System-level utilities and helper functions.
 """
 
-import logging
 import random
 import shlex
 
@@ -27,6 +26,7 @@ from eventlet.green import subprocess
 from eventlet import greenthread
 
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
 
 
 LOG = logging.getLogger(__name__)
diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py
index bd10bb777d9102a26280cadc5c1a5f0038c9696c..cef3c349e59a9af413ac0bb06e3eaf9e0d858f8e 100644 (file)
@@ -26,13 +26,13 @@ For some wrappers that add message versioning to rpc, see:
 """
 
 import inspect
-import logging
 
 from oslo.config import cfg
 
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
 from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
 
 
 LOG = logging.getLogger(__name__)
diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py
index f2bb8f5a77b57b8c7d5d71fa1ea1733c1b4b5f36..c3994f1b0eb375df1277611b4883b16285aeef68 100644 (file)
@@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
-        version = message_data.get('version', None)
+        version = message_data.get('version')
+        namespace = message_data.get('namespace')
         if not method:
             LOG.warn(_('no method for message: %s') % message_data)
             ctxt.reply(_('No method for message: %s') % message_data,
                        connection_pool=self.connection_pool)
             return
-        self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+        self.pool.spawn_n(self._process_data, ctxt, version, method,
+                          namespace, args)
 
-    def _process_data(self, ctxt, version, method, args):
+    def _process_data(self, ctxt, version, method, namespace, args):
         """Process a message in a new thread.
 
         If the proxy object we have has a dispatch method
@@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
         """
         ctxt.update_store()
         try:
-            rval = self.proxy.dispatch(ctxt, version, method, **args)
+            rval = self.proxy.dispatch(ctxt, version, method, namespace,
+                                       **args)
             # Check if the result was a generator
             if inspect.isgenerator(rval):
                 for x in rval:
diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py
index 7b428d9e0097b9d6e1a393463d1f749174ab54b6..76bb85a76bdd76f9169914474dbb250dd93b3364 100644 (file)
@@ -339,7 +339,7 @@ def deserialize_remote_exception(conf, data):
         if not issubclass(klass, Exception):
             raise TypeError("Can only deserialize Exceptions")
 
-        failure = klass(**failure.get('kwargs', {}))
+        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
     except (AttributeError, TypeError, ImportError):
         return RemoteError(name, failure.get('message'), trace)
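Why the positional args matter here, as a self-contained sketch (the exception class is made up for illustration):

    class VolumeNotFound(Exception):        # illustrative, not part of the commit
        def __init__(self, volume_id):
            msg = 'volume %s not found' % volume_id
            super(VolumeNotFound, self).__init__(msg)

    failure = {'args': ['vol-1'], 'kwargs': {}}   # as carried in the serialized failure
    # Before: VolumeNotFound(**{}) raised TypeError and fell back to RemoteError.
    # Now the original exception is rebuilt:
    exc = VolumeNotFound(*failure.get('args', []), **failure.get('kwargs', {}))
    print exc                                     # volume vol-1 not found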
 
diff --git a/quantum/openstack/common/rpc/dispatcher.py b/quantum/openstack/common/rpc/dispatcher.py
index de212227cb896660050ef2d449d1c9cc93df56b1..16487a7a7c7ea28a6f279bb3110ddc1f90678d72 100644 (file)
@@ -103,13 +103,16 @@ class RpcDispatcher(object):
         self.callbacks = callbacks
         super(RpcDispatcher, self).__init__()
 
-    def dispatch(self, ctxt, version, method, **kwargs):
+    def dispatch(self, ctxt, version, method, namespace, **kwargs):
         """Dispatch a message based on a requested version.
 
         :param ctxt: The request context
         :param version: The requested API version from the incoming message
         :param method: The method requested to be called by the incoming
                        message.
+        :param namespace: The namespace for the requested method.  If None,
+                          the dispatcher will look for a method on a callback
+                          object with no namespace set.
         :param kwargs: A dict of keyword arguments to be passed to the method.
 
         :returns: Whatever is returned by the underlying method that gets
@@ -120,13 +123,25 @@ class RpcDispatcher(object):
 
         had_compatible = False
         for proxyobj in self.callbacks:
-            if hasattr(proxyobj, 'RPC_API_VERSION'):
+            # Check for namespace compatibility
+            try:
+                cb_namespace = proxyobj.RPC_API_NAMESPACE
+            except AttributeError:
+                cb_namespace = None
+
+            if namespace != cb_namespace:
+                continue
+
+            # Check for version compatibility
+            try:
                 rpc_api_version = proxyobj.RPC_API_VERSION
-            else:
+            except AttributeError:
                 rpc_api_version = '1.0'
+
             is_compatible = rpc_common.version_is_compatible(rpc_api_version,
                                                              version)
             had_compatible = had_compatible or is_compatible
+
             if not hasattr(proxyobj, method):
                 continue
             if is_compatible:
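A sketch of how namespace matching now works on the callback side (class names and the namespace value are illustrative):

    class PluginApi(object):              # default namespace (None)
        RPC_API_VERSION = '1.0'

        def get_ports(self, context):
            return []

    class AgentExtensionApi(object):      # opts into its own namespace
        RPC_API_NAMESPACE = 'extension'   # hypothetical value
        RPC_API_VERSION = '1.0'

        def get_ports(self, context):
            return ['ext']

    # disp = RpcDispatcher([PluginApi(), AgentExtensionApi()])
    # disp.dispatch(ctxt, '1.0', 'get_ports', None)         calls PluginApi.get_ports
    # disp.dispatch(ctxt, '1.0', 'get_ports', 'extension')  calls AgentExtensionApi.get_ports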
diff --git a/quantum/openstack/common/rpc/impl_fake.py b/quantum/openstack/common/rpc/impl_fake.py
index 7274802810cac263ee22627aac834e9b913d7137..ef4092856ab1985c6e0e5eed44d5b1b7f6f679f5 100644 (file)
@@ -57,13 +57,14 @@ class Consumer(object):
         self.topic = topic
         self.proxy = proxy
 
-    def call(self, context, version, method, args, timeout):
+    def call(self, context, version, method, namespace, args, timeout):
         done = eventlet.event.Event()
 
         def _inner():
             ctxt = RpcContext.from_dict(context.to_dict())
             try:
-                rval = self.proxy.dispatch(context, version, method, **args)
+                rval = self.proxy.dispatch(context, version, method,
+                                           namespace, **args)
                 res = []
                 # Caller might have called ctxt.reply() manually
                 for (reply, failure) in ctxt._response:
@@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
         return
     args = msg.get('args', {})
     version = msg.get('version', None)
+    namespace = msg.get('namespace', None)
 
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
         return iter([None])
     else:
-        return consumer.call(context, version, method, args, timeout)
+        return consumer.call(context, version, method, namespace, args,
+                             timeout)
 
 
 def call(conf, context, topic, msg, timeout=None):
@@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
         return
     args = msg.get('args', {})
     version = msg.get('version', None)
+    namespace = msg.get('namespace', None)
 
     for consumer in CONSUMERS.get(topic, []):
         try:
-            consumer.call(context, version, method, args, None)
+            consumer.call(context, version, method, namespace, args, None)
         except Exception:
             pass
diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py
index 7d3ed441ad196556013fbd090149671013fa6261..f56e14b03b633ed46e08c1410c7cccb0a2089e1e 100644 (file)
@@ -40,8 +40,8 @@ qpid_opts = [
     cfg.StrOpt('qpid_hostname',
                default='localhost',
                help='Qpid broker hostname'),
-    cfg.StrOpt('qpid_port',
-               default='5672',
+    cfg.IntOpt('qpid_port',
+               default=5672,
                help='Qpid broker port'),
     cfg.ListOpt('qpid_hosts',
                 default=['$qpid_hostname:$qpid_port'],
@@ -320,7 +320,7 @@ class Connection(object):
         # Reconnection is done by self.reconnect()
         self.connection.reconnect = False
         self.connection.heartbeat = self.conf.qpid_heartbeat
-        self.connection.protocol = self.conf.qpid_protocol
+        self.connection.transport = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
     def _register_consumer(self, consumer):
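The qpid_port option is now typed; with IntOpt the value read from the configuration is coerced to an integer (standard oslo.config behaviour), for example:

    from oslo.config import cfg

    conf = cfg.ConfigOpts()
    conf.register_opts([cfg.IntOpt('qpid_port', default=5672)])
    conf([])                                  # parse an empty command line
    assert isinstance(conf.qpid_port, int)    # a StrOpt default of '5672' was a string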
diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py
index 867a23ced05607a35d2cb83d745aaec00c30b045..d69abe4eef14951076bc643c7b1bed65f87b4c1f 100644 (file)
@@ -276,7 +276,8 @@ class InternalContext(object):
 
         try:
             result = proxy.dispatch(
-                ctx, data['version'], data['method'], **data['args'])
+                ctx, data['version'], data['method'],
+                data.get('namespace'), **data['args'])
             return ConsumerBase.normalize_reply(result, ctx.replies)
         except greenlet.GreenletExit:
             # ignore these since they are just from shutdowns
@@ -351,7 +352,7 @@ class ConsumerBase(object):
             return
 
         proxy.dispatch(ctx, data['version'],
-                       data['method'], **data['args'])
+                       data['method'], data.get('namespace'), **data['args'])
 
 
 class ZmqBaseReactor(ConsumerBase):
diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py
index 4fa0d98a3e8e99f64f89226089dd69fcfe0214dd..78650d1f2eb6994f21a3fca322c4b07970053a4e 100644 (file)
@@ -35,10 +35,10 @@ matchmaker_opts = [
                default='/etc/nova/matchmaker_ring.json',
                help='Matchmaker ring file (JSON)'),
     cfg.IntOpt('matchmaker_heartbeat_freq',
-               default='300',
+               default=300,
                help='Heartbeat frequency'),
     cfg.IntOpt('matchmaker_heartbeat_ttl',
-               default='600',
+               default=600,
                help='Heartbeat time-to-live.'),
 ]
 
diff --git a/quantum/openstack/common/rpc/proxy.py b/quantum/openstack/common/rpc/proxy.py
index ed56d47f6c6657ef477f2834bd0351f5e38cf8a1..4946831f46447d8eeb2f1e5772fffda73c4dd913 100644 (file)
@@ -58,9 +58,13 @@ class RpcProxy(object):
         """Return the topic to use for a message."""
         return topic if topic else self.topic
 
+    @staticmethod
+    def make_namespaced_msg(method, namespace, **kwargs):
+        return {'method': method, 'namespace': namespace, 'args': kwargs}
+
     @staticmethod
     def make_msg(method, **kwargs):
-        return {'method': method, 'args': kwargs}
+        return RpcProxy.make_namespaced_msg(method, None, **kwargs)
 
     def call(self, context, msg, topic=None, version=None, timeout=None):
         """rpc.call() a remote method.
diff --git a/quantum/openstack/common/rpc/zmq_receiver.py b/quantum/openstack/common/rpc/zmq_receiver.py
new file mode 100755 (executable)
index 0000000..57c32d7
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+    CONF(sys.argv[1:], project='oslo')
+    logging.setup("oslo")
+
+    with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+        reactor.consume_in_thread()
+        reactor.wait()
diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/agent/manager.py
index 8d49cf7de6aa157b87228229e1f7c0abd5169e5f..d84bdfc62ae27d7c89d3b463e356ceafc27f014e 100644 (file)
@@ -145,7 +145,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
             self.needs_resync = False
             self.sync_state()
 
-    @periodic_task.periodic_task(ticks_between_runs=6)
+    @periodic_task.periodic_task(spacing=6)
     def collect_stats(self, context):
         for pool_id in self.cache.get_pool_ids():
             try:
diff --git a/quantum/tests/unit/test_security_groups_rpc.py b/quantum/tests/unit/test_security_groups_rpc.py
index f99d0c52b854256262fb02fa6e407a96a0bd81a8..a6b7bfd7868d9b49dff93d24332d4c4309d7c33b 100644 (file)
@@ -519,8 +519,8 @@ class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
             [call(None,
              {'args':
                  {'devices': ['fake_device']},
-             'method':
-                 'security_group_rules_for_devices'},
+             'method': 'security_group_rules_for_devices',
+             'namespace': None},
              version=sg_rpc.SG_RPC_VERSION,
              topic='fake_topic')])
 
@@ -544,7 +544,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
             [call(None,
                   {'args':
                       {'security_groups': ['fake_sgid']},
-                      'method': 'security_groups_rule_updated'},
+                      'method': 'security_groups_rule_updated',
+                      'namespace': None},
                   version=sg_rpc.SG_RPC_VERSION,
                   topic='fake-security_group-update')])
 
@@ -555,7 +556,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
             [call(None,
                   {'args':
                       {'security_groups': ['fake_sgid']},
-                      'method': 'security_groups_member_updated'},
+                      'method': 'security_groups_member_updated',
+                      'namespace': None},
                   version=sg_rpc.SG_RPC_VERSION,
                   topic='fake-security_group-update')])
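The test changes above follow directly from the RpcProxy change earlier in this commit: make_msg() now delegates to make_namespaced_msg() with namespace=None, so every expected message dict gains a 'namespace' key. For illustration:

    from quantum.openstack.common.rpc import proxy

    proxy.RpcProxy.make_msg('security_groups_rule_updated',
                            security_groups=['fake_sgid'])
    # -> {'method': 'security_groups_rule_updated',
    #     'namespace': None,
    #     'args': {'security_groups': ['fake_sgid']}}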