"""
- deferred_string = version.deferred_version_string(prefix='%prog ')
- oparser = optparse.OptionParser(version=str(deferred_string),
+ version_string = version.version_string()
+ oparser = optparse.OptionParser(version=version_string,
usage=usage.strip())
create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:])
metric-put-data Publish data-point for specified metric
"""
- deferred_string = version.deferred_version_string(prefix='%prog ')
- oparser = optparse.OptionParser(version=str(deferred_string),
+ version_string = version.version_string()
+ oparser = optparse.OptionParser(version=version_string,
usage=usage.strip())
create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:])
from heat.common import wsgi
from heat.openstack.common import cfg
+from heat.openstack.common import rpc
DEFAULT_PORT = 8000
help='Name of the engine node. '
'This can be an opaque identifier.'
'It is not necessarily a hostname, FQDN, or IP address.'),
- cfg.StrOpt('control_exchange',
- default='heat',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('engine_topic',
default='engine',
help='the topic engine nodes listen on')]
def register_api_opts():
cfg.CONF.register_opts(bind_opts)
cfg.CONF.register_opts(rpc_opts)
+ rpc.set_defaults(control_exchange='heat')
def register_engine_opts():
cfg.CONF.register_opts(db_opts)
cfg.CONF.register_opts(service_opts)
cfg.CONF.register_opts(rpc_opts)
+ rpc.set_defaults(control_exchange='heat')
def setup_logging():
# stg == "Stack Thread Groups"
self.stg = {}
- def _start_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+ def _start_in_thread(self, stack_id, func, *args, **kwargs):
if stack_id not in self.stg:
- thr_name = '%s-%s' % (stack_name, stack_id)
- self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+ self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id].add_thread(func, *args, **kwargs)
- def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+ def _timer_in_thread(self, stack_id, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.stg:
- thr_name = '%s-%s' % (stack_name, stack_id)
- self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+ self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
admin_context = context.get_admin_context()
stacks = db_api.stack_get_all(admin_context)
for s in stacks:
- self._timer_in_thread(s.id, s.name,
- self._periodic_watcher_task,
- sid=s.id)
+ self._timer_in_thread(s.id, self._periodic_watcher_task, sid=s.id)
@request_context
def identify_stack(self, context, stack_name):
stack_id = stack.store()
- self._start_in_thread(stack_id, stack_name, stack.create)
+ self._start_in_thread(stack_id, stack.create)
# Schedule a periodic watcher task for this stack
- self._timer_in_thread(stack_id, stack_name,
- self._periodic_watcher_task,
+ self._timer_in_thread(stack_id, self._periodic_watcher_task,
sid=stack_id)
return dict(stack.identifier())
if response:
return {'Description': response}
- self._start_in_thread(db_stack.id, db_stack.name,
- current_stack.update,
- updated_stack)
+ self._start_in_thread(db_stack.id, current_stack.update, updated_stack)
return dict(current_stack.identifier())
Positional command line arguments are supported via a 'positional' Opt
constructor argument::
- >>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True))
+ >>> conf = ConfigOpts()
+ >>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
True
- >>> CONF(['a', 'b'])
- >>> CONF.bar
+ >>> conf(['a', 'b'])
+ >>> conf.bar
['a', 'b']
It is also possible to use argparse "sub-parsers" to parse additional
... list_action = subparsers.add_parser('list')
... list_action.add_argument('id')
...
- >>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
+ >>> conf = ConfigOpts()
+ >>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
True
- >>> CONF(['list', '10'])
- >>> CONF.action.name, CONF.action.id
+ >>> conf(args=['list', '10'])
+ >>> conf.action.name, conf.action.id
('list', '10')
"""
return False
+def set_defaults(opts, **kwargs):
+ for opt in opts:
+ if opt.dest in kwargs:
+ opt.default = kwargs[opt.dest]
+ break
+
+
class Opt(object):
"""Base class for all configuration options.
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser."""
- return [v.split(',') for v in
+ return [[a.strip() for a in v.split(',')] for v in
self._cparser_get_with_deprecated(cparser, section)]
def _get_argparse_kwargs(self, group, **kwargs):
BoolOpt('debug',
short='d',
default=False,
- help='Print debugging output'),
+ help='Print debugging output (set logging level to '
+ 'DEBUG instead of default WARNING level).'),
BoolOpt('verbose',
short='v',
default=False,
- help='Print more verbose output'),
+ help='Print more verbose output (set logging level to '
+ 'INFO instead of default WARNING level).'),
]
logging_cli_opts = [
'Default: %(default)s'),
StrOpt('log-file',
metavar='PATH',
+ deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
StrOpt('log-dir',
+ deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
- '(will be prepended to --logfile)'),
+ '(will be prepended to --log-file)'),
BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
- '%(user)s %(tenant)s] %(instance)s'
+ default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
+ '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
- default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
- ' %(instance)s%(message)s',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
- default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
+ default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
+ '%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
_setup_logging_from_conf(product_name)
+def set_defaults(logging_context_format_string):
+ cfg.set_defaults(log_opts,
+ logging_context_format_string=
+ logging_context_format_string)
+
+
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
- if CONF.verbose or CONF.debug:
+ if CONF.debug:
log_root.setLevel(logging.DEBUG)
- else:
+ elif CONF.verbose:
log_root.setLevel(logging.INFO)
+ else:
+ log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:
default=['heat.openstack.common.exception',
'nova.exception',
'cinder.exception',
+ 'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- #
- # The following options are not registered here, but are expected to be
- # present. The project using this library must register these options with
- # the configuration so that project-specific defaults may be defined.
- #
- #cfg.StrOpt('control_exchange',
- # default='nova',
- # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.StrOpt('control_exchange',
+ default='openstack',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
+def set_defaults(control_exchange):
+ cfg.set_defaults(rpc_opts,
+ control_exchange=control_exchange)
+
+
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
from eventlet import pools
from eventlet import semaphore
-from heat.openstack.common import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import local
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False):
+ ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
- failure = rpc_common.serialize_remote_exception(failure)
+ failure = rpc_common.serialize_remote_exception(failure,
+ log_failure)
try:
msg = {'result': reply, 'failure': failure}
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
- connection_pool=None):
+ connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending)
+ ending, log_failure)
if ending:
self.msg_id = None
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
+ except rpc_common.ClientException as e:
+ LOG.debug(_('Expected exception during message handling (%s)') %
+ e._exc_info[1])
+ ctxt.reply(None, e._exc_info,
+ connection_pool=self.connection_pool,
+ log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
def get_control_exchange(conf):
- try:
- return conf.control_exchange
- except cfg.NoSuchOptError:
- return 'openstack'
+ return conf.control_exchange
# under the License.
import copy
+import sys
import traceback
+from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'heat.version': <RPC Envelope Version as a String>,
+ 'heat.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'heat.version'
+_MESSAGE_KEY = 'heat.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',), }
+ SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+ 'run_instance': [('args', 'admin_password')],
+ 'route_message': [('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'password'),
+ ('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
msg_data = copy.deepcopy(msg_data)
if has_method:
- method = msg_data['method']
- if method in SANITIZE:
- args_to_sanitize = SANITIZE[method]
- for arg in args_to_sanitize:
- try:
- msg_data['args'][arg] = "<SANITIZED>"
- except KeyError:
- pass
+ for arg in SANITIZE.get(msg_data['method'], []):
+ try:
+ d = msg_data
+ for elem in arg[:-1]:
+ d = d[elem]
+ d[arg[-1]] = '<SANITIZED>'
+ except KeyError, e:
+ LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+ {'item': arg,
+ 'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
- LOG.error(tb)
+ if log_failure:
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
context.values['read_deleted'] = read_deleted
return context
+
+
+class ClientException(Exception):
+ """This encapsulates some actual exception that is expected to be
+ hit by an RPC proxy object. Merely instantiating it records the
+ current exception information, which will be passed back to the
+ RPC client without exceptional logging."""
+ def __init__(self):
+ self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception, e:
+ if type(e) in exceptions:
+ raise ClientException()
+ else:
+ raise
+
+
+def client_exceptions(*exceptions):
+ """Decorator for manager methods that raise expected exceptions.
+ Marking a Manager method with this decorator allows the declaration
+ of expected exceptions that the RPC layer should not consider fatal,
+ and not log as if they were generated in a real error scenario. Note
+ that this will cause listed exceptions to be wrapped in a
+ ClientException, which is used internally by the RPC layer."""
+ def outer(func):
+ def inner(*args, **kwargs):
+ return catch_client_exception(exceptions, func, *args, **kwargs)
+ return inner
+ return outer
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
else:
res.append(rval)
done.send(res)
+ except rpc_common.ClientException as e:
+ done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
pass
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
check_serialize(msg)
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
# Default options
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
"""
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
- self.connection.close()
+ self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
import eventlet
import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
+ if server_params and 'hostname' in server_params:
+ # NOTE(russellb) This enables support for cast_to_server.
+ server_params['qpid_hosts'] = [
+ '%s:%d' % (server_params['hostname'],
+ server_params.get('port', 5672))
+ ]
+
params = {
'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
attempt = 0
try:
self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
+ if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
# License for the specific language governing permissions and limitations
# under the License.
+import os
import pprint
import socket
import string
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
+from heat.openstack.common import processutils as utils
from heat.openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
]
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = ZMQ_CTX.socket(zmq_type)
+ self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
pass
self.subscriptions = []
- # Linger -1 prevents lost/dropped messages
try:
- self.sock.close(linger=-1)
+ # Default is to linger
+ self.sock.close()
except Exception:
- pass
+ # While this is a bad thing to happen,
+ # it would be much worse if some of the code calling this
+ # were to fail. For now, lets log, and later evaluate
+ # if we can safely raise here.
+ LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ if serialize:
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
try:
result = proxy.dispatch(
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
+ except rpc_common.ClientException, e:
+ LOG.debug(_("Expected exception during message handling (%s)") %
+ e._exc_info[1])
+ return {'exc':
+ rpc_common.serialize_remote_exception(e._exc_info,
+ log_failure=False)}
except Exception:
+ LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
return
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+ while(True):
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
+
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+ serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, msg_id, topic, payload,
+ serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
- queues = matchmaker.queues(topic)
+ queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, _topic, msg, timeout,
+ serialize, force_envelope)
def create_connection(conf, new=True):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
+ if ZMQ_CTX:
+ ZMQ_CTX.term()
+ ZMQ_CTX = None
+
global matchmaker
matchmaker = None
- ZMQ_CTX.term()
- ZMQ_CTX = None
-def register_opts(conf):
- """Registration of options for this driver."""
- #NOTE(ewindisch): ZMQ_CTX and matchmaker
- # are initialized here as this is as good
- # an initialization method as any.
+def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
- # We memoize through these globals
global ZMQ_CTX
- global matchmaker
- global CONF
-
- if not CONF:
- conf.register_opts(zmq_opts)
- CONF = conf
- # Don't re-set, if this method is called twice.
if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+ return ZMQ_CTX
+
+
+def _get_matchmaker():
+ global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+ return matchmaker
:returns: None
"""
- self._services = threadgroup.ThreadGroup('launcher')
+ self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
- self.tg = threadgroup.ThreadGroup('service', threads)
+ self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Utilities with minimum-depends for use in setup.py
"""
-import datetime
+import email
import os
import re
import subprocess
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
- l = l.strip()
- if not l.startswith('#') and ' ' in l:
- canonical_email, alias = [x for x in l.split(' ')
- if x.startswith('<')]
- mapping[alias] = canonical_email
+ try:
+ canonical_email, alias = re.match(
+ r'[^#]*?(<.+>).*(<.+>).*', l).groups()
+ except AttributeError:
+ continue
+ mapping[alias] = canonical_email
return mapping
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
- for alias, email in mapping.iteritems():
- changelog = changelog.replace(alias, email)
+ for alias, email_address in mapping.iteritems():
+ changelog = changelog.replace(alias, email_address)
return changelog
return dependency_links
-def write_requirements():
- venv = os.environ.get('VIRTUAL_ENV', None)
- if venv is not None:
- with open("requirements.txt", "w") as req_file:
- output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
- stdout=subprocess.PIPE)
- requirements = output.communicate()[0].strip()
- req_file.write(requirements)
-
-
def _run_shell_command(cmd):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
return out[0].strip()
-def _get_git_next_version_suffix(branch_name):
- datestamp = datetime.datetime.now().strftime('%Y%m%d')
- if branch_name == 'milestone-proposed':
- revno_prefix = "r"
- else:
- revno_prefix = ""
- _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
- milestone_cmd = "git show meta/openstack/release:%s" % branch_name
- milestonever = _run_shell_command(milestone_cmd)
- if milestonever:
- first_half = "%s~%s" % (milestonever, datestamp)
- else:
- first_half = datestamp
-
- post_version = _get_git_post_version()
- # post version should look like:
- # 0.1.1.4.gcc9e28a
- # where the bit after the last . is the short sha, and the bit between
- # the last and second to last is the revno count
- (revno, sha) = post_version.split(".")[-2:]
- second_half = "%s%s.%s" % (revno_prefix, revno, sha)
- return ".".join((first_half, second_half))
-
-
-def _get_git_current_tag():
- return _run_shell_command("git tag --contains HEAD")
-
-
-def _get_git_tag_info():
- return _run_shell_command("git describe --tags")
-
-
-def _get_git_post_version():
- current_tag = _get_git_current_tag()
- if current_tag is not None:
- return current_tag
- else:
- tag_info = _get_git_tag_info()
- if tag_info is None:
- base_version = "0.0"
- cmd = "git --no-pager log --oneline"
- out = _run_shell_command(cmd)
- revno = len(out.split("\n"))
- sha = _run_shell_command("git describe --always")
- else:
- tag_infos = tag_info.split("-")
- base_version = "-".join(tag_infos[:-2])
- (revno, sha) = tag_infos[-2:]
- return "%s.%s.%s" % (base_version, revno, sha)
-
-
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
"""
-def read_versioninfo(project):
- """Read the versioninfo file. If it doesn't exist, we're in a github
- zipball, and there's really no way to know what version we really
- are, but that should be ok, because the utility of that should be
- just about nil if this code path is in use in the first place."""
- versioninfo_path = os.path.join(project, 'versioninfo')
- if os.path.exists(versioninfo_path):
- with open(versioninfo_path, 'r') as vinfo:
- version = vinfo.read().strip()
- else:
- version = "0.0.0"
- return version
-
-
-def write_versioninfo(project, version):
- """Write a simple file containing the version of the package."""
- with open(os.path.join(project, 'versioninfo'), 'w') as fil:
- fil.write("%s\n" % version)
-
-
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
+
+ builders = ['html', 'man']
+
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
- for builder in ['html', 'man']:
+ for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
+
+ class LocalBuildLatex(LocalBuildDoc):
+ builders = ['latex']
+
cmdclass['build_sphinx'] = LocalBuildDoc
+ cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
return cmdclass
-def get_git_branchname():
- for branch in _run_shell_command("git branch --color=never").split("\n"):
- if branch.startswith('*'):
- _branch_name = branch.split()[1].strip()
- if _branch_name == "(no":
- _branch_name = "no-branch"
- return _branch_name
-
-
-def get_pre_version(projectname, base_version):
- """Return a version which is leading up to a version that will
- be released in the future."""
- if os.path.isdir('.git'):
- current_tag = _get_git_current_tag()
- if current_tag is not None:
- version = current_tag
- else:
- branch_name = os.getenv('BRANCHNAME',
- os.getenv('GERRIT_REFNAME',
- get_git_branchname()))
- version_suffix = _get_git_next_version_suffix(branch_name)
- version = "%s~%s" % (base_version, version_suffix)
- write_versioninfo(projectname, version)
- return version
- else:
- version = read_versioninfo(projectname)
- return version
-
-
-def get_post_version(projectname):
+def get_version_from_git():
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
if os.path.isdir('.git'):
- version = _get_git_post_version()
- write_versioninfo(projectname, version)
+ return _run_shell_command(
+ "git describe --always").replace('-', '.')
+ return None
+
+
+def get_version_from_pkg_info(package_name):
+ """Get the version from PKG-INFO file if we can."""
+ try:
+ pkg_info_file = open('PKG-INFO', 'r')
+ except (IOError, OSError):
+ return None
+ try:
+ pkg_info = email.message_from_file(pkg_info_file)
+ except email.MessageError:
+ return None
+ # Check to make sure we're in our own dir
+ if pkg_info.get('Name', None) != package_name:
+ return None
+ return pkg_info.get('Version', None)
+
+
+def get_version(package_name):
+ """Get the version of the project. First, try getting it from PKG-INFO, if
+ it exists. If it does, that means we're in a distribution tarball or that
+ install has happened. Otherwise, if there is no PKG-INFO file, pull the
+ version from git.
+
+ We do not support setup.py version sanity in git archive tarballs, nor do
+ we support packagers directly sucking our git repo into theirs. We expect
+ that a source tarball be made from our git repo - or that if someone wants
+ to make a source tarball from a fork of our repo with additional tags in it
+ that they understand and desire the results of doing that.
+ """
+ version = get_version_from_pkg_info(package_name)
+ if version:
+ return version
+ version = get_version_from_git()
+ if version:
return version
- return read_versioninfo(projectname)
+ raise Exception("Versioning for this project requires either an sdist"
+ " tarball, or access to an upstream git repository.")
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
- def __init__(self, name, thread, group):
- self.name = name
+ def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
when need be).
* provide an easy API to add timers.
"""
- def __init__(self, name, thread_pool_size=10):
- self.name = name
+ def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
- th = Thread(callback.__name__, gt, self)
+ th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
+ if isinstance(before, basestring):
+ before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
+ if isinstance(after, basestring):
+ after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# under the License.
"""
-Utilities for consuming the auto-generated versioninfo files.
+Utilities for consuming the version from pkg_resources.
"""
-import datetime
import pkg_resources
-import setup
-
-
-class _deferred_version_string(str):
- """Internal helper class which provides delayed version calculation."""
-
- def __new__(cls, version_info, prefix):
- new_obj = str.__new__(cls, "")
- new_obj._version_info = version_info
- new_obj._prefix = prefix
- new_obj._cached_version = None
- return new_obj
-
- def _get_cached_version(self):
- if not self._cached_version:
- self._cached_version = \
- "%s%s" % (self._prefix,
- self._version_info.version_string())
- return self._cached_version
-
- def __len__(self):
- return self._get_cached_version().__len__()
-
- def __contains__(self, item):
- return self._get_cached_version().__contains__(item)
-
- def __getslice__(self, i, j):
- return self._get_cached_version().__getslice__(i, j)
-
- def __str__(self):
- return self._get_cached_version()
-
- def __repr__(self):
- return self._get_cached_version()
-
class VersionInfo(object):
- def __init__(self, package, python_package=None, pre_version=None):
+ def __init__(self, package):
"""Object that understands versioning for a package
- :param package: name of the top level python namespace. For glance,
- this would be "glance" for python-glanceclient, it
- would be "glanceclient"
- :param python_package: optional name of the project name. For
- glance this can be left unset. For
- python-glanceclient, this would be
- "python-glanceclient"
- :param pre_version: optional version that the project is working to
+ :param package: name of the python package, such as glance, or
+ python-glanceclient
"""
self.package = package
- if python_package is None:
- self.python_package = package
- else:
- self.python_package = python_package
- self.pre_version = pre_version
self.version = None
+ self._cached_version = None
- def _generate_version(self):
- """Defer to the openstack.common.setup routines for making a
- version from git."""
- if self.pre_version is None:
- return setup.get_post_version(self.python_package)
- else:
- return setup.get_pre_version(self.python_package, self.pre_version)
-
- def _newer_version(self, pending_version):
- """Check to see if we're working with a stale version or not.
- We expect a version string that either looks like:
- 2012.2~f3~20120708.10.4426392
- which is an unreleased version of a pre-version, or:
- 0.1.1.4.gcc9e28a
- which is an unreleased version of a post-version, or:
- 0.1.1
- Which is a release and which should match tag.
- For now, if we have a date-embedded version, check to see if it's
- old, and if so re-generate. Otherwise, just deal with it.
- """
- try:
- version_date = int(self.version.split("~")[-1].split('.')[0])
- if version_date < int(datetime.date.today().strftime('%Y%m%d')):
- return self._generate_version()
- else:
- return pending_version
- except Exception:
- return pending_version
+ def _get_version_from_pkg_resources(self):
+ """Get the version of the package from the pkg_resources record
+ associated with the package."""
+ requirement = pkg_resources.Requirement.parse(self.package)
+ provider = pkg_resources.get_provider(requirement)
+ return provider.version
- def version_string_with_vcs(self, always=False):
+ def version_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
-
- For instance, if we are working towards the 2012.2 release,
- canonical_version_string should return 2012.2 if this is a final
- release, or else something like 2012.2~f1~20120705.20 if it's not.
-
- :param always: if true, skip all version caching
"""
- if always:
- self.version = self._generate_version()
-
if self.version is None:
-
- requirement = pkg_resources.Requirement.parse(self.python_package)
- versioninfo = "%s/versioninfo" % self.package
- try:
- raw_version = pkg_resources.resource_string(requirement,
- versioninfo)
- self.version = self._newer_version(raw_version.strip())
- except (IOError, pkg_resources.DistributionNotFound):
- self.version = self._generate_version()
+ self.version = self._get_version_from_pkg_resources()
return self.version
- def canonical_version_string(self, always=False):
- """Return the simple version of the package excluding any suffixes.
-
- For instance, if we are working towards the 2012.2 release,
- canonical_version_string should return 2012.2 in all cases.
-
- :param always: if true, skip all version caching
- """
- return self.version_string_with_vcs(always).split('~')[0]
+ # Compatibility functions
+ canonical_version_string = version_string
+ version_string_with_vcs = version_string
- def version_string(self, always=False):
- """Return the base version of the package.
-
- For instance, if we are working towards the 2012.2 release,
- version_string should return 2012.2 if this is a final release, or
- 2012.2-dev if it is not.
-
- :param always: if true, skip all version caching
- """
- version_parts = self.version_string_with_vcs(always).split('~')
- if len(version_parts) == 1:
- return version_parts[0]
- else:
- return '%s-dev' % (version_parts[0],)
-
- def deferred_version_string(self, prefix=""):
+ def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested
"""
- return _deferred_version_string(self, prefix)
+ if not self._cached_version:
+ self._cached_version = "%s%s" % (prefix,
+ self.version_string())
+ return self._cached_version
stack.validate().AndReturn(None)
self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
- name_match = mox.StrContains(stack.name)
- threadgroup.ThreadGroup(name_match).AndReturn(DummyThreadGroup())
+ threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
self.m.ReplayAll()
from heat.openstack.common import version as common_version
-NEXT_VERSION = '2013.1'
-version_info = common_version.VersionInfo('heat', pre_version=NEXT_VERSION)
+version_info = common_version.VersionInfo('heat')
import setuptools
from heat.openstack.common import setup
-from heat.version import version_info as version
requires = setup.parse_requirements()
+project = 'heat'
+
setuptools.setup(
- name='heat',
- version=version.canonical_version_string(always=True),
+ name=project,
+ version=setup.get_version(project),
description='The heat project provides services for provisioning '
'virtual machines',
license='Apache License (2.0)',