]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Perform a sync with oslo-incubator.
authorClark Boylan <clark.boylan@gmail.com>
Wed, 22 May 2013 21:44:48 +0000 (14:44 -0700)
committerClark Boylan <clark.boylan@gmail.com>
Thu, 23 May 2013 22:58:33 +0000 (15:58 -0700)
This oslo-incubator sync pulls in a new log.py which will make quantum's
default log output format the same as nova, glance and cinder (once
cinder's corresponding oslo sync merges). This common log format
simplifies log indexing as part of CI and makes lives easier for
deployers.

This sync does add a requirement on six as jsonutils depends on it. It
updates install_venv_common.py to be python26 compatible. It also brings
in a bunch of recent python3 compatibility that was added to oslo.

Fixes bug 1183144

Change-Id: Id0f196d7b5680e5950e4a27d66042bf00ccd49e6

22 files changed:
quantum/openstack/common/eventlet_backdoor.py
quantum/openstack/common/exception.py
quantum/openstack/common/jsonutils.py
quantum/openstack/common/lockutils.py
quantum/openstack/common/log.py
quantum/openstack/common/loopingcall.py
quantum/openstack/common/periodic_task.py
quantum/openstack/common/policy.py
quantum/openstack/common/processutils.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/dispatcher.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/rpc/matchmaker.py
quantum/openstack/common/rpc/matchmaker_ring.py [new file with mode: 0644]
quantum/openstack/common/rpc/proxy.py
quantum/openstack/common/rpc/serializer.py [new file with mode: 0644]
quantum/openstack/common/service.py
quantum/openstack/common/threadgroup.py
tools/install_venv_common.py
tools/pip-requires

index c0ad460fe6e89a6edb3167671acfb73baca2e72c..57b89ae914ebdfcd4ad041135d749319b5f4de77 100644 (file)
@@ -16,6 +16,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from __future__ import print_function
+
 import gc
 import pprint
 import sys
@@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts)
 
 
 def _dont_use_this():
-    print "Don't use this, just disconnect instead"
+    print("Don't use this, just disconnect instead")
 
 
 def _find_objects(t):
@@ -46,16 +48,16 @@ def _find_objects(t):
 
 def _print_greenthreads():
     for i, gt in enumerate(_find_objects(greenlet.greenlet)):
-        print i, gt
+        print(i, gt)
         traceback.print_stack(gt.gr_frame)
-        print
+        print()
 
 
 def _print_nativethreads():
     for threadId, stack in sys._current_frames().items():
-        print threadId
+        print(threadId)
         traceback.print_stack(stack)
-        print
+        print()
 
 
 def initialize_if_enabled():
index 975a53a69211665667b0cc0869fa99da52a81bf9..05d3ca79234b45b5c90bf644a31f840ce5bb43ab 100644 (file)
@@ -98,7 +98,7 @@ def wrap_exception(f):
     def _wrap(*args, **kw):
         try:
             return f(*args, **kw)
-        except Exception, e:
+        except Exception as e:
             if not isinstance(e, Error):
                 #exc_type, exc_value, exc_traceback = sys.exc_info()
                 logging.exception(_('Uncaught exception'))
index ae7feee23dc20328423191121835b447fadf0d17..0c3b40e2f296a8c3c87781372b1d59180c03ac03 100644 (file)
@@ -41,6 +41,8 @@ import json
 import types
 import xmlrpclib
 
+import six
+
 from quantum.openstack.common import timeutils
 
 
@@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
     # value of itertools.count doesn't get caught by nasty_type_tests
     # and results in infinite loop when list(value) is called.
     if type(value) == itertools.count:
-        return unicode(value)
+        return six.text_type(value)
 
     # FIXME(vish): Workaround for LP bug 852095. Without this workaround,
     #              tests that raise an exception in a mocked method that
@@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             return recursive(value.__dict__, level=level + 1)
         else:
             if any(test(value) for test in _nasty_type_tests):
-                return unicode(value)
+                return six.text_type(value)
             return value
     except TypeError:
         # Class objects are tricky since they may define something like
         # __iter__ defined but it isn't callable as list().
-        return unicode(value)
+        return six.text_type(value)
 
 
 def dumps(value, default=to_primitive, **kwargs):
index f2e16e71f13d0ef9330ec5ae81a8da200242d7ba..d32fbd8156807c4c792a6414e0724ad34eb0adcf 100644 (file)
@@ -49,6 +49,10 @@ CONF = cfg.CONF
 CONF.register_opts(util_opts)
 
 
