# License for the specific language governing permissions and limitations
# under the License.
+from __future__ import print_function
+
import gc
import pprint
import sys
def _dont_use_this():
- print "Don't use this, just disconnect instead"
+ print("Don't use this, just disconnect instead")
def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
- print i, gt
+ print(i, gt)
traceback.print_stack(gt.gr_frame)
- print
+ print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
- print threadId
+ print(threadId)
traceback.print_stack(stack)
- print
+ print()
def initialize_if_enabled():
def _wrap(*args, **kw):
try:
return f(*args, **kw)
- except Exception, e:
+ except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))
import types
import xmlrpclib
+import six
+
from quantum.openstack.common import timeutils
# value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
- return unicode(value)
+ return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
return recursive(value.__dict__, level=level + 1)
else:
if any(test(value) for test in _nasty_type_tests):
- return unicode(value)
+ return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
- return unicode(value)
+ return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs):
CONF.register_opts(util_opts)
+def set_defaults(lock_path):
+ cfg.set_defaults(util_opts, lock_path=lock_path)
+
+
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
- except IOError, e:
+ except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
return retval
return inner
return wrap
+
+
+def synchronized_with_prefix(lock_file_prefix):
+ """Partial object generator for the synchronization decorator.
+
+ Redefine @synchronized in each project like so::
+
+ (in nova/utils.py)
+ from nova.openstack.common import lockutils
+
+ synchronized = lockutils.synchronized_with_prefix('nova-')
+
+
+ (in nova/foo.py)
+ from nova import utils
+
+ @utils.synchronized('mylock')
+ def bar(self, *args):
+ ...
+
+ The lock_file_prefix argument is used to provide lock files on disk with a
+ meaningful prefix. The prefix should end with a hyphen ('-') if specified.
+ """
+
+ return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
import logging.config
import logging.handlers
import os
-import stat
import sys
import traceback
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import local
-from quantum.openstack.common import notifier
-_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
- default=_DEFAULT_LOG_FORMAT,
+ default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
- 'Default: %(default)s'),
+ 'This option is deprecated. Please use '
+ 'logging_context_format_string and '
+ 'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
- help='Log output to standard error'),
- cfg.StrOpt('logfile_mode',
- default='0644',
- help='Default file mode used when creating log files'),
+ help='Log output to standard error')
]
log_opts = [
return '%s.log' % (os.path.join(logdir, binary),)
-class ContextAdapter(logging.LoggerAdapter):
+class BaseLoggerAdapter(logging.LoggerAdapter):
+
+ def audit(self, msg, *args, **kwargs):
+ self.log(logging.AUDIT, msg, *args, **kwargs)
+
+
+class LazyAdapter(BaseLoggerAdapter):
+ def __init__(self, name='unknown', version='unknown'):
+ self._logger = None
+ self.extra = {}
+ self.name = name
+ self.version = version
+
+ @property
+ def logger(self):
+ if not self._logger:
+ self._logger = getLogger(self.name, self.version)
+ return self._logger
+
+
+class ContextAdapter(BaseLoggerAdapter):
warn = logging.LoggerAdapter.warning
def __init__(self, logger, project_name, version_string):
self.project = project_name
self.version = version_string
- def audit(self, msg, *args, **kwargs):
- self.log(logging.AUDIT, msg, *args, **kwargs)
+ @property
+ def handlers(self):
+ return self.logger.handlers
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated: %s") % msg
return jsonutils.dumps(message)
-class PublishErrorsHandler(logging.Handler):
- def emit(self, record):
- if ('quantum.openstack.common.notifier.log_notifier' in
- CONF.notification_driver):
- return
- notifier.api.notify(None, 'error.publisher',
- 'error_notification',
- notifier.api.ERROR,
- dict(error=record.msg))
-
-
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
- except ConfigParser.Error, exc:
+ except ConfigParser.Error as exc:
raise LogConfigError(log_config, str(exc))
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
- mode = int(CONF.logfile_mode, 8)
- st = os.stat(logpath)
- if st.st_mode != (stat.S_IFREG | mode):
- os.chmod(logpath, mode)
-
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
log_root.addHandler(streamlog)
if CONF.publish_errors:
- log_root.addHandler(PublishErrorsHandler(logging.ERROR))
+ handler = importutils.import_object(
+ "quantum.openstack.common.log_handler.PublishErrorsHandler",
+ logging.ERROR)
+ log_root.addHandler(handler)
+ datefmt = CONF.log_date_format
for handler in log_root.handlers:
- datefmt = CONF.log_date_format
+ # NOTE(alaski): CONF.log_format overrides everything currently. This
+ # should be deprecated in favor of context aware formatting.
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
+ log_root.info('Deprecated: log_format is now deprecated and will '
+ 'be removed in the next release')
else:
- handler.setFormatter(LegacyFormatter(datefmt=datefmt))
+ handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)
return _loggers[name]
+def getLazyLogger(name='unknown', version='unknown'):
+ """
+ create a pass-through logger that does not create the real logger
+ until it is really needed and delegates all calls to the real logger
+ once it is created
+ """
+ return LazyAdapter(name, version)
+
+
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
self.logger.log(self.level, msg)
-class LegacyFormatter(logging.Formatter):
+class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
- except LoopingCallDone, e:
+ except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
- except LoopingCallDone, e:
+ except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
import datetime
import time
+
from oslo.config import cfg
from quantum.openstack.common.gettextutils import _
import re
import urllib
+import six
import urllib2
from quantum.openstack.common.gettextutils import _
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
- if len(or_list) == 0:
+ if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
class RoleCheck(Check):
def __call__(self, target, creds):
"""Check that there is a matching role in the cred dict."""
+
return self.match.lower() in [x.lower() for x in creds['roles']]
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
if self.kind in creds:
- return match == unicode(creds[self.kind])
+ return match == six.text_type(creds[self.kind])
return False
LOG = logging.getLogger(__name__)
+class InvalidArgumentError(Exception):
+ def __init__(self, message=None):
+ super(InvalidArgumentError, self).__init__(message)
+
+
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
- if len(kwargs):
+ if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
+
+
+def trycmd(*args, **kwargs):
+ """
+ A wrapper around execute() to more easily handle warnings and errors.
+
+ Returns an (out, err) tuple of strings containing the output of
+ the command's stdout and stderr. If 'err' is not empty then the
+ command can be considered to have failed.
+
+ :discard_warnings True | False. Defaults to False. If set to True,
+ then for succeeding commands, stderr is cleared
+
+ """
+ discard_warnings = kwargs.pop('discard_warnings', False)
+
+ try:
+ out, err = execute(*args, **kwargs)
+ failed = False
+ except ProcessExecutionError, exn:
+ out, err = '', str(exn)
+ failed = True
+
+ if not failed and discard_warnings and err:
+ # Handle commands that output to stderr but otherwise succeed
+ err = ''
+
+ return out, err
+
+
+def ssh_execute(ssh, cmd, process_input=None,
+ addl_env=None, check_exit_code=True):
+ LOG.debug(_('Running cmd (SSH): %s'), cmd)
+ if addl_env:
+ raise InvalidArgumentError(_('Environment not supported over SSH'))
+
+ if process_input:
+ # This is (probably) fixable if we need it...
+ raise InvalidArgumentError(_('process_input not supported over SSH'))
+
+ stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
+ channel = stdout_stream.channel
+
+ # NOTE(justinsb): This seems suspicious...
+ # ...other SSH clients have buffering issues with this approach
+ stdout = stdout_stream.read()
+ stderr = stderr_stream.read()
+ stdin_stream.close()
+
+ exit_status = channel.recv_exit_status()
+
+ # exit_status == -1 if no exit code was returned
+ if exit_status != -1:
+ LOG.debug(_('Result was %s') % exit_status)
+ if check_exit_code and exit_status != 0:
+ raise ProcessExecutionError(exit_code=exit_status,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=cmd)
+
+ return (stdout, stderr)
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %s'
- ', message : %s') % (msg_id, message_data))
+ LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ ', message : %(data)s'), {'msg_id': msg_id,
+ 'data': message_data})
else:
waiter.put(message_data)
import traceback
from oslo.config import cfg
+import six
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
"not supported by this endpoint.")
+class RpcVersionCapError(RPCException):
+ message = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(_("Returning exception %s to caller"),
+ six.text_type(failure))
LOG.error(tb)
kwargs = {}
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
- 'message': unicode(failure),
+ 'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs
"""
from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
- def __init__(self, callbacks):
+ def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
+ :param serializer: The Serializer object that will be used to
+ deserialize arguments before the method call and
+ to serialize the result after it returns.
"""
self.callbacks = callbacks
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcDispatcher, self).__init__()
+ def _deserialize_args(self, context, kwargs):
+ """Helper method called to deserialize args before dispatch.
+
+ This calls our serializer on each argument, returning a new set of
+ args that have been deserialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to be deserialized
+ :returns: A new set of deserialized args
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.deserialize_entity(context,
+ arg)
+ return new_kwargs
+
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
if not hasattr(proxyobj, method):
continue
if is_compatible:
- return getattr(proxyobj, method)(ctxt, **kwargs)
+ kwargs = self._deserialize_args(ctxt, kwargs)
+ result = getattr(proxyobj, method)(ctxt, **kwargs)
+ return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
- qpid_exceptions.ConnectionError), e:
+ qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()
return
# We must unsubscribe, or we'll leak descriptors.
- if len(self.subscriptions) > 0:
+ if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
- if len(queues) == 0:
+ if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
- matchmaker = importutils.import_object(
- CONF.rpc_zmq_matchmaker, *args, **kwargs)
+ mm = CONF.rpc_zmq_matchmaker
+ if mm.endswith('matchmaker.MatchMakerRing'):
+ mm.replace('matchmaker', 'matchmaker_ring')
+ LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+ ' %(new)s instead') % dict(
+ orig=CONF.rpc_zmq_matchmaker, new=mm))
+ matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker
"""
import contextlib
-import itertools
-import json
import eventlet
from oslo.config import cfg
matchmaker_opts = [
- # Matchmaker ring file
- cfg.StrOpt('matchmaker_ringfile',
- default='/etc/nova/matchmaker_ring.json',
- help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
- LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+ LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+ {'key': key, 'host': host})
def start_heartbeat(self):
"""
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
- if len(self.hosts) == 0:
+ if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
return [(key, None)]
-class RingExchange(Exchange):
- """
- Match Maker where hosts are loaded from a static file containing
- a hashmap (JSON formatted).
-
- __init__ takes optional ring dictionary argument, otherwise
- loads the ringfile from CONF.mathcmaker_ringfile.
- """
- def __init__(self, ring=None):
- super(RingExchange, self).__init__()
-
- if ring:
- self.ring = ring
- else:
- fh = open(CONF.matchmaker_ringfile, 'r')
- self.ring = json.load(fh)
- fh.close()
-
- self.ring0 = {}
- for k in self.ring.keys():
- self.ring0[k] = itertools.cycle(self.ring[k])
-
- def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
-
-
-class RoundRobinRingExchange(RingExchange):
- """A Topic Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(RoundRobinRingExchange, self).__init__(ring)
-
- def run(self, key):
- if not self._ring_has(key):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (key, )
- )
- return []
- host = next(self.ring0[key])
- return [(key + '.' + host, host)]
-
-
-class FanoutRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(FanoutRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "fanout~", strip it for lookup.
- nkey = key.split('fanout~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
return [(key, e)]
-class MatchMakerRing(MatchMakerBase):
- """
- Match Maker where hosts are loaded from a static hashmap.
- """
- def __init__(self, ring=None):
- super(MatchMakerRing, self).__init__()
- self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
- self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
-
-
class MatchMakerLocalhost(MatchMakerBase):
"""
Match Maker where all bare topics resolve to localhost.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011-2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should except a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import itertools
+import json
+
+from oslo.config import cfg
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import matchmaker as mm
+
+
+matchmaker_opts = [
+ # Matchmaker ring file
+ cfg.StrOpt('ringfile',
+ deprecated_name='matchmaker_ringfile',
+ deprecated_group='DEFAULT',
+ default='/etc/oslo/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
+LOG = logging.getLogger(__name__)
+
+
+class RingExchange(mm.Exchange):
+ """
+ Match Maker where hosts are loaded from a static file containing
+ a hashmap (JSON formatted).
+
+ __init__ takes optional ring dictionary argument, otherwise
+ loads the ringfile from CONF.mathcmaker_ringfile.
+ """
+ def __init__(self, ring=None):
+ super(RingExchange, self).__init__()
+
+ if ring:
+ self.ring = ring
+ else:
+ fh = open(CONF.matchmaker_ring.ringfile, 'r')
+ self.ring = json.load(fh)
+ fh.close()
+
+ self.ring0 = {}
+ for k in self.ring.keys():
+ self.ring0[k] = itertools.cycle(self.ring[k])
+
+ def _ring_has(self, key):
+ if key in self.ring0:
+ return True
+ return False
+
+
+class RoundRobinRingExchange(RingExchange):
+ """A Topic Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(RoundRobinRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ if not self._ring_has(key):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
+ )
+ return []
+ host = next(self.ring0[key])
+ return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(FanoutRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "fanout~", strip it for lookup.
+ nkey = key.split('fanout~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class MatchMakerRing(mm.MatchMakerBase):
+ """
+ Match Maker where hosts are loaded from a static hashmap.
+ """
+ def __init__(self, ring=None):
+ super(MatchMakerRing, self).__init__()
+ self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
+ self.add_binding(mm.DirectBinding(), mm.DirectExchange())
+ self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2012 Red Hat, Inc.
+# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
rpc API.
"""
- def __init__(self, topic, default_version):
+ # The default namespace, which can be overriden in a subclass.
+ RPC_API_NAMESPACE = None
+
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
+ :param version_cap: Optionally cap the maximum version used for sent
+ messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
+ self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
- msg['version'] = vers if vers else self.default_version
+ v = vers if vers else self.default_version
+ if (self.version_cap and not
+ rpc_common.version_is_compatible(self.version_cap, v)):
+ raise rpc_common.RpcVersionCapError(version=self.version_cap)
+ msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
- @staticmethod
- def make_msg(method, **kwargs):
- return RpcProxy.make_namespaced_msg(method, None, **kwargs)
+ def make_msg(self, method, **kwargs):
+ return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
+ **kwargs)
+
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.call(context, real_topic, msg, timeout)
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.multicall(context, real_topic, msg, timeout)
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+
+class Serializer(object):
+ """Generic (de-)serialization definition base class"""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def serialize_entity(self, context, entity):
+ """Serialize something to primitive form.
+
+ :param context: Security context
+ :param entity: Entity to be serialized
+ :returns: Serialized form of entity
+ """
+ pass
+
+ @abc.abstractmethod
+ def deserialize_entity(self, context, entity):
+ """Deserialize something from primitive form.
+
+ :param context: Security context
+ :param entity: Primitive to be deserialized
+ :returns: Deserialized form of entity
+ """
+ pass
+
+
+class NoOpSerializer(Serializer):
+ """A serializer that does nothing"""
+
+ def serialize_entity(self, context, entity):
+ return entity
+
+ def deserialize_entity(self, context, entity):
+ return entity
"""
self._services = threadgroup.ThreadGroup()
- eventlet_backdoor.initialize_if_enabled()
+ self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
:returns: None
"""
+ service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
def stop(self):
self.threads = []
self.timers = []
+ def add_dynamic_timer(self, callback, initial_delay=None,
+ periodic_interval_max=None, *args, **kwargs):
+ timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
+ timer.start(initial_delay=initial_delay,
+ periodic_interval_max=periodic_interval_max)
+ self.timers.append(timer)
+
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
"""Provides methods needed by installation script for OpenStack development
virtual environments.
+Since this script is used to bootstrap a virtualenv from the system's Python
+environment, it should be kept strictly compatible with Python 2.6.
+
Synced in from openstack-common
"""
-import argparse
+from __future__ import print_function
+
+import optparse
import os
import subprocess
import sys
self.project = project
def die(self, message, *args):
- print >> sys.stderr, message % args
+ print(message % args, file=sys.stderr)
sys.exit(1)
def check_python_version(self):
virtual environment.
"""
if not os.path.isdir(self.venv):
- print 'Creating venv...',
+ print('Creating venv...', end=' ')
if no_site_packages:
self.run_command(['virtualenv', '-q', '--no-site-packages',
self.venv])
else:
self.run_command(['virtualenv', '-q', self.venv])
- print 'done.'
- print 'Installing pip in venv...',
+ print('done.')
+ print('Installing pip in venv...', end=' ')
if not self.run_command(['tools/with_venv.sh', 'easy_install',
'pip>1.0']).strip():
self.die("Failed to install pip.")
- print 'done.'
+ print('done.')
else:
- print "venv already exists..."
+ print("venv already exists...")
pass
def pip_install(self, *args):
redirect_output=False)
def install_dependencies(self):
- print 'Installing dependencies with pip (this can take a while)...'
+ print('Installing dependencies with pip (this can take a while)...')
# First things first, make sure our venv has the latest pip and
# distribute.
def parse_args(self, argv):
"""Parses command-line arguments."""
- parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--no-site-packages',
- action='store_true',
- help="Do not inherit packages from global Python "
- "install")
- return parser.parse_args(argv[1:])
+ parser = optparse.OptionParser()
+ parser.add_option('-n', '--no-site-packages',
+ action='store_true',
+ help="Do not inherit packages from global Python "
+ "install")
+ return parser.parse_args(argv[1:])[0]
class Distro(InstallVenv):
return
if self.check_cmd('easy_install'):
- print 'Installing virtualenv via easy_install...',
+ print('Installing virtualenv via easy_install...', end=' ')
if self.run_command(['easy_install', 'virtualenv']):
- print 'Succeeded'
+ print('Succeeded')
return
else:
- print 'Failed'
+ print('Failed')
self.die('ERROR: virtualenv not found.\n\n%s development'
' requires virtualenv, please install it using your'
return self.run_command_with_code(['rpm', '-q', pkg],
check_exit_code=False)[1] == 0
- def yum_install(self, pkg, **kwargs):
- print "Attempting to install '%s' via yum" % pkg
- self.run_command(['sudo', 'yum', 'install', '-y', pkg], **kwargs)
-
def apply_patch(self, originalfile, patchfile):
self.run_command(['patch', '-N', originalfile, patchfile],
check_exit_code=False)
return
if not self.check_pkg('python-virtualenv'):
- self.yum_install('python-virtualenv', check_exit_code=False)
+ self.die("Please install 'python-virtualenv'.")
super(Fedora, self).install_virtualenv()
This can be removed when the fix is applied upstream.
Nova: https://bugs.launchpad.net/nova/+bug/884915
- Upstream: https://bitbucket.org/which_linden/eventlet/issue/89
+ Upstream: https://bitbucket.org/eventlet/eventlet/issue/89
+ RHEL: https://bugzilla.redhat.com/958868
"""
# Install "patch" program if it's not there
if not self.check_pkg('patch'):
- self.yum_install('patch')
+ self.die("Please install 'patch'.")
# Apply the eventlet patch
self.apply_patch(os.path.join(self.venv, 'lib', self.py_version,
python-keystoneclient>=0.2.0
alembic>=0.4.1
http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config
+six
# Cisco plugin dependencies
python-novaclient