class RequestContext(object):
- """
+ """Helper class to represent useful information about a request context.
+
Stores information about the security context under which the user
accesses the system, as well as additional request information.
"""
class OpenstackException(Exception):
- """
- Base Exception
+ """Base Exception class.
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
+# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
from heat.openstack.common.gettextutils import _
"""
+import copy
import gettext
+import logging.handlers
import os
+import UserString
_localedir = os.environ.get('heat'.upper() + '_LOCALEDIR')
_t = gettext.translation('heat', localedir=_localedir, fallback=True)
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)
+
+
+"""
+Lazy gettext functionality.
+
+The following is an attempt to introduce a deferred way
+to do translations on messages in OpenStack. We attempt to
+override the standard _() function and % (format string) operation
+to build Message objects that can later be translated when we have
+more information. Also included is an example log handler (LocaleHandler)
+that translates Messages to an associated locale, effectively allowing
+many log streams, each with its own locale.
+"""
+
+
+def get_lazy_gettext(domain):
+ """Assemble and return a lazy gettext function for a given domain.
+
+ Factory method for a project/module to get a lazy gettext function
+ for its own translation domain (i.e. nova, glance, cinder, etc.)
+ """
+
+ def _lazy_gettext(msg):
+ """Create and return a Message object.
+
+ Message encapsulates a string so that we can translate it later when
+ needed.
+ """
+ return Message(msg, domain)
+
+ return _lazy_gettext
+
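
As a usage sketch (the call site below is hypothetical), a project builds its
lazy _() once per translation domain and uses it everywhere:

    # Hypothetical call site: bind a lazy _() to the 'heat' domain.
    _ = get_lazy_gettext('heat')

    # _() now returns a Message instead of a plain string; the gettext
    # lookup is deferred until the Message is rendered as text.
    err = _('Failed to reach host %(host)s') % {'host': 'node1'}
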
+
+class Message(UserString.UserString, object):
+ """Class used to encapsulate translatable messages."""
+ def __init__(self, msg, domain):
+ # _msg is the gettext msgid and should never change
+ self._msg = msg
+ self._left_extra_msg = ''
+ self._right_extra_msg = ''
+ self.params = None
+ self.locale = None
+ self.domain = domain
+
+ @property
+ def data(self):
+ # NOTE(mrodden): this should always resolve to a unicode string
+ # that best represents the state of the message currently
+
+ localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
+ if self.locale:
+ lang = gettext.translation(self.domain,
+ localedir=localedir,
+ languages=[self.locale],
+ fallback=True)
+ else:
+ # use system locale for translations
+ lang = gettext.translation(self.domain,
+ localedir=localedir,
+ fallback=True)
+
+ full_msg = (self._left_extra_msg +
+ lang.ugettext(self._msg) +
+ self._right_extra_msg)
+
+ if self.params is not None:
+ full_msg = full_msg % self.params
+
+ return unicode(full_msg)
+
+ def _save_parameters(self, other):
+ # we check for None later to see if
+ # we actually have parameters to inject,
+ # so encapsulate if our parameter is actually None
+ if other is None:
+ self.params = (other, )
+ else:
+ self.params = copy.deepcopy(other)
+
+ return self
+
+ # overrides to be more string-like
+ def __unicode__(self):
+ return self.data
+
+ def __str__(self):
+ return self.data.encode('utf-8')
+
+ def __getstate__(self):
+ to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
+ 'domain', 'params', 'locale']
+ new_dict = self.__dict__.fromkeys(to_copy)
+ for attr in to_copy:
+ new_dict[attr] = copy.deepcopy(self.__dict__[attr])
+
+ return new_dict
+
+ def __setstate__(self, state):
+ for (k, v) in state.items():
+ setattr(self, k, v)
+
+ # operator overloads
+ def __add__(self, other):
+ copied = copy.deepcopy(self)
+ copied._right_extra_msg += other.__str__()
+ return copied
+
+ def __radd__(self, other):
+ copied = copy.deepcopy(self)
+ copied._left_extra_msg += other.__str__()
+ return copied
+
+ def __mod__(self, other):
+ # do a format string to catch and raise
+ # any possible KeyErrors from missing parameters
+ self.data % other
+ copied = copy.deepcopy(self)
+ return copied._save_parameters(other)
+
+ def __mul__(self, other):
+ return self.data * other
+
+ def __rmul__(self, other):
+ return other * self.data
+
+ def __getitem__(self, key):
+ return self.data[key]
+
+ def __getslice__(self, start, end):
+ return self.data.__getslice__(start, end)
+
+ def __getattribute__(self, name):
+ # NOTE(mrodden): handle lossy operations that we can't deal with yet
+ # These override the UserString implementation, since UserString
+ # uses our __class__ attribute to try and build a new message
+ # after running the inner data string through the operation.
+ # At that point, we have lost the gettext message id and can just
+ # safely resolve to a string instead.
+ ops = ['capitalize', 'center', 'decode', 'encode',
+ 'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
+ 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
+ if name in ops:
+ return getattr(self.data, name)
+ else:
+ return UserString.UserString.__getattribute__(self, name)
+
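
A short illustration of the lossy-operation behavior described in the NOTE
above (a sketch; the rendered text assumes no catalog is installed):

    m = Message('hello world', 'heat')
    m.upper()    # plain unicode u'HELLO WORLD'; the gettext msgid is lost
    m + '!'      # still a Message; the '!' is kept outside the msgid
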
+
+class LocaleHandler(logging.Handler):
+ """Handler that can have a locale associated to translate Messages.
+
+ A quick example of how to utilize the Message class above.
+ LocaleHandler takes a locale and a target logging.Handler object
+ to forward LogRecord objects to after translating the internal Message.
+ """
+
+ def __init__(self, locale, target):
+ """Initialize a LocaleHandler
+
+ :param locale: locale to use for translating messages
+ :param target: logging.Handler object to forward
+ LogRecord objects to after translation
+ """
+ logging.Handler.__init__(self)
+ self.locale = locale
+ self.target = target
+
+ def emit(self, record):
+ if isinstance(record.msg, Message):
+ # set the locale and resolve to a string
+ record.msg.locale = self.locale
+
+ self.target.emit(record)
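
A minimal sketch of wiring the handler up, per its docstring (the locale,
logger name, and message are illustrative; translation only occurs when a
matching catalog is installed):

    import logging

    handler = LocaleHandler('zh_CN', logging.StreamHandler())
    log = logging.getLogger('demo')
    log.addHandler(handler)

    # emit() stamps the Message with the handler's locale before
    # forwarding, so the target handler renders the translated text.
    log.error(_('Failed to reach host %(host)s') % {'host': 'node1'})
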
def import_class(import_str):
- """Returns a class from a string including module and class"""
+ """Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
try:
__import__(mod_str)
def import_object_ns(name_space, import_str, *args, **kwargs):
- """
- Import a class and return an instance of it, first by trying
+ """Tries to import object from default namespace.
+
+    Imports a class and returns an instance of it, first by trying
to find the class in a default namespace, then failing back to
a full path if not found in the default namespace.
"""
def getLazyLogger(name='unknown', version='unknown'):
- """
- create a pass-through logger that does not create the real logger
+ """Returns lazy logger.
+
+ Creates a pass-through logger that does not create the real logger
until it is really needed and delegates all calls to the real logger
- once it is created
+ once it is created.
"""
return LazyAdapter(name, version)
def parse_host_port(address, default_port=None):
- """
- Interpret a string as a host:port pair.
+ """Interpret a string as a host:port pair.
+
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
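
For illustration, the intended behavior (a sketch, assuming the function
returns a (host, port) tuple and falls back to default_port when no port is
given):

    parse_host_port('www.example.com:8080')
    # -> ('www.example.com', 8080)

    parse_host_port('[2001:db8:85a3::8a2e:370:7334]:80')
    # escaped IPv6 with a port -> ('2001:db8:85a3::8a2e:370:7334', 80)

    parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=80)
    # unescaped IPv6: the whole string is the host
    # -> ('2001:db8:85a3::8a2e:370:7334', 80)
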
def notify_decorator(name, fn):
- """ decorator for notify which is used from utils.monkey_patch()
+ """Decorator for notify which is used from utils.monkey_patch().
:param name: name of the function
    :param fn: object of the function
def notify(_context, message):
"""Notifies the recipient of the desired event given the model.
- Log notifications using openstack's default logging system"""
+
+ Log notifications using openstack's default logging system.
+ """
priority = message.get('priority',
CONF.default_notification_level)
def notify(_context, message):
- """Notifies the recipient of the desired event given the model"""
+ """Notifies the recipient of the desired event given the model."""
pass
def notify(context, message):
- """Sends a notification via RPC"""
+ """Sends a notification via RPC."""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
def notify(context, message):
- """Sends a notification via RPC"""
+ """Sends a notification via RPC."""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
-from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common.rpc import common as rpc_common
-# TODO(pekowski): Remove this option in Havana.
-amqp_opts = [
- cfg.BoolOpt('amqp_rpc_single_reply_queue',
- default=False,
- help='Enable a fast single reply queue if using AMQP based '
- 'RPC like RabbitMQ or Qpid.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the teatDown() method. In the run
+ # pool. The unit tests get here via the tearDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
class ConnectionContext(rpc_common.Connection):
- """The class that is actually returned to the caller of
- create_connection(). This is essentially a wrapper around
- Connection that supports 'with'. It can also return a new
- Connection, or one from a pool. The function will also catch
- when an instance of this class is to be deleted. With that
- we can return Connections to the pool on exceptions and so
- forth without making the caller be responsible for catching
- them. If possible the function makes sure to return a
- connection to the pool.
+ """The class that is actually returned to the create_connection() caller.
+
+ This is essentially a wrapper around Connection that supports 'with'.
+ It can also return a new Connection, or one from a pool.
+
+ The function will also catch when an instance of this class is to be
+ deleted. With that we can return Connections to the pool on exceptions
+ and so forth without making the caller be responsible for catching them.
+ If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
- """Create a new connection, or get one from the pool"""
+ """Create a new connection, or get one from the pool."""
self.connection = None
self.conf = conf
self.connection_pool = connection_pool
self.pooled = pooled
def __enter__(self):
- """When with ConnectionContext() is used, return self"""
+ """When with ConnectionContext() is used, return self."""
return self
def _done(self):
self.connection.consume_in_thread()
def __getattr__(self, key):
- """Proxy all other calls to the Connection instance"""
+ """Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
class ReplyProxy(ConnectionContext):
- """ Connection class for RPC replies / callbacks """
+ """Connection class for RPC replies / callbacks."""
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
+ LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
else:
waiter.put(message_data)
class RpcContext(rpc_common.CommonRpcContext):
- """Context that supports replying to a rpc.call"""
+ """Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
class _ThreadPoolWithWait(object):
- """Base class for a delayed invocation manager used by
- the Connection class to start up green threads
+ """Base class for a delayed invocation manager.
+
+ Used by the Connection class to start up green threads
to handle incoming messages.
"""
class CallbackWrapper(_ThreadPoolWithWait):
- """Wraps a straight callback to allow it to be invoked in a green
- thread.
+ """Wraps a straight callback.
+
+ Allows it to be invoked in a green thread.
"""
def __init__(self, conf, callback, connection_pool):
- """
+ """Initiates CallbackWrapper object.
+
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
return result
def __iter__(self):
- """Return a result until we get a reply with an 'ending" flag"""
+ """Return a result until we get a reply with an 'ending' flag."""
if self._done:
raise StopIteration
while True:
yield result
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
- def __init__(self, conf, connection, timeout):
- self._connection = connection
- self._iterator = connection.iterconsume(timeout=timeout or
- conf.rpc_response_timeout)
- self._result = None
- self._done = False
- self._got_ending = False
- self._conf = conf
- self.msg_id_cache = _MsgIdCache()
-
- def done(self):
- if self._done:
- return
- self._done = True
- self._iterator.close()
- self._iterator = None
- self._connection.close()
-
- def __call__(self, data):
- """The consume() callback will call this. Store the result."""
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
-
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._result = data['result']
-
- def __iter__(self):
- """Return a result until we get a 'None' response from consumer"""
- if self._done:
- raise StopIteration
- while True:
- try:
- self._iterator.next()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- result = self._result
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
def create_connection(conf, new, connection_pool):
- """Create a connection"""
+ """Create a connection."""
return ConnectionContext(conf, connection_pool, pooled=not new)
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- # TODO(pekowski): Remove all these comments in Havana.
- # For amqp_rpc_single_reply_queue = False,
- # Can't use 'with' for multicall, as it returns an iterator
- # that will continue to use the connection. When it's done,
- # connection.close() will get called which will put it back into
- # the pool
- # For amqp_rpc_single_reply_queue = True,
- # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
_add_unique_id(msg)
pack_context(msg, context)
- # TODO(pekowski): Remove this flag and the code under the if clause
- # in Havana.
- if not conf.amqp_rpc_single_reply_queue:
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- else:
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
+_REMOTE_POSTFIX = '_Remote'
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
'info: "%(info)s"')
def __init__(self, info=None, topic=None, method=None):
- """
+ """Initiates Timeout object.
+
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
raise NotImplementedError()
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
- """Register as a member of a group of consumers for a given topic from
- the specified exchange.
+ """Register as a member of a group of consumers.
+ Uses given topic from the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
if hasattr(failure, 'kwargs'):
kwargs = failure.kwargs
+ # NOTE(matiu): With cells, it's possible to re-raise remote, remote
+        # exceptions. Let's turn it back into the original exception type.
+ cls_name = str(failure.__class__.__name__)
+ mod_name = str(failure.__class__.__module__)
+ if (cls_name.endswith(_REMOTE_POSTFIX) and
+ mod_name.endswith(_REMOTE_POSTFIX)):
+ cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
+ mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
+
data = {
- 'class': str(failure.__class__.__name__),
- 'module': str(failure.__class__.__module__),
+ 'class': cls_name,
+ 'module': mod_name,
'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
ex_type = type(failure)
str_override = lambda self: message
- new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+ new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
{'__str__': str_override, '__unicode__': str_override})
+ new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
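
Sketch of the round trip this enables (names are illustrative): serializing a
raised myproject.exc.FooError produces the wrapper type 'FooError_Remote' in
module 'myproject.exc_Remote'; if that exception is re-raised in another cell
and serialized again, the check above strips one suffix pair so the failure is
rebuilt as the original type instead of accumulating as FooError_Remote_Remote:

    cls_name, mod_name = 'FooError_Remote', 'myproject.exc_Remote'
    if (cls_name.endswith(_REMOTE_POSTFIX) and
            mod_name.endswith(_REMOTE_POSTFIX)):
        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]  # 'FooError'
        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]  # 'myproject.exc'
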
class ClientException(Exception):
- """This encapsulates some actual exception that is expected to be
- hit by an RPC proxy object. Merely instantiating it records the
- current exception information, which will be passed back to the
- RPC client without exceptional logging."""
+ """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+ Merely instantiating it records the current exception information, which
+ will be passed back to the RPC client without exceptional logging.
+ """
def __init__(self):
self._exc_info = sys.exc_info()
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
+
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
- ClientException, which is used internally by the RPC layer."""
+ ClientException, which is used internally by the RPC layer.
+ """
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return Connection()
def fanout_cast(conf, context, topic, msg):
- """Cast to all consumers of a topic"""
+ """Cast to all consumers of a topic."""
check_serialize(msg)
method = msg.get('method')
if not method:
self.reconnect(channel)
def reconnect(self, channel):
- """Re-declare the queue after a rabbit reconnect"""
+ """Re-declare the queue after a rabbit reconnect."""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.consume(*args, callback=_callback, **options)
def cancel(self):
- """Cancel the consuming from the queue, if it has started"""
+ """Cancel the consuming from the queue, if it has started."""
try:
self.queue.cancel(self.tag)
except KeyError as e:
class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'"""
+ """Queue/consumer class for 'direct'."""
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'"""
+ """Consumer class for 'topic'."""
def __init__(self, conf, channel, topic, callback, tag, name=None,
exchange_name=None, **kwargs):
class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'"""
+ """Consumer class for 'fanout'."""
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
class Publisher(object):
- """Base Publisher class"""
+ """Base Publisher class."""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
"""Init the Publisher class with the exchange_name, routing_key,
self.reconnect(channel)
def reconnect(self, channel):
- """Re-establish the Producer after a rabbit reconnection"""
+ """Re-establish the Producer after a rabbit reconnection."""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
routing_key=self.routing_key)
def send(self, msg, timeout=None):
- """Send a message"""
+ """Send a message."""
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
class DirectPublisher(Publisher):
- """Publisher class for 'direct'"""
+ """Publisher class for 'direct'."""
def __init__(self, conf, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
class TopicPublisher(Publisher):
- """Publisher class for 'topic'"""
+ """Publisher class for 'topic'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'topic' publisher.
class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'"""
+ """Publisher class for 'fanout'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'fanout' publisher.
class NotifyPublisher(TopicPublisher):
- """Publisher class for 'notify'"""
+ """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.reconnect()
def _fetch_ssl_params(self):
- """Handles fetching what ssl params
- should be used for the connection (if any)"""
+ """Handles fetching what ssl params should be used for the connection
+ (if any).
+ """
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
self.reconnect()
def get_channel(self):
- """Convenience call for bin/clear_rabbit_queues"""
+ """Convenience call for bin/clear_rabbit_queues."""
return self.channel
def close(self):
- """Close/release this connection"""
+ """Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
- """Reset a connection so it can be used again"""
+ """Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close()
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers"""
+ """Return an iterator that will consume from all queues/consumers."""
info = {'do_consume': True}
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
+ """Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
- """Send to a publisher based on the publisher class"""
+ """Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
topic, callback)
def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer"""
+ """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
- """Send a 'direct' message"""
+ """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message"""
+ """Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
- """Send a 'fanout' message"""
+ """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic"""
+ """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
- """Consume from all queues/consumers"""
+ """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
return
def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
+ """Consumer from all queues/consumers in a greenthread."""
def _consumer_thread():
try:
self.consume()
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
+ """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.declare_topic_consumer(topic, proxy_cb)
def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object"""
+ """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common
+qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
cfg.CONF.register_opts(qpid_opts)
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
class ConsumerBase(object):
"""Consumer base class."""
self.reconnect(session)
def reconnect(self, session):
- """Re-declare the receiver after a qpid reconnect"""
+ """Re-declare the receiver after a qpid reconnect."""
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
+ def _unpack_json_msg(self, msg):
+ """Load the JSON data in msg if msg.content_type indicates that it
+ is necessary. Put the loaded data back into msg.content and
+ update msg.content_type appropriately.
+
+ A Qpid Message containing a dict will have a content_type of
+ 'amqp/map', whereas one containing a string that needs to be converted
+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+ :param msg: a Qpid Message object
+ :returns: None
+ """
+ if msg.content_type == JSON_CONTENT_TYPE:
+ msg.content = jsonutils.loads(msg.content)
+ msg.content_type = 'amqp/map'
+
def consume(self):
- """Fetch the message and pass it to the callback object"""
+ """Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
+ self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'"""
+ """Queue/consumer class for 'direct'."""
def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'"""
+ """Consumer class for 'topic'."""
def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'"""
+ """Consumer class for 'fanout'."""
def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
class Publisher(object):
- """Base Publisher class"""
+ """Base Publisher class."""
def __init__(self, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
self.reconnect(session)
def reconnect(self, session):
- """Re-establish the Sender after a reconnection"""
+ """Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
+ def _pack_json_msg(self, msg):
+ """Qpid cannot serialize dicts containing strings longer than 65535
+ characters. This function dumps the message content to a JSON
+ string, which Qpid is able to handle.
+
+ :param msg: May be either a Qpid Message object or a bare dict.
+ :returns: A Qpid Message with its content field JSON encoded.
+ """
+ try:
+ msg.content = jsonutils.dumps(msg.content)
+ except AttributeError:
+ # Need to have a Qpid message so we can set the content_type.
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
+ msg.content_type = JSON_CONTENT_TYPE
+ return msg
+
def send(self, msg):
- """Send a message"""
+ """Send a message."""
+ try:
+ # Check if Qpid can encode the message
+ check_msg = msg
+ if not hasattr(check_msg, 'content_type'):
+ check_msg = qpid_messaging.Message(msg)
+ content_type = check_msg.content_type
+ enc, dec = qpid_messaging.message.get_codec(content_type)
+ enc(check_msg.content)
+ except qpid_codec.CodecException:
+ # This means the message couldn't be serialized as a dict.
+ msg = self._pack_json_msg(msg)
self.sender.send(msg)
class DirectPublisher(Publisher):
- """Publisher class for 'direct'"""
+ """Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
class TopicPublisher(Publisher):
- """Publisher class for 'topic'"""
+ """Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'"""
+ """Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
class NotifyPublisher(Publisher):
- """Publisher class for notifications"""
+ """Publisher class for notifications."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
return self.consumers[str(receiver)]
def reconnect(self):
- """Handles reconnecting and re-establishing sessions and queues"""
+ """Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
self.reconnect()
def close(self):
- """Close/release this connection"""
+ """Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
- self.connection.close()
+ try:
+ self.connection.close()
+ except Exception:
+ # NOTE(dripton) Logging exceptions that happen during cleanup just
+ # causes confusion; there's really nothing useful we can do with
+ # them.
+ pass
self.connection = None
def reset(self):
- """Reset a connection so it can be used again"""
+ """Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers"""
+ """Return an iterator that will consume from all queues/consumers."""
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
+ """Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg):
- """Send to a publisher based on the publisher class"""
+ """Send to a publisher based on the publisher class."""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
topic, callback)
def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer"""
+ """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
- """Send a 'direct' message"""
+ """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message"""
+ """Send a 'topic' message."""
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
- """Send a 'fanout' message"""
+ """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic"""
+ """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg)
def consume(self, limit=None):
- """Consume from all queues/consumers"""
+ """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
return
def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
+ """Consumer from all queues/consumers in a greenthread."""
def _consumer_thread():
try:
self.consume()
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
+ """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
return consumer
def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object"""
+ """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
-from heat.openstack.common import processutils as utils
from heat.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
def _serialize(data):
- """
- Serialization wrapper
+ """Serialization wrapper.
+
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
def _deserialize(data):
- """
- Deserialization wrapper
- """
+ """Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object):
- """
- A tiny wrapper around ZeroMQ to simplify the send/recv protocol
- and connection management.
+ """A tiny wrapper around ZeroMQ.
+ Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
- def recv(self):
+ def recv(self, **kwargs):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart()
+ return self.sock.recv_multipart(**kwargs)
- def send(self, data):
+ def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data)
+ self.sock.send_multipart(data, **kwargs)
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=None, bind=False):
- if socket_type is None:
- socket_type = zmq.PUSH
- self.outq = ZmqSocket(addr, socket_type, bind=bind)
+ def __init__(self, addr):
+ self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
- def cast(self, msg_id, topic, data, envelope=False):
+ def cast(self, msg_id, topic, data, envelope):
msg_id = msg_id or 0
if not envelope:
class ZmqBaseReactor(ConsumerBase):
- """
- A consumer class implementing a
- centralized casting broker (PULL-PUSH)
- for RoundRobin requests.
+ """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+ Used for RoundRobin requests.
"""
def __init__(self, conf):
class ZmqProxy(ZmqBaseReactor):
- """
- A consumer class implementing a
- topic-based proxy, forwarding to
- IPC sockets.
+ """A consumer class implementing a topic-based proxy.
+
+ Forwards to IPC sockets.
"""
def __init__(self, conf):
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
- #TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- topic = data[1]
-
- LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+ data = sock.recv(copy=False)
+ topic = data[1].bytes
if topic.startswith('fanout~'):
sock_type = zmq.PUB
while(True):
data = self.topic_proxy[topic].get()
- out_sock.send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
- {'data': data})
+ out_sock.send(data, copy=False)
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
self.topic_proxy[topic].put_nowait(data)
- LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
- {'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
- """Runs the ZmqProxy service"""
+ """Runs the ZmqProxy service."""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
- if not os.path.isdir(ipc_dir):
- try:
- utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
- utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
- ipc_dir, run_as_root=True)
- utils.execute('chmod', '750', ipc_dir, run_as_root=True)
- except utils.ProcessExecutionError:
+ try:
+ os.makedirs(ipc_dir)
+ except os.error:
+ if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
- LOG.error(_("Could not create IPC directory %s") %
- (ipc_dir, ))
-
+ LOG.error(_("Required IPC directory does not exist at"
+ " %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
+        if not os.access(ipc_dir, os.X_OK):
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Permission denied to IPC directory at"
+ " %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
- Takes a list and returns a dictionary.
- i.e. [1,2,3,4] => {1: 2, 3: 4}
+
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
class ZmqReactor(ZmqBaseReactor):
- """
- A consumer class implementing a
- consumer for messages. Can also be
- used as a 1:1 proxy
+ """A consumer class implementing a consumer for messages.
+
+    Can also be used as a 1:1 proxy.
"""
def __init__(self, conf):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
- """
- Wraps the sending of messages,
- dispatches to the matchmaker and sends
- message to all relevant hosts.
+ """Wraps the sending of messages.
+
+ Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
def notify(conf, context, topic, msg, envelope):
- """
- Send notification event.
+ """Send notification event.
+
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
class Exchange(object):
- """
- Implements lookups.
+ """Implements lookups.
+
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
class Binding(object):
- """
- A binding on which to perform a lookup.
- """
+ """A binding on which to perform a lookup."""
def __init__(self):
pass
class MatchMakerBase(object):
- """
- Match Maker Base Class.
- Build off HeartbeatMatchMakerBase if building a
- heartbeat-capable MatchMaker.
+ """Match Maker Base Class.
+
+ Build off HeartbeatMatchMakerBase if building a heartbeat-capable
+ MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
'registration or heartbeat.')
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
- """
- Acknowledge that a key.host is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a key.host is alive.
+
+ Used internally for updating heartbeats, but may also be used
+    publicly to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
pass
def is_alive(self, topic, host):
- """
- Checks if a host is alive.
- """
+ """Checks if a host is alive."""
pass
def expire(self, topic, host):
- """
- Explicitly expire a host's registration.
- """
+ """Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
pass
def start_heartbeat(self):
- """
- Spawn heartbeat greenthread.
- """
+ """Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
class HeartbeatMatchMakerBase(MatchMakerBase):
- """
- Base for a heart-beat capable MatchMaker.
- Provides common methods for registering,
- unregistering, and maintaining heartbeats.
+ """Base for a heart-beat capable MatchMaker.
+
+ Provides common methods for registering, unregistering, and maintaining
+ heartbeats.
"""
def __init__(self):
self.hosts = set()
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
self.ack_alive(key, host)
def ack_alive(self, key, host):
- """
- Acknowledge that a host.topic is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a host.topic is alive.
+
+ Used internally for updating heartbeats, but may also be used
+    publicly to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
- """
- Implements registration logic.
+ """Implements registration logic.
+
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
- """
- Implements de-registration logic.
+ """Implements de-registration logic.
+
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.ack_alive(key, host)
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
{'key': key, 'host': host})
def start_heartbeat(self):
- """
- Implementation of MatchMakerBase.start_heartbeat
+ """Implementation of MatchMakerBase.start_heartbeat.
+
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
- """
- Specifies a host in the key via a '.' character
+ """Specifies a host in the key via a '.' character.
+
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
class TopicBinding(Binding):
- """
- Where a 'bare' key without dots.
+ """Where a 'bare' key without dots.
+
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
class DirectExchange(Exchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
class MatchMakerLocalhost(MatchMakerBase):
- """
- Match Maker where all bare topics resolve to localhost.
+ """Match Maker where all bare topics resolve to localhost.
+
Useful for testing.
"""
def __init__(self, host='localhost'):
class MatchMakerStub(MatchMakerBase):
- """
- Match Maker where topics are untouched.
+ """Match Maker where topics are untouched.
+
Useful for testing, or for AMQP/brokered queues.
    Will not work where knowledge of hosts is required (i.e. zeromq)
"""
def __init__(self):
- super(MatchMakerLocalhost, self).__init__()
+ super(MatchMakerStub, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
class RedisTopicExchange(RedisExchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
class RedisFanoutExchange(RedisExchange):
- """
- Return a list of all hosts.
- """
+ """Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
- """
- MatchMaker registering and looking-up hosts with a Redis server.
- """
+ """MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
class RingExchange(mm.Exchange):
- """
- Match Maker where hosts are loaded from a static file containing
- a hashmap (JSON formatted).
+ """Match Maker where hosts are loaded from a static JSON formatted file.
__init__ takes optional ring dictionary argument, otherwise
    loads the ringfile from CONF.matchmaker_ringfile.
class MatchMakerRing(mm.MatchMakerBase):
- """
- Match Maker where hosts are loaded from a static hashmap.
- """
+ """Match Maker where hosts are loaded from a static hashmap."""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
"""Return the topic to use for a message."""
return topic if topic else self.topic
+ def can_send_version(self, version):
+ """Check to see if a version is compatible with the version cap."""
+ return (not self.version_cap or
+ rpc_common.version_is_compatible(self.version_cap, version))
+
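
Illustrative checks (a sketch, assuming a proxy constructed with
version_cap='1.5' and the usual same-major, minor-capped compatibility rules
in rpc_common.version_is_compatible):

    proxy.can_send_version('1.4')  # True: within the 1.5 cap
    proxy.can_send_version('1.6')  # False: minor version exceeds the cap
    proxy.can_send_version('2.0')  # False: different major version
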
@staticmethod
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
class Serializer(object):
- """Generic (de-)serialization definition base class"""
+ """Generic (de-)serialization definition base class."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
class NoOpSerializer(Serializer):
- """A serializer that does nothing"""
+ """A serializer that does nothing."""
def serialize_entity(self, context, entity):
return entity
class Service(service.Service):
"""Service object for binaries running on hosts.
- A service enables rpc by listening to queues based on topic and host."""
+ A service enables rpc by listening to queues based on topic and host.
+ """
def __init__(self, host, topic, manager=None):
super(Service, self).__init__()
self.host = host
return wrap
def wait(self):
- """Loop waiting on children to die and respawning as necessary"""
+ """Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _thread_done(gt, *args, **kwargs):
- """ Callback function to be passed to GreenThread.link() when we spawn()
+ """Callback function to be passed to GreenThread.link() when we spawn()
Calls the :class:`ThreadGroup` to notify if.
"""
class Thread(object):
- """ Wrapper around a greenthread, that holds a reference to the
+ """Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
class ThreadGroup(object):
- """ The point of the ThreadGroup classis to:
+ """The point of the ThreadGroup classis to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
def isotime(at=None, subsecond=False):
- """Stringify time in ISO 8601 format"""
+ """Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
def parse_isotime(timestr):
- """Parse time from ISO 8601 format"""
+ """Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
def normalize_time(timestamp):
- """Normalize time in arbitrary timezone to UTC naive object"""
+ """Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
def iso8601_from_timestamp(timestamp):
- """Returns a iso8601 formated date from timestamp"""
+ """Returns a iso8601 formated date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
def set_time_override(override_time=datetime.datetime.utcnow()):
- """
- Override utils.utcnow to return a constant time or a list thereof,
- one at a time.
+ """Overrides utils.utcnow.
+
+ Make it return a constant time or a list thereof, one at a time.
"""
utcnow.override_time = override_time
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
- Note: tzinfo is stripped, but not required for relative times."""
+ Note: tzinfo is stripped, but not required for relative times.
+ """
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
def delta_seconds(before, after):
- """
+ """Return the difference between two timing objects.
+
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
def is_soon(dt, window):
- """
- Determines if time is going to happen in the next window seconds.
+ """Determines if time is going to happen in the next window seconds.
    :param dt: the time
    :param window: minimum seconds to remain to consider the time not soon
import install_venv_common as install_venv
+def first_file(file_list):
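+    """Return the first path in file_list that exists, or None."""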
+ for candidate in file_list:
+ if os.path.exists(candidate):
+ return candidate
+
+
def main(argv):
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
venv = os.environ['VIRTUAL_ENV']
- pip_requires = os.path.join(root, 'tools', 'pip-requires')
- test_requires = os.path.join(root, 'tools', 'test-requires')
+ pip_requires = first_file([
+ os.path.join(root, 'requirements.txt'),
+ os.path.join(root, 'tools', 'pip-requires'),
+ ])
+ test_requires = first_file([
+ os.path.join(root, 'test-requirements.txt'),
+ os.path.join(root, 'tools', 'test-requires'),
+ ])
py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
project = 'heat'
install = install_venv.InstallVenv(root, venv, pip_requires, test_requires,