+def set_defaults(lock_path):
+    cfg.set_defaults(util_opts, lock_path=lock_path)
+
+
 class _InterProcessLock(object):
     """Lock implementation which allows multiple locks, working around
     issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@@ -82,7 +86,7 @@ class _InterProcessLock(object):
                 # to have a laughable 10 attempts "blocking" mechanism.
                 self.trylock()
                 return self
-            except IOError, e:
+            except IOError as e:
                 if e.errno in (errno.EACCES, errno.EAGAIN):
                     # external locks synchronise things like iptables
                     # updates - give it some time to prevent busy spinning
@@ -247,3 +251,28 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
             return retval
         return inner
     return wrap
+
+
+def synchronized_with_prefix(lock_file_prefix):
+    """Partial object generator for the synchronization decorator.
+
+    Redefine @synchronized in each project like so::
+
+        (in nova/utils.py)
+        from nova.openstack.common import lockutils
+
+        synchronized = lockutils.synchronized_with_prefix('nova-')
+
+
+        (in nova/foo.py)
+        from nova import utils
+
+        @utils.synchronized('mylock')
+        def bar(self, *args):
+           ...
+
+    The lock_file_prefix argument is used to provide lock files on disk with a
+    meaningful prefix. The prefix should end with a hyphen ('-') if specified.
+    """
+
+    return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
index 31265236a75ee388a0767c61a560ede8e948d794..b167e896ad2af7c57d928307a5df848d94b3d8f9 100644 (file)
@@ -37,19 +37,17 @@ import logging
 import logging.config
 import logging.handlers
 import os
-import stat
 import sys
 import traceback
 
 from oslo.config import cfg
 
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
 from quantum.openstack.common import local
-from quantum.openstack.common import notifier
 
 
-_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
 _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
 
 common_cli_opts = [
@@ -74,11 +72,13 @@ logging_cli_opts = [
                     'documentation for details on logging configuration '
                     'files.'),
     cfg.StrOpt('log-format',
-               default=_DEFAULT_LOG_FORMAT,
+               default=None,
                metavar='FORMAT',
                help='A logging.Formatter log message format string which may '
                     'use any of the available logging.LogRecord attributes. '
-                    'Default: %(default)s'),
+                    'This option is deprecated.  Please use '
+                    'logging_context_format_string and '
+                    'logging_default_format_string instead.'),
     cfg.StrOpt('log-date-format',
                default=_DEFAULT_LOG_DATE_FORMAT,
                metavar='DATE_FORMAT',
@@ -104,10 +104,7 @@ logging_cli_opts = [
 generic_log_opts = [
     cfg.BoolOpt('use_stderr',
                 default=True,
-                help='Log output to standard error'),
-    cfg.StrOpt('logfile_mode',
-               default='0644',
-               help='Default file mode used when creating log files'),
+                help='Log output to standard error')
 ]
 
 log_opts = [
@@ -211,7 +208,27 @@ def _get_log_file_path(binary=None):
         return '%s.log' % (os.path.join(logdir, binary),)
 
 
-class ContextAdapter(logging.LoggerAdapter):
+class BaseLoggerAdapter(logging.LoggerAdapter):
+
+    def audit(self, msg, *args, **kwargs):
+        self.log(logging.AUDIT, msg, *args, **kwargs)
+
+
+class LazyAdapter(BaseLoggerAdapter):
+    def __init__(self, name='unknown', version='unknown'):
+        self._logger = None
+        self.extra = {}
+        self.name = name
+        self.version = version
+
+    @property
+    def logger(self):
+        if not self._logger:
+            self._logger = getLogger(self.name, self.version)
+        return self._logger
+
+
+class ContextAdapter(BaseLoggerAdapter):
     warn = logging.LoggerAdapter.warning
 
     def __init__(self, logger, project_name, version_string):
@@ -219,8 +236,9 @@ class ContextAdapter(logging.LoggerAdapter):
         self.project = project_name
         self.version = version_string
 
-    def audit(self, msg, *args, **kwargs):
-        self.log(logging.AUDIT, msg, *args, **kwargs)
+    @property
+    def handlers(self):
+        return self.logger.handlers
 
     def deprecated(self, msg, *args, **kwargs):
         stdmsg = _("Deprecated: %s") % msg
@@ -304,17 +322,6 @@ class JSONFormatter(logging.Formatter):
         return jsonutils.dumps(message)
 
 
-class PublishErrorsHandler(logging.Handler):
-    def emit(self, record):
-        if ('quantum.openstack.common.notifier.log_notifier' in
-                CONF.notification_driver):
-            return
-        notifier.api.notify(None, 'error.publisher',
-                            'error_notification',
-                            notifier.api.ERROR,
-                            dict(error=record.msg))
-
-
 def _create_logging_excepthook(product_name):
     def logging_excepthook(type, value, tb):
         extra = {}
@@ -340,7 +347,7 @@ class LogConfigError(Exception):
 def _load_log_config(log_config):
     try:
         logging.config.fileConfig(log_config)
-    except ConfigParser.Error, exc:
+    except ConfigParser.Error as exc:
         raise LogConfigError(log_config, str(exc))
 
 
@@ -399,11 +406,6 @@ def _setup_logging_from_conf():
         filelog = logging.handlers.WatchedFileHandler(logpath)
         log_root.addHandler(filelog)
 
-        mode = int(CONF.logfile_mode, 8)
-        st = os.stat(logpath)
-        if st.st_mode != (stat.S_IFREG | mode):
-            os.chmod(logpath, mode)
-
     if CONF.use_stderr:
         streamlog = ColorHandler()
         log_root.addHandler(streamlog)
@@ -415,15 +417,22 @@ def _setup_logging_from_conf():
         log_root.addHandler(streamlog)
 
     if CONF.publish_errors:
-        log_root.addHandler(PublishErrorsHandler(logging.ERROR))
+        handler = importutils.import_object(
+            "quantum.openstack.common.log_handler.PublishErrorsHandler",
+            logging.ERROR)
+        log_root.addHandler(handler)
 
+    datefmt = CONF.log_date_format
     for handler in log_root.handlers:
-        datefmt = CONF.log_date_format
+        # NOTE(alaski): CONF.log_format overrides everything currently.  This
+        # should be deprecated in favor of context aware formatting.
         if CONF.log_format:
             handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
                                                    datefmt=datefmt))
+            log_root.info('Deprecated: log_format is now deprecated and will '
+                          'be removed in the next release')
         else:
-            handler.setFormatter(LegacyFormatter(datefmt=datefmt))
+            handler.setFormatter(ContextFormatter(datefmt=datefmt))
 
     if CONF.debug:
         log_root.setLevel(logging.DEBUG)
@@ -449,6 +458,15 @@ def getLogger(name='unknown', version='unknown'):
     return _loggers[name]
 
 
+def getLazyLogger(name='unknown', version='unknown'):
+    """
+    create a pass-through logger that does not create the real logger
+    until it is really needed and delegates all calls to the real logger
+    once it is created
+    """
+    return LazyAdapter(name, version)
+
+
 class WritableLogger(object):
     """A thin wrapper that responds to `write` and logs."""
 
@@ -460,7 +478,7 @@ class WritableLogger(object):
         self.logger.log(self.level, msg)
 
 
-class LegacyFormatter(logging.Formatter):
+class ContextFormatter(logging.Formatter):
     """A context.RequestContext aware formatter configured through flags.
 
     The flags used to set format strings are: logging_context_format_string
