#
# Version info
from quantum.version import version_info as quantum_version
-release = quantum_version.version_string_with_vcs()
+release = quantum_version.release_string()
# The short X.Y version.
-version = quantum_version.canonical_version_string()
+version = quantum_version.version_string()
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
+modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,processutils,rpc,service,setup,threadgroup,timeutils,uuidutils,version
# The base module to hold the copy of openstack.common
base=quantum
def parse(args):
cfg.CONF(args=args, project='quantum',
- version='%%prog %s' % quantum_version.version_string_with_vcs())
+ version='%%prog %s' % quantum_version.release_string())
# Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac,
"""
self._args = args
- for opt, group in self._all_cli_opts():
+ for opt, group in sorted(self._all_cli_opts()):
opt._add_to_cli(self._oparser, group)
return vars(self._oparser.parse_args(args))
from quantum.openstack.common.gettextutils import _
+_FATAL_EXCEPTION_FORMAT_ERRORS = False
+
class Error(Exception):
def __init__(self, message=None):
try:
self._error_string = self.message % kwargs
- except Exception:
- # at least get the core message out if something happened
- self._error_string = self.message
+ except Exception as e:
+ if _FATAL_EXCEPTION_FORMAT_ERRORS:
+ raise e
+ else:
+ # at least get the core message out if something happened
+ self._error_string = self.message
def __str__(self):
return self._error_string
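For illustration, a minimal sketch of what the new flag controls. The formatting base class (OpenstackException in this module upstream) and the NotFound subclass are assumptions, not part of this change:

from quantum.openstack.common.exception import OpenstackException

class NotFound(OpenstackException):
    message = "Resource %(name)s could not be found."

NotFound(name='net-1')   # formats cleanly
NotFound(nmae='net-1')   # mistyped kwarg:
# - by default, falls back to the raw, unformatted message string
# - with _FATAL_EXCEPTION_FORMAT_ERRORS = True, the format error is
#   re-raised, which lets tests catch such typos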
import datetime
+import functools
import inspect
import itertools
import json
+import logging
import xmlrpclib
+from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import timeutils
+LOG = logging.getLogger(__name__)
-def to_primitive(value, convert_instances=False, level=0):
+
+def to_primitive(value, convert_instances=False, convert_datetime=True,
+ level=0, max_depth=3):
"""Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances,
if getattr(value, '__module__', None) == 'mox':
return 'mock'
- if level > 3:
+ if level > max_depth:
+ LOG.error(_('Max serialization depth exceeded on object: %d %s'),
+ level, value)
return '?'
# The try block may not be necessary after the class check above,
# but just in case ...
try:
+ recursive = functools.partial(to_primitive,
+ convert_instances=convert_instances,
+ convert_datetime=convert_datetime,
+ level=level,
+ max_depth=max_depth)
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)):
- o = []
- for v in value:
- o.append(to_primitive(v, convert_instances=convert_instances,
- level=level))
- return o
+ return [recursive(v) for v in value]
elif isinstance(value, dict):
- o = {}
- for k, v in value.iteritems():
- o[k] = to_primitive(v, convert_instances=convert_instances,
- level=level)
- return o
- elif isinstance(value, datetime.datetime):
+ return dict((k, recursive(v)) for k, v in value.iteritems())
+ elif convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
- return to_primitive(dict(value.iteritems()),
- convert_instances=convert_instances,
- level=level + 1)
+ return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
- return to_primitive(list(value),
- convert_instances=convert_instances,
- level=level)
+ return recursive(list(value))
elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
- return to_primitive(value.__dict__,
- convert_instances=convert_instances,
- level=level + 1)
+ return recursive(value.__dict__, level=level + 1)
else:
return value
except TypeError:
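As a rough usage sketch of the new keyword arguments (the Node class is illustrative):

import datetime

from quantum.openstack.common import jsonutils

now = datetime.datetime.utcnow()

# Datetimes are stringified by default; convert_datetime=False keeps them.
jsonutils.to_primitive({'when': now})
jsonutils.to_primitive({'when': now}, convert_datetime=False)

# max_depth bounds recursion through instances and dict-like objects;
# anything deeper collapses to '?' and logs an error.
class Node(object):
    def __init__(self, child=None):
        self.child = child

chain = Node(Node(Node(Node())))
jsonutils.to_primitive(chain, convert_instances=True, max_depth=2)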
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
+ # NOTE(mikal): this bit is confusing. What is stored is a weak
+ # reference, not the value itself. We therefore need to look up
+ # the weak reference and return the inner value here.
rval = rval()
return rval
return corolocal.local.__setattr__(self, attr, value)
+# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
+
+# A "weak" store uses weak references, so a stored object can fall out of
+# scope as soon as the code using the thread-local storage drops its own
+# reference to it. A "strong" store holds a reference to the object itself,
+# so it never falls out of scope while stored.
+weak_store = WeakLocal()
+strong_store = corolocal.local()
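A small sketch of the difference between the two stores (the context attribute is illustrative):

from quantum.openstack.common import local

class Context(object):
    pass

# weak_store keeps only a weak reference: once the last real reference
# goes away, the stored value quietly becomes None.
ctx = Context()
local.weak_store.context = ctx
del ctx
local.weak_store.context   # may now return None

# strong_store holds its own reference, so the value stays alive for
# the lifetime of the greenthread.
local.strong_store.context = Context()
local.strong_store.context  # still the Context instance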
from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils
from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import local
from quantum.openstack.common import log as logging
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
- default=os.path.abspath(os.path.join(os.path.dirname(__file__),
- '../')),
- help='Directory to use for lock files')
+ help=('Directory to use for lock files. Defaults to a '
+ 'temp directory'))
]
def foo(self, *args):
...
- ensures that only one thread will execute the bar method at a time.
+ ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
- if external and not CONF.disable_process_locking:
- LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
- 'method "%(method)s"...'),
- {'lock': name, 'method': f.__name__})
- cleanup_dir = False
-
- # We need a copy of lock_path because it is non-local
- local_lock_path = lock_path
- if not local_lock_path:
- local_lock_path = CONF.lock_path
-
- if not local_lock_path:
- cleanup_dir = True
- local_lock_path = tempfile.mkdtemp()
-
- if not os.path.exists(local_lock_path):
- cleanup_dir = True
- fileutils.ensure_tree(local_lock_path)
-
- # NOTE(mikal): the lock name cannot contain directory
- # separators
- safe_name = name.replace(os.sep, '_')
- lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
- lock_file_path = os.path.join(local_lock_path,
- lock_file_name)
-
- try:
- lock = InterProcessLock(lock_file_path)
- with lock:
- LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
- 'for method "%(method)s"...'),
+
+ # NOTE(mikal): I know this looks odd
+ if not hasattr(local.strong_store, 'locks_held'):
+ local.strong_store.locks_held = []
+ local.strong_store.locks_held.append(name)
+
+ try:
+ if external and not CONF.disable_process_locking:
+ LOG.debug(_('Attempting to grab file lock "%(lock)s" '
+ 'for method "%(method)s"...'),
+ {'lock': name, 'method': f.__name__})
+ cleanup_dir = False
+
+ # We need a copy of lock_path because it is non-local
+ local_lock_path = lock_path
+ if not local_lock_path:
+ local_lock_path = CONF.lock_path
+
+ if not local_lock_path:
+ cleanup_dir = True
+ local_lock_path = tempfile.mkdtemp()
+
+ if not os.path.exists(local_lock_path):
+ cleanup_dir = True
+ fileutils.ensure_tree(local_lock_path)
+
+ # NOTE(mikal): the lock name cannot contain directory
+ # separators
+ safe_name = name.replace(os.sep, '_')
+ lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
+ lock_file_path = os.path.join(local_lock_path,
+ lock_file_name)
+
+ try:
+ lock = InterProcessLock(lock_file_path)
+ with lock:
+ LOG.debug(_('Got file lock "%(lock)s" at '
+ '%(path)s for method '
+ '"%(method)s"...'),
+ {'lock': name,
+ 'path': lock_file_path,
+ 'method': f.__name__})
+ retval = f(*args, **kwargs)
+ finally:
+ LOG.debug(_('Released file lock "%(lock)s" at '
+ '%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
- retval = f(*args, **kwargs)
- finally:
- LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
- ' for method "%(method)s"...'),
- {'lock': name,
- 'path': lock_file_path,
- 'method': f.__name__})
- # NOTE(vish): This removes the tempdir if we needed
- # to create one. This is used to cleanup
- # the locks left behind by unit tests.
- if cleanup_dir:
- shutil.rmtree(local_lock_path)
- else:
- retval = f(*args, **kwargs)
+ # NOTE(vish): This removes the tempdir if we needed
+ # to create one. This is used to
+ # cleanup the locks left behind by unit
+ # tests.
+ if cleanup_dir:
+ shutil.rmtree(local_lock_path)
+ else:
+ retval = f(*args, **kwargs)
+
+ finally:
+ local.strong_store.locks_held.remove(name)
return retval
return inner
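A usage sketch of the decorator whose internals change above; the lock name and prefix are illustrative, assuming the synchronized(name, lock_file_prefix, external, lock_path) signature this module provides:

from quantum.openstack.common import lockutils

@lockutils.synchronized('iptables', 'quantum-')
def apply_rules():
    # Only one greenthread at a time runs this; while held, the lock
    # name is also recorded in local.strong_store.locks_held.
    pass

@lockutils.synchronized('iptables', 'quantum-', external=True)
def apply_rules_exclusively():
    # Additionally takes a file lock, serializing callers across
    # processes; the lock file lives under CONF.lock_path or, per the
    # change above, a temporary directory when lock_path is unset.
    pass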
+++ /dev/null
-# Copyright 2011 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from quantum.openstack.common import cfg
-from quantum.openstack.common.gettextutils import _
-from quantum.openstack.common import importutils
-from quantum.openstack.common import log as logging
-
-
-list_notifier_drivers_opt = cfg.MultiStrOpt(
- 'list_notifier_drivers',
- default=['quantum.openstack.common.notifier.no_op_notifier'],
- help='List of drivers to send notifications')
-
-CONF = cfg.CONF
-CONF.register_opt(list_notifier_drivers_opt)
-
-LOG = logging.getLogger(__name__)
-
-drivers = None
-
-
-class ImportFailureNotifier(object):
- """Noisily re-raises some exception over-and-over when notify is called."""
-
- def __init__(self, exception):
- self.exception = exception
-
- def notify(self, context, message):
- raise self.exception
-
-
-def _get_drivers():
- """Instantiates and returns drivers based on the flag values."""
- global drivers
- if drivers is None:
- drivers = []
- for notification_driver in CONF.list_notifier_drivers:
- try:
- drivers.append(importutils.import_module(notification_driver))
- except ImportError as e:
- drivers.append(ImportFailureNotifier(e))
- return drivers
-
-
-def add_driver(notification_driver):
- """Add a notification driver at runtime."""
- # Make sure the driver list is initialized.
- _get_drivers()
- if isinstance(notification_driver, basestring):
- # Load and add
- try:
- drivers.append(importutils.import_module(notification_driver))
- except ImportError as e:
- drivers.append(ImportFailureNotifier(e))
- else:
- # Driver is already loaded; just add the object.
- drivers.append(notification_driver)
-
-
-def _object_name(obj):
- name = []
- if hasattr(obj, '__module__'):
- name.append(obj.__module__)
- if hasattr(obj, '__name__'):
- name.append(obj.__name__)
- else:
- name.append(obj.__class__.__name__)
- return '.'.join(name)
-
-
-def remove_driver(notification_driver):
- """Remove a notification driver at runtime."""
- # Make sure the driver list is initialized.
- _get_drivers()
- removed = False
- if notification_driver in drivers:
- # We're removing an object. Easy.
- drivers.remove(notification_driver)
- removed = True
- else:
- # We're removing a driver by name. Search for it.
- for driver in drivers:
- if _object_name(driver) == notification_driver:
- drivers.remove(driver)
- removed = True
-
- if not removed:
- raise ValueError("Cannot remove; %s is not in list" %
- notification_driver)
-
-
-def notify(context, message):
- """Passes notification to multiple notifiers in a list."""
- for driver in _get_drivers():
- try:
- driver.notify(context, message)
- except Exception as e:
- LOG.exception(_("Problem '%(e)s' attempting to send to "
- "notification driver %(driver)s."), locals())
-
-
-def _reset_drivers():
- """Used by unit tests to reset the drivers."""
- global drivers
- drivers = None
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import logging
+import random
+import shlex
+
+from eventlet.green import subprocess
+from eventlet import greenthread
+
+from quantum.openstack.common.gettextutils import _
+
+
+LOG = logging.getLogger(__name__)
+
+
+class UnknownArgumentError(Exception):
+ def __init__(self, message=None):
+ super(UnknownArgumentError, self).__init__(message)
+
+
+class ProcessExecutionError(Exception):
+ def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ if description is None:
+ description = "Unexpected error while running command."
+ if exit_code is None:
+ exit_code = '-'
+ message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
+ % (description, cmd, exit_code, stdout, stderr))
+ super(ProcessExecutionError, self).__init__(message)
+
+
+def execute(*cmd, **kwargs):
+ """
+ Helper method to shell out and execute a command through subprocess with
+ optional retry.
+
+ :param cmd: Passed to subprocess.Popen.
+ :type cmd: string
+ :param process_input: Send to opened process.
+ :type process_input: string
+ :param check_exit_code: Defaults to 0. Will raise
+ :class:`ProcessExecutionError`
+ if the command exits without returning this value
+ as a returncode
+ :type check_exit_code: int
+ :param delay_on_retry: True | False. Defaults to True. If set to True,
+ wait a short amount of time before retrying.
+ :type delay_on_retry: boolean
+ :param attempts: How many times to retry cmd.
+ :type attempts: int
+ :param run_as_root: True | False. Defaults to False. If set to True,
+ the command is prefixed by the command specified
+ in the root_helper kwarg.
+ :type run_as_root: boolean
+ :param root_helper: command to prefix all commands with
+ :type root_helper: string
+ :returns: (stdout, stderr) from process execution
+ :raises: :class:`UnknownArgumentError` on
+ receiving unknown arguments
+ :raises: :class:`ProcessExecutionError`
+ """
+
+ process_input = kwargs.pop('process_input', None)
+ check_exit_code = kwargs.pop('check_exit_code', 0)
+ delay_on_retry = kwargs.pop('delay_on_retry', True)
+ attempts = kwargs.pop('attempts', 1)
+ run_as_root = kwargs.pop('run_as_root', False)
+ root_helper = kwargs.pop('root_helper', '')
+ if len(kwargs):
+ raise UnknownArgumentError(_('Got unknown keyword args '
+ 'to utils.execute: %r') % kwargs)
+ if run_as_root:
+ cmd = shlex.split(root_helper) + list(cmd)
+ cmd = map(str, cmd)
+
+ while attempts > 0:
+ attempts -= 1
+ try:
+ LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+ _PIPE = subprocess.PIPE # pylint: disable=E1101
+ obj = subprocess.Popen(cmd,
+ stdin=_PIPE,
+ stdout=_PIPE,
+ stderr=_PIPE,
+ close_fds=True)
+ result = None
+ if process_input is not None:
+ result = obj.communicate(process_input)
+ else:
+ result = obj.communicate()
+ obj.stdin.close() # pylint: disable=E1101
+ _returncode = obj.returncode # pylint: disable=E1101
+ if _returncode:
+ LOG.debug(_('Result was %s'), _returncode)
+ if (isinstance(check_exit_code, int) and
+ not isinstance(check_exit_code, bool) and
+ _returncode != check_exit_code):
+ (stdout, stderr) = result
+ raise ProcessExecutionError(exit_code=_returncode,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=' '.join(cmd))
+ return result
+ except ProcessExecutionError:
+ if not attempts:
+ raise
+ else:
+ LOG.debug(_('%r failed. Retrying.'), cmd)
+ if delay_on_retry:
+ greenthread.sleep(random.randint(20, 200) / 100.0)
+ finally:
+ # NOTE(termie): this appears to be necessary to let the subprocess
+ # call clean something up in between calls; without it,
+ # the second of two back-to-back execute calls hangs
+ greenthread.sleep(0)
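A short usage sketch for the new module (the commands are illustrative):

from quantum.openstack.common import processutils

# Simple invocation; raises ProcessExecutionError on a non-zero exit.
stdout, stderr = processutils.execute('cat', '/etc/hostname')

# Retry up to three times and prefix the command with a root helper.
stdout, stderr = processutils.execute('ovs-vsctl', 'show',
                                      run_as_root=True,
                                      root_helper='sudo',
                                      attempts=3)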
rpc.proxy
"""
+import inspect
+import logging
+
from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
+from quantum.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
-cfg.CONF.register_opts(rpc_opts)
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
:returns: An instance of openstack.common.rpc.common.Connection
"""
- return _get_impl().create_connection(cfg.CONF, new=new)
+ return _get_impl().create_connection(CONF, new=new)
+
+
+def _check_for_lock():
+ if not CONF.debug:
+ return None
+
+ if ((hasattr(local.strong_store, 'locks_held')
+ and local.strong_store.locks_held)):
+ stack = ' :: '.join([frame[3] for frame in inspect.stack()])
+ LOG.warn(_('An RPC call is being made while holding a lock. The locks '
+ 'currently held are %(locks)s. This is probably a bug. '
+ 'Please report it. Include the following: [%(stack)s].'),
+ {'locks': local.strong_store.locks_held,
+ 'stack': stack})
+ return True
+
+ return False
-def call(context, topic, msg, timeout=None):
+def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if an RPC call is
+ made with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().call(CONF, context, topic, msg, timeout)
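A caller-side sketch of the new flag (the topic and message are illustrative):

from quantum.openstack.common import rpc

# 'context' is assumed to be an existing request context. With
# check_for_lock=True and debug logging enabled, a warning is logged
# if this blocking call is issued while a lockutils lock is held,
# since waiting on an RPC response inside a lock risks stalls or
# deadlock.
result = rpc.call(context, 'network',
                  {'method': 'get_network', 'args': {'id': 'net-1'}},
                  check_for_lock=True)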
def cast(context, topic, msg):
:returns: None
"""
- return _get_impl().cast(cfg.CONF, context, topic, msg)
+ return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+ return _get_impl().fanout_cast(CONF, context, topic, msg)
-def multicall(context, topic, msg, timeout=None):
+def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if an RPC call is
+ made with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
:returns: None
"""
- return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
:returns: None
"""
- return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
global _RPCIMPL
if _RPCIMPL is None:
try:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
- impl = cfg.CONF.rpc_backend.replace('nova.rpc',
- 'nova.openstack.common.rpc')
+ impl = CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
- if not module in conf.allowed_rpc_exception_modules:
+ if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
- help='the RabbitMQ password'),
+ help='the RabbitMQ password',
+ secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
channel=channel,
routing_key=self.routing_key)
- def send(self, msg):
+ def send(self, msg, timeout=None):
"""Send a message"""
- self.producer.publish(msg)
+ if timeout:
+ #
+ # AMQP TTL is in milliseconds when set in the header.
+ #
+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+ else:
+ self.producer.publish(msg)
class DirectPublisher(Publisher):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
- def publisher_send(self, cls, topic, msg, **kwargs):
+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg)
+ publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
- self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
- help='Password for qpid connection'),
+ help='Password for qpid connection',
+ secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ #
+ # We want to create a message with attributes, e.g. a TTL. We
+ # don't really need to keep 'msg' in its JSON format any longer
+ # so let's create an actual qpid message here and get some
+ # value-add on the go.
+ #
+ # WARNING: Request timeout happens to be in the same units as
+ # qpid's TTL (seconds). If this changes in the future, then this
+ # will need to be altered accordingly.
+ #
+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+ self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
import os
import pprint
import socket
-import string
import sys
import types
import uuid
Error if a developer passes us bad data.
"""
try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
+ return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ msg_id = msg_id or 0
+
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
def close(self):
self.outq.close()
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
return [result]
def process(self, style, target, proxy, ctx, data):
+ data.setdefault('version', None)
+ data.setdefault('args', {})
+
# Method starting with - are
# processed internally. (non-valid method name)
- method = data['method']
+ method = data.get('method')
+ if not method:
+ LOG.error(_("RPC message did not include method."))
+ return
# Internal method
# uses internal context for safety.
- if data['method'][0] == '-':
- # For reply / process_reply
- method = method[1:]
- if method == 'reply':
- self.private_ctx.reply(ctx, proxy, **data['args'])
+ if method == '-reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
return
- data.setdefault('version', None)
- data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- elif topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
+ if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None,
+def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload,
+ _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+ force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, serialize,
+ force_envelope, _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout,
+ return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
return ZMQ_CTX
-def _get_matchmaker():
+def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
- # rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = CONF.rpc_zmq_matchmaker.split('.')
- mm_module = '.'.join(mm_path[:-1])
- mm_class = mm_path[-1]
-
- # Only initialize a class.
- if mm_path[-1][0] not in string.ascii_uppercase:
- LOG.error(_("Matchmaker could not be loaded.\n"
- "rpc_zmq_matchmaker is not a class."))
- raise RPCException(_("Error loading Matchmaker."))
-
- mm_impl = importutils.import_module(mm_module)
- mm_constructor = getattr(mm_impl, mm_class)
- matchmaker = mm_constructor()
+ matchmaker = importutils.import_object(
+ CONF.rpc_zmq_matchmaker, *args, **kwargs)
return matchmaker
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
- def __init__(self):
+ def __init__(self, host='localhost'):
+ self.host = host
super(Exchange, self).__init__()
def run(self, key):
- return [(key.split('.')[0] + '.localhost', 'localhost')]
+ return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
- i.e. "compute.host" sends a message to "compute" running on "host"
+ i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
- b, e = key.split('.', 1)
- return [(b, e)]
+ e = key.split('.', 1)[1]
+ return [(key, e)]
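For illustration, the rewritten run() keeps the full key as the topic and only splits out the host:

exchange = DirectExchange()
# 'compute.host1' previously mapped to [('compute', 'host1')]; the full
# key is now preserved so the message reaches 'compute.host1' on host1.
assert exchange.run('compute.host1') == [('compute.host1', 'host1')]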
class MatchMakerRing(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
- def __init__(self):
+ def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), LocalhostExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
:returns: None
"""
- self._services = threadgroup.ThreadGroup('launcher')
+ self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
- self.tg = threadgroup.ThreadGroup('service', threads)
+ self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass
return len(revlist.splitlines())
-def get_version_from_git(pre_version):
+def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
return None
-def get_version_from_pkg_info(package_name):
+def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version
- version = get_version_from_pkg_info(package_name)
+ version = _get_version_from_pkg_info(package_name)
if version:
return version
- version = get_version_from_git(pre_version)
+ version = _get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it is done, so that it can be removed from the threads list.
"""
- def __init__(self, name, thread, group):
- self.name = name
+ def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
when need be).
* provide an easy API to add timers.
"""
- def __init__(self, name, thread_pool_size=10):
- self.name = name
+ def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
- th = Thread(callback.__name__, gt, self)
+ th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):
return datetime.datetime.utcnow()
+def iso8601_from_timestamp(timestamp):
+ """Returns an ISO 8601 formatted date from a timestamp"""
+ return isotime(datetime.datetime.utcfromtimestamp(timestamp))
+
+
utcnow.override_time = None
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
+
+
+def is_soon(dt, window):
+ """
+ Determines whether the given time will occur within the next window seconds.
+
+ :param dt: the time in question
+ :param window: minimum number of seconds remaining for the time not to
+ be considered soon
+
+ :return: True if the time falls within the given window
+ """
+ soon = (utcnow() + datetime.timedelta(seconds=window))
+ return normalize_time(dt) <= soon
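A small sketch of the two helpers added here (the example values are illustrative):

import datetime

from quantum.openstack.common import timeutils

# Epoch seconds to an ISO 8601 string, e.g. '1970-01-01T00:00:00Z'.
timeutils.iso8601_from_timestamp(0)

# is_soon: does a deadline fall within the next N seconds?
deadline = timeutils.utcnow() + datetime.timedelta(seconds=30)
timeutils.is_soon(deadline, 60)   # True: within the next minute
timeutils.is_soon(deadline, 10)   # False: more than ten seconds away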
self.version = None
self._cached_version = None
+ def __str__(self):
+ """Make the VersionInfo object behave like a string."""
+ return self.version_string()
+
+ def __repr__(self):
+ """Include the name."""
+ return "VersionInfo(%s:%s)" % (self.package, self.version_string())
+
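With these methods the object can be dropped straight into log or help strings (the version values are illustrative):

from quantum.version import version_info

str(version_info)    # e.g. '2013.1' -- same as version_string()
repr(version_info)   # e.g. 'VersionInfo(quantum:2013.1)'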
def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record
associated with the package."""
provider = pkg_resources.get_provider(requirement)
return provider.version
except pkg_resources.DistributionNotFound:
- # The most likely cause for this is running tests in a tree with
+ # The most likely cause for this is running tests in a tree
# produced from a tarball where the package itself has not been
- # installed into anything. Check for a PKG-INFO file.
+ # installed into anything. Revert to setup-time logic.
from quantum.openstack.common import setup
- return setup.get_version_from_pkg_info(self.package)
+ return setup.get_version(self.package)
def release_string(self):
"""Return the full version of the package including suffixes indicating
import subprocess
import sys
+
possible_topdir = os.getcwd()
if os.path.exists(os.path.join(possible_topdir, "quantum",
"__init__.py")):