index 68b3ab77292aad83d1df12a8d6f05cfa8a17ffd6..c75556542343fa0d6a0a199168d56ed2e4981789 100644 (file)
@@ -84,7 +84,7 @@ class FixedIntervalLoopingCall(LoopingCallBase):
                         LOG.warn(_('task run outlasted interval by %s sec') %
                                  -delay)
                     greenthread.sleep(delay if delay > 0 else 0)
-            except LoopingCallDone, e:
+            except LoopingCallDone as e:
                 self.stop()
                 done.send(e.retvalue)
             except Exception:
@@ -131,7 +131,7 @@ class DynamicLoopingCall(LoopingCallBase):
                     LOG.debug(_('Dynamic looping call sleeping for %.02f '
                                 'seconds'), idle)
                     greenthread.sleep(idle)
-            except LoopingCallDone, e:
+            except LoopingCallDone as e:
                 self.stop()
                 done.send(e.retvalue)
             except Exception:
index b0017c284b2a7443443dddee0b5866d692b33e63..50a35f41421db6cfbd1fd59fbfafef9368318453 100644 (file)
@@ -15,6 +15,7 @@
 
 import datetime
 import time
+
 from oslo.config import cfg
 
 from quantum.openstack.common.gettextutils import _
index 0b9eedc8786dc48822e9b36979d11f0f19dec31a..fa1bfa2383fee3131c01975efe02e98d7e86a6ca 100644 (file)
@@ -60,6 +60,7 @@ import abc
 import re
 import urllib
 
+import six
 import urllib2
 
 from quantum.openstack.common.gettextutils import _
@@ -436,7 +437,7 @@ def _parse_list_rule(rule):
             or_list.append(AndCheck(and_list))
 
     # If we have only one check, omit the "or"
-    if len(or_list) == 0:
+    if not or_list:
         return FalseCheck()
     elif len(or_list) == 1:
         return or_list[0]
@@ -738,6 +739,7 @@ class RuleCheck(Check):
 class RoleCheck(Check):
     def __call__(self, target, creds):
         """Check that there is a matching role in the cred dict."""
+
         return self.match.lower() in [x.lower() for x in creds['roles']]
 
 
@@ -774,5 +776,5 @@ class GenericCheck(Check):
         # TODO(termie): do dict inspection via dot syntax
         match = self.match % target
         if self.kind in creds:
-            return match == unicode(creds[self.kind])
+            return match == six.text_type(creds[self.kind])
         return False
index 3d1aabcf85430edba463bb7ecb976699eda24445..a05633ab2a36acac9d3a164f3e0ec963ca6b3456 100644 (file)
@@ -34,6 +34,11 @@ from quantum.openstack.common import log as logging
 LOG = logging.getLogger(__name__)
 
 
+class InvalidArgumentError(Exception):
+    def __init__(self, message=None):
+        super(InvalidArgumentError, self).__init__(message)
+
+
 class UnknownArgumentError(Exception):
     def __init__(self, message=None):
         super(UnknownArgumentError, self).__init__(message)
@@ -118,7 +123,7 @@ def execute(*cmd, **kwargs):
     elif isinstance(check_exit_code, int):
         check_exit_code = [check_exit_code]
 
-    if len(kwargs):
+    if kwargs:
         raise UnknownArgumentError(_('Got unknown keyword args '
                                      'to utils.execute: %r') % kwargs)
 
@@ -179,3 +184,64 @@ def execute(*cmd, **kwargs):
             #               call clean something up in between calls, without
             #               it two execute calls in a row hangs the second one
             greenthread.sleep(0)
+
+
+def trycmd(*args, **kwargs):
+    """
+    A wrapper around execute() to more easily handle warnings and errors.
+
+    Returns an (out, err) tuple of strings containing the output of
+    the command's stdout and stderr.  If 'err' is not empty then the
+    command can be considered to have failed.
+
+    :discard_warnings   True | False. Defaults to False. If set to True,
+                        then for succeeding commands, stderr is cleared
+
+    """
+    discard_warnings = kwargs.pop('discard_warnings', False)
+
+    try:
+        out, err = execute(*args, **kwargs)
+        failed = False
+    except ProcessExecutionError, exn:
+        out, err = '', str(exn)
+        failed = True
+
+    if not failed and discard_warnings and err:
+        # Handle commands that output to stderr but otherwise succeed
+        err = ''
+
+    return out, err
+
+
+def ssh_execute(ssh, cmd, process_input=None,
+                addl_env=None, check_exit_code=True):
+    LOG.debug(_('Running cmd (SSH): %s'), cmd)
+    if addl_env:
+        raise InvalidArgumentError(_('Environment not supported over SSH'))
+
+    if process_input:
+        # This is (probably) fixable if we need it...
+        raise InvalidArgumentError(_('process_input not supported over SSH'))
+
+    stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
+    channel = stdout_stream.channel
+
+    # NOTE(justinsb): This seems suspicious...
+    # ...other SSH clients have buffering issues with this approach
+    stdout = stdout_stream.read()
+    stderr = stderr_stream.read()
+    stdin_stream.close()
+
+    exit_status = channel.recv_exit_status()
+
+    # exit_status == -1 if no exit code was returned
+    if exit_status != -1:
+        LOG.debug(_('Result was %s') % exit_status)
+        if check_exit_code and exit_status != 0:
+            raise ProcessExecutionError(exit_code=exit_status,
+                                        stdout=stdout,
+                                        stderr=stderr,
+                                        cmd=cmd)
+
+    return (stdout, stderr)
index c3994f1b0eb375df1277611b4883b16285aeef68..f7ce5907c993cba7e2eecc17fc18588c7d5a73e2 100644 (file)
@@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext):
         msg_id = message_data.pop('_msg_id', None)
         waiter = self._call_waiters.get(msg_id)
         if not waiter:
-            LOG.warn(_('no calling threads waiting for msg_id : %s'
-                       ', message : %s') % (msg_id, message_data))
+            LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+                       ', message : %(data)s'), {'msg_id': msg_id,
+                                                 'data': message_data})
         else:
             waiter.put(message_data)
 
index 5fc7ecbb3c2210108c06feac1d5133cb15c0ecfb..f2728ecb62cd7626f084612d1525a5746f10a618 100644 (file)
@@ -22,6 +22,7 @@ import sys
 import traceback
 
 from oslo.config import cfg
+import six
 
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
@@ -157,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
                 "not supported by this endpoint.")
 
 
+class RpcVersionCapError(RPCException):
+    message = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
 class Connection(object):
     """A connection, returned by rpc.create_connection().
 
@@ -299,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True):
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
     if log_failure:
-        LOG.error(_("Returning exception %s to caller"), unicode(failure))
+        LOG.error(_("Returning exception %s to caller"),
+                  six.text_type(failure))
         LOG.error(tb)
 
     kwargs = {}
@@ -309,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
     data = {
         'class': str(failure.__class__.__name__),
         'module': str(failure.__class__.__module__),
-        'message': unicode(failure),
+        'message': six.text_type(failure),
         'tb': tb,
         'args': failure.args,
         'kwargs': kwargs
index 16487a7a7c7ea28a6f279bb3110ddc1f90678d72..2b5fa06fa2321a26464ddce110138e588df8844c 100644 (file)
@@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
 """
 
 from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import serializer as rpc_serializer
 
 
 class RpcDispatcher(object):
@@ -93,16 +94,38 @@ class RpcDispatcher(object):
     contains a list of underlying managers that have an API_VERSION attribute.
     """
 
-    def __init__(self, callbacks):
+    def __init__(self, callbacks, serializer=None):
         """Initialize the rpc dispatcher.
 
         :param callbacks: List of proxy objects that are an instance
                           of a class with rpc methods exposed.  Each proxy
                           object should have an RPC_API_VERSION attribute.
+        :param serializer: The Serializer object that will be used to
+                           deserialize arguments before the method call and
+                           to serialize the result after it returns.
         """
         self.callbacks = callbacks
+        if serializer is None:
+            serializer = rpc_serializer.NoOpSerializer()
+        self.serializer = serializer
         super(RpcDispatcher, self).__init__()
 
+    def _deserialize_args(self, context, kwargs):
+        """Helper method called to deserialize args before dispatch.
+
+        This calls our serializer on each argument, returning a new set of
+        args that have been deserialized.
+
+        :param context: The request context
+        :param kwargs: The arguments to be deserialized
+        :returns: A new set of deserialized args
+        """
+        new_kwargs = dict()
+        for argname, arg in kwargs.iteritems():
+            new_kwargs[argname] = self.serializer.deserialize_entity(context,
+                                                                     arg)
+        return new_kwargs
+
     def dispatch(self, ctxt, version, method, namespace, **kwargs):
         """Dispatch a message based on a requested version.
 
@@ -145,7 +168,9 @@ class RpcDispatcher(object):
             if not hasattr(proxyobj, method):
                 continue
             if is_compatible:
-                return getattr(proxyobj, method)(ctxt, **kwargs)
+                kwargs = self._deserialize_args(ctxt, kwargs)
+                result = getattr(proxyobj, method)(ctxt, **kwargs)
+                return self.serializer.serialize_entity(ctxt, result)
 
         if had_compatible:
             raise AttributeError("No such RPC function '%s'" % method)
index 951dd6c649bea720f5001d0c1507fc2119334585..5a677743e550fe93cee87363fda7f8d0c316bc93 100644 (file)
@@ -375,7 +375,7 @@ class Connection(object):
             try:
                 return method(*args, **kwargs)
             except (qpid_exceptions.Empty,
-                    qpid_exceptions.ConnectionError), e:
+                    qpid_exceptions.ConnectionError) as e:
                 if error_callback:
                     error_callback(e)
                 self.reconnect()
index 170e8a109fe0f901244d172c31a9871c54fac093..92318681fa6f880ec98adbef7b1f4088de8ae5b6 100644 (file)
@@ -180,7 +180,7 @@ class ZmqSocket(object):
             return
 
         # We must unsubscribe, or we'll leak descriptors.
-        if len(self.subscriptions) > 0:
+        if self.subscriptions:
             for f in self.subscriptions:
                 try:
                     self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@@ -763,7 +763,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
     LOG.debug(_("Sending message(s) to: %s"), queues)
 
     # Don't stack if we have no matchmaker results
-    if len(queues) == 0:
+    if not queues:
         LOG.warn(_("No matchmaker results. Not casting."))
         # While not strictly a timeout, callers know how to handle
         # this exception and a timeout isn't too big a lie.
@@ -846,6 +846,11 @@ def _get_ctxt():
 def _get_matchmaker(*args, **kwargs):
     global matchmaker
     if not matchmaker:
-        matchmaker = importutils.import_object(
-            CONF.rpc_zmq_matchmaker, *args, **kwargs)
+        mm = CONF.rpc_zmq_matchmaker
+        if mm.endswith('matchmaker.MatchMakerRing'):
+            mm.replace('matchmaker', 'matchmaker_ring')
+            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+                       ' %(new)s instead') % dict(
+                     orig=CONF.rpc_zmq_matchmaker, new=mm))
+        matchmaker = importutils.import_object(mm, *args, **kwargs)
     return matchmaker
index 78650d1f2eb6994f21a3fca322c4b07970053a4e..de6b35aaee75af929973df530527046be007f1a5 100644 (file)
@@ -19,8 +19,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
 """
 
 import contextlib
-import itertools
-import json
 
 import eventlet
 from oslo.config import cfg
@@ -30,10 +28,6 @@ from quantum.openstack.common import log as logging
 
 
 matchmaker_opts = [
-    # Matchmaker ring file
-    cfg.StrOpt('matchmaker_ringfile',
-               default='/etc/nova/matchmaker_ring.json',
-               help='Matchmaker ring file (JSON)'),
     cfg.IntOpt('matchmaker_heartbeat_freq',
                default=300,
                help='Heartbeat frequency'),
@@ -236,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         self.hosts.discard(host)
         self.backend_unregister(key, '.'.join((key, host)))
 
-        LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+                 {'key': key, 'host': host})
 
     def start_heartbeat(self):
         """
@@ -245,7 +240,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         yielding for CONF.matchmaker_heartbeat_freq seconds
         between iterations.
         """
-        if len(self.hosts) == 0:
+        if not self.hosts:
             raise MatchMakerException(
                 _("Register before starting heartbeat."))
 
@@ -304,67 +299,6 @@ class StubExchange(Exchange):
         return [(key, None)]
 
 
-class RingExchange(Exchange):
-    """
-    Match Maker where hosts are loaded from a static file containing
-    a hashmap (JSON formatted).
-
-    __init__ takes optional ring dictionary argument, otherwise
-    loads the ringfile from CONF.mathcmaker_ringfile.
-    """
-    def __init__(self, ring=None):
-        super(RingExchange, self).__init__()
-
-        if ring:
-            self.ring = ring
-        else:
-            fh = open(CONF.matchmaker_ringfile, 'r')
-            self.ring = json.load(fh)
-            fh.close()
-
-        self.ring0 = {}
-        for k in self.ring.keys():
-            self.ring0[k] = itertools.cycle(self.ring[k])
-
-    def _ring_has(self, key):
-        if key in self.ring0:
-            return True
-        return False
-
-
-class RoundRobinRingExchange(RingExchange):
-    """A Topic Exchange based on a hashmap."""
-    def __init__(self, ring=None):
-        super(RoundRobinRingExchange, self).__init__(ring)
-
-    def run(self, key):
-        if not self._ring_has(key):
-            LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (key, )
-            )
-            return []
-        host = next(self.ring0[key])
-        return [(key + '.' + host, host)]
-
-
-class FanoutRingExchange(RingExchange):
-    """Fanout Exchange based on a hashmap."""
-    def __init__(self, ring=None):
-        super(FanoutRingExchange, self).__init__(ring)
-
-    def run(self, key):
-        # Assume starts with "fanout~", strip it for lookup.
-        nkey = key.split('fanout~')[1:][0]
-        if not self._ring_has(nkey):
-            LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (nkey, )
-            )
-            return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
 class LocalhostExchange(Exchange):
     """Exchange where all direct topics are local."""
     def __init__(self, host='localhost'):
@@ -388,17 +322,6 @@ class DirectExchange(Exchange):
         return [(key, e)]
 
 
-class MatchMakerRing(MatchMakerBase):
-    """
-    Match Maker where hosts are loaded from a static hashmap.
-    """
-    def __init__(self, ring=None):
-        super(MatchMakerRing, self).__init__()
-        self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
-        self.add_binding(DirectBinding(), DirectExchange())
-        self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
-
-
 class MatchMakerLocalhost(MatchMakerBase):
     """
     Match Maker where all bare topics resolve to localhost.
diff --git a/quantum/openstack/common/rpc/matchmaker_ring.py b/quantum/openstack/common/rpc/matchmaker_ring.py
new file mode 100644 (file)
index 0000000..fcc27b7
--- /dev/null
@@ -0,0 +1,114 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011-2013 Cloudscaling Group, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+The MatchMaker classes should except a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import itertools
+import json
+
+from oslo.config import cfg
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import matchmaker as mm
+
+
+matchmaker_opts = [
+    # Matchmaker ring file
+    cfg.StrOpt('ringfile',
+               deprecated_name='matchmaker_ringfile',
+               deprecated_group='DEFAULT',
+               default='/etc/oslo/matchmaker_ring.json',
+               help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
+LOG = logging.getLogger(__name__)
+
+
+class RingExchange(mm.Exchange):
+    """
+    Match Maker where hosts are loaded from a static file containing
+    a hashmap (JSON formatted).
+
+    __init__ takes optional ring dictionary argument, otherwise
+    loads the ringfile from CONF.mathcmaker_ringfile.
+    """
+    def __init__(self, ring=None):
+        super(RingExchange, self).__init__()
+
+        if ring:
+            self.ring = ring
+        else:
+            fh = open(CONF.matchmaker_ring.ringfile, 'r')
+            self.ring = json.load(fh)
+            fh.close()
+
+        self.ring0 = {}
+        for k in self.ring.keys():
+            self.ring0[k] = itertools.cycle(self.ring[k])
+
+    def _ring_has(self, key):
+        if key in self.ring0:
+            return True
+        return False
+
+
+class RoundRobinRingExchange(RingExchange):
+    """A Topic Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(RoundRobinRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        if not self._ring_has(key):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (key, )
+            )
+            return []
+        host = next(self.ring0[key])
+        return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(FanoutRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "fanout~", strip it for lookup.
+        nkey = key.split('fanout~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class MatchMakerRing(mm.MatchMakerBase):
+    """
+    Match Maker where hosts are loaded from a static hashmap.
+    """
+    def __init__(self, ring=None):
+        super(MatchMakerRing, self).__init__()
+        self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
+        self.add_binding(mm.DirectBinding(), mm.DirectExchange())
+        self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))
index 4946831f46447d8eeb2f1e5772fffda73c4dd913..c06ad54dfd867cbb62174c29d32dd73af13a4a05 100644 (file)
@@ -1,6 +1,6 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
-# Copyright 2012 Red Hat, Inc.
+# Copyright 2012-2013 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -23,6 +23,8 @@ For more information about rpc API version numbers, see:
 
 
 from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import serializer as rpc_serializer
 
 
 class RpcProxy(object):
@@ -34,16 +36,28 @@ class RpcProxy(object):
     rpc API.
     """
 
-    def __init__(self, topic, default_version):
+    # The default namespace, which can be overriden in a subclass.
+    RPC_API_NAMESPACE = None
+
+    def __init__(self, topic, default_version, version_cap=None,
+                 serializer=None):
         """Initialize an RpcProxy.
 
         :param topic: The topic to use for all messages.
         :param default_version: The default API version to request in all
                outgoing messages.  This can be overridden on a per-message
                basis.
+        :param version_cap: Optionally cap the maximum version used for sent
+               messages.
+        :param serializer: Optionaly (de-)serialize entities with a
+               provided helper.
         """
         self.topic = topic
         self.default_version = default_version
+        self.version_cap = version_cap
+        if serializer is None:
+            serializer = rpc_serializer.NoOpSerializer()
+        self.serializer = serializer
         super(RpcProxy, self).__init__()
 
     def _set_version(self, msg, vers):
@@ -52,7 +66,11 @@ class RpcProxy(object):
         :param msg: The message having a version added to it.
         :param vers: The version number to add to the message.
         """
-        msg['version'] = vers if vers else self.default_version
+        v = vers if vers else self.default_version
+        if (self.version_cap and not
+                rpc_common.version_is_compatible(self.version_cap, v)):
+            raise rpc_common.RpcVersionCapError(version=self.version_cap)
+        msg['version'] = v
 
     def _get_topic(self, topic):
         """Return the topic to use for a message."""
@@ -62,9 +80,25 @@ class RpcProxy(object):
     def make_namespaced_msg(method, namespace, **kwargs):
         return {'method': method, 'namespace': namespace, 'args': kwargs}
 
-    @staticmethod
-    def make_msg(method, **kwargs):
-        return RpcProxy.make_namespaced_msg(method, None, **kwargs)
+    def make_msg(self, method, **kwargs):
+        return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
+                                        **kwargs)
+
+    def _serialize_msg_args(self, context, kwargs):
+        """Helper method called to serialize message arguments.
+
+        This calls our serializer on each argument, returning a new
+        set of args that have been serialized.
+
+        :param context: The request context
+        :param kwargs: The arguments to serialize
+        :returns: A new set of serialized arguments
+        """
+        new_kwargs = dict()
+        for argname, arg in kwargs.iteritems():
+            new_kwargs[argname] = self.serializer.serialize_entity(context,
+                                                                   arg)
+        return new_kwargs
 
     def call(self, context, msg, topic=None, version=None, timeout=None):
         """rpc.call() a remote method.
@@ -81,9 +115,11 @@ class RpcProxy(object):
         :returns: The return value from the remote method.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         real_topic = self._get_topic(topic)
         try:
-            return rpc.call(context, real_topic, msg, timeout)
+            result = rpc.call(context, real_topic, msg, timeout)
+            return self.serializer.deserialize_entity(context, result)
         except rpc.common.Timeout as exc:
             raise rpc.common.Timeout(
                 exc.info, real_topic, msg.get('method'))
@@ -104,9 +140,11 @@ class RpcProxy(object):
                   from the remote method as they arrive.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         real_topic = self._get_topic(topic)
         try:
-            return rpc.multicall(context, real_topic, msg, timeout)
+            result = rpc.multicall(context, real_topic, msg, timeout)
+            return self.serializer.deserialize_entity(context, result)
         except rpc.common.Timeout as exc:
             raise rpc.common.Timeout(
                 exc.info, real_topic, msg.get('method'))
@@ -124,6 +162,7 @@ class RpcProxy(object):
                   remote method.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         rpc.cast(context, self._get_topic(topic), msg)
 
     def fanout_cast(self, context, msg, topic=None, version=None):
@@ -139,6 +178,7 @@ class RpcProxy(object):
                   from the remote method.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         rpc.fanout_cast(context, self._get_topic(topic), msg)
 
     def cast_to_server(self, context, server_params, msg, topic=None,
@@ -157,6 +197,7 @@ class RpcProxy(object):
                   return values.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
 
     def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@@ -175,5 +216,6 @@ class RpcProxy(object):
                   return values.
         """
         self._set_version(msg, version)
+        msg['args'] = self._serialize_msg_args(context, msg['args'])
         rpc.fanout_cast_to_server(context, server_params,
                                   self._get_topic(topic), msg)
diff --git a/quantum/openstack/common/rpc/serializer.py b/quantum/openstack/common/rpc/serializer.py
new file mode 100644 (file)
index 0000000..0a2c9c4
--- /dev/null
@@ -0,0 +1,52 @@
+#    Copyright 2013 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+
+class Serializer(object):
+    """Generic (de-)serialization definition base class"""
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def serialize_entity(self, context, entity):
+        """Serialize something to primitive form.
+
+        :param context: Security context
+        :param entity: Entity to be serialized
+        :returns: Serialized form of entity
+        """
+        pass
+
+    @abc.abstractmethod
+    def deserialize_entity(self, context, entity):
+        """Deserialize something from primitive form.
+
+        :param context: Security context
+        :param entity: Primitive to be deserialized
+        :returns: Deserialized form of entity
+        """
+        pass
+
+
+class NoOpSerializer(Serializer):
+    """A serializer that does nothing"""
+
+    def serialize_entity(self, context, entity):
+        return entity
+
+    def deserialize_entity(self, context, entity):
+        return entity
index 3e8514475aedda5e86e43c0b9071087894fe6fc1..05294fe776414531a11ae27a79534da93634b5bb 100644 (file)
@@ -52,7 +52,7 @@ class Launcher(object):
 
         """
         self._services = threadgroup.ThreadGroup()
-        eventlet_backdoor.initialize_if_enabled()
+        self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
 
     @staticmethod
     def run_service(service):
@@ -72,6 +72,7 @@ class Launcher(object):
         :returns: None
 
         """
+        service.backdoor_port = self.backdoor_port
         self._services.add_thread(self.run_service, service)
 
     def stop(self):
index 4442cf7cfe696eadd75c64b48cb8e9cb5f8a78ae..e47873db40ecbb538e5f98333df98e16c4ec637c 100644 (file)
@@ -61,6 +61,13 @@ class ThreadGroup(object):
         self.threads = []
         self.timers = []
 
+    def add_dynamic_timer(self, callback, initial_delay=None,
+                          periodic_interval_max=None, *args, **kwargs):
+        timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
+        timer.start(initial_delay=initial_delay,
+                    periodic_interval_max=periodic_interval_max)
+        self.timers.append(timer)
+
     def add_timer(self, interval, callback, initial_delay=None,
                   *args, **kwargs):
         pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
index 413065640f42ea46bcf1829fb6b40858bb76185d..42a44e8cd266b2b7b6a116c6c44362060c0f933c 100644 (file)
 """Provides methods needed by installation script for OpenStack development
 virtual environments.
 
+Since this script is used to bootstrap a virtualenv from the system's Python
+environment, it should be kept strictly compatible with Python 2.6.
+
 Synced in from openstack-common
 """
 
-import argparse
+from __future__ import print_function
+
+import optparse
 import os
 import subprocess
 import sys
@@ -39,7 +44,7 @@ class InstallVenv(object):
         self.project = project
 
     def die(self, message, *args):
-        print >> sys.stderr, message % args
+        print(message % args, file=sys.stderr)
         sys.exit(1)
 
     def check_python_version(self):
@@ -86,20 +91,20 @@ class InstallVenv(object):
         virtual environment.
         """
         if not os.path.isdir(self.venv):
-            print 'Creating venv...',
+            print('Creating venv...', end=' ')
             if no_site_packages:
                 self.run_command(['virtualenv', '-q', '--no-site-packages',
                                  self.venv])
             else:
                 self.run_command(['virtualenv', '-q', self.venv])
-            print 'done.'
-            print 'Installing pip in venv...',
+            print('done.')
+            print('Installing pip in venv...', end=' ')
             if not self.run_command(['tools/with_venv.sh', 'easy_install',
                                     'pip>1.0']).strip():
                 self.die("Failed to install pip.")
-            print 'done.'
+            print('done.')
         else:
-            print "venv already exists..."
+            print("venv already exists...")
             pass
 
     def pip_install(self, *args):
@@ -108,7 +113,7 @@ class InstallVenv(object):
                          redirect_output=False)
 
     def install_dependencies(self):
-        print 'Installing dependencies with pip (this can take a while)...'
+        print('Installing dependencies with pip (this can take a while)...')
 
         # First things first, make sure our venv has the latest pip and
         # distribute.
@@ -131,12 +136,12 @@ class InstallVenv(object):
 
     def parse_args(self, argv):
         """Parses command-line arguments."""
-        parser = argparse.ArgumentParser()
-        parser.add_argument('-n', '--no-site-packages',
-                            action='store_true',
-                            help="Do not inherit packages from global Python "
-                                 "install")
-        return parser.parse_args(argv[1:])
+        parser = optparse.OptionParser()
+        parser.add_option('-n', '--no-site-packages',
+                          action='store_true',
+                          help="Do not inherit packages from global Python "
+                               "install")
+        return parser.parse_args(argv[1:])[0]
 
 
 class Distro(InstallVenv):
@@ -150,12 +155,12 @@ class Distro(InstallVenv):
             return
 
         if self.check_cmd('easy_install'):
-            print 'Installing virtualenv via easy_install...',
+            print('Installing virtualenv via easy_install...', end=' ')
             if self.run_command(['easy_install', 'virtualenv']):
-                print 'Succeeded'
+                print('Succeeded')
                 return
             else:
-                print 'Failed'
+                print('Failed')
 
         self.die('ERROR: virtualenv not found.\n\n%s development'
                  ' requires virtualenv, please install it using your'
@@ -180,10 +185,6 @@ class Fedora(Distro):
         return self.run_command_with_code(['rpm', '-q', pkg],
                                           check_exit_code=False)[1] == 0
 
-    def yum_install(self, pkg, **kwargs):
-        print "Attempting to install '%s' via yum" % pkg
-        self.run_command(['sudo', 'yum', 'install', '-y', pkg], **kwargs)
-
     def apply_patch(self, originalfile, patchfile):
         self.run_command(['patch', '-N', originalfile, patchfile],
                          check_exit_code=False)
@@ -193,7 +194,7 @@ class Fedora(Distro):
             return
 
         if not self.check_pkg('python-virtualenv'):
-            self.yum_install('python-virtualenv', check_exit_code=False)
+            self.die("Please install 'python-virtualenv'.")
 
         super(Fedora, self).install_virtualenv()
 
@@ -206,12 +207,13 @@ class Fedora(Distro):
         This can be removed when the fix is applied upstream.
 
         Nova: https://bugs.launchpad.net/nova/+bug/884915
-        Upstream: https://bitbucket.org/which_linden/eventlet/issue/89
+        Upstream: https://bitbucket.org/eventlet/eventlet/issue/89
+        RHEL: https://bugzilla.redhat.com/958868
         """
 
         # Install "patch" program if it's not there
         if not self.check_pkg('patch'):
-            self.yum_install('patch')
+            self.die("Please install 'patch'.")
 
         # Apply the eventlet patch
         self.apply_patch(os.path.join(self.venv, 'lib', self.py_version,
index 5c6384fed434f99706e08c16325ae35b68b64dae..4d82fd089df28f13e997e6e53b4ceb5f00d747ad 100644 (file)
@@ -19,6 +19,7 @@ WebOb>=1.2
 python-keystoneclient>=0.2.0
 alembic>=0.4.1
 http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config
+six
 
 # Cisco plugin dependencies
 python-novaclient