]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
openstack/common : rebase to latest oslo
authorSteven Hardy <shardy@redhat.com>
Fri, 18 Jan 2013 11:26:01 +0000 (11:26 +0000)
committerSteven Hardy <shardy@redhat.com>
Fri, 18 Jan 2013 15:31:45 +0000 (15:31 +0000)
Rebased to latest oslo, now at 8837013, note a few interfaces
changed requiring a some tweaks to the heat code

Change-Id: I7dfc634b7c459edebca19dee522b396e3736aecc
Signed-off-by: Steven Hardy <shardy@redhat.com>
22 files changed:
bin/heat-cfn
bin/heat-watch
heat/common/config.py
heat/engine/service.py
heat/openstack/common/cfg.py
heat/openstack/common/log.py
heat/openstack/common/rpc/__init__.py
heat/openstack/common/rpc/amqp.py
heat/openstack/common/rpc/common.py
heat/openstack/common/rpc/dispatcher.py
heat/openstack/common/rpc/impl_fake.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/rpc/impl_zmq.py
heat/openstack/common/service.py
heat/openstack/common/setup.py
heat/openstack/common/threadgroup.py
heat/openstack/common/timeutils.py
heat/openstack/common/version.py
heat/tests/test_engine_service.py
heat/version.py
setup.py

index 8157a6fede27e6375c8adc86f3d924dc971ea412..2ec7d336da319901758165c248c912f3f3ff0b91 100755 (executable)
@@ -637,8 +637,8 @@ Commands:
 
 """
 
-    deferred_string = version.deferred_version_string(prefix='%prog ')
-    oparser = optparse.OptionParser(version=str(deferred_string),
+    version_string = version.version_string()
+    oparser = optparse.OptionParser(version=version_string,
                                     usage=usage.strip())
     create_options(oparser)
     (opts, cmd, args) = parse_options(oparser, sys.argv[1:])
index b2c4280ff0879f29defbb5a68f801172572959cf..ff67ee890102e0807aa23251c823c120a2e7e3ed 100755 (executable)
@@ -255,8 +255,8 @@ Commands:
     metric-put-data             Publish data-point for specified  metric
 
 """
-    deferred_string = version.deferred_version_string(prefix='%prog ')
-    oparser = optparse.OptionParser(version=str(deferred_string),
+    version_string = version.version_string()
+    oparser = optparse.OptionParser(version=version_string,
                                     usage=usage.strip())
     create_options(oparser)
     (opts, cmd, args) = parse_options(oparser, sys.argv[1:])
index 83b5dbead3f795593d71e53f588625ebe8762655..ff80aff73c5662254e1ca2f94d73b2be21e514a7 100644 (file)
@@ -28,6 +28,7 @@ from eventlet.green import socket
 
 from heat.common import wsgi
 from heat.openstack.common import cfg
+from heat.openstack.common import rpc
 
 DEFAULT_PORT = 8000
 
@@ -102,9 +103,6 @@ rpc_opts = [
                help='Name of the engine node. '
                     'This can be an opaque identifier.'
                     'It is not necessarily a hostname, FQDN, or IP address.'),
-    cfg.StrOpt('control_exchange',
-               default='heat',
-               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
     cfg.StrOpt('engine_topic',
                default='engine',
                help='the topic engine nodes listen on')]
@@ -113,6 +111,7 @@ rpc_opts = [
 def register_api_opts():
     cfg.CONF.register_opts(bind_opts)
     cfg.CONF.register_opts(rpc_opts)
+    rpc.set_defaults(control_exchange='heat')
 
 
 def register_engine_opts():
@@ -120,6 +119,7 @@ def register_engine_opts():
     cfg.CONF.register_opts(db_opts)
     cfg.CONF.register_opts(service_opts)
     cfg.CONF.register_opts(rpc_opts)
+    rpc.set_defaults(control_exchange='heat')
 
 
 def setup_logging():
index 907808bf9c44345415c3498ade70b9f9426c0781..a45c3d4444bb6e13a7e8eade34c1e8b94422cae5 100644 (file)
@@ -63,20 +63,18 @@ class EngineService(service.Service):
         # stg == "Stack Thread Groups"
         self.stg = {}
 
-    def _start_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+    def _start_in_thread(self, stack_id, func, *args, **kwargs):
         if stack_id not in self.stg:
-            thr_name = '%s-%s' % (stack_name, stack_id)
-            self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+            self.stg[stack_id] = threadgroup.ThreadGroup()
         self.stg[stack_id].add_thread(func, *args, **kwargs)
 
-    def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+    def _timer_in_thread(self, stack_id, func, *args, **kwargs):
         """
         Define a periodic task, to be run in a separate thread, in the stack
         threadgroups.  Periodicity is cfg.CONF.periodic_interval
         """
         if stack_id not in self.stg:
-            thr_name = '%s-%s' % (stack_name, stack_id)
-            self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+            self.stg[stack_id] = threadgroup.ThreadGroup()
         self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
                                      func, *args, **kwargs)
 
@@ -102,9 +100,7 @@ class EngineService(service.Service):
         admin_context = context.get_admin_context()
         stacks = db_api.stack_get_all(admin_context)
         for s in stacks:
-            self._timer_in_thread(s.id, s.name,
-                                  self._periodic_watcher_task,
-                                  sid=s.id)
+            self._timer_in_thread(s.id, self._periodic_watcher_task, sid=s.id)
 
     @request_context
     def identify_stack(self, context, stack_name):
@@ -214,11 +210,10 @@ class EngineService(service.Service):
 
         stack_id = stack.store()
 
-        self._start_in_thread(stack_id, stack_name, stack.create)
+        self._start_in_thread(stack_id, stack.create)
 
         # Schedule a periodic watcher task for this stack
-        self._timer_in_thread(stack_id, stack_name,
-                              self._periodic_watcher_task,
+        self._timer_in_thread(stack_id, self._periodic_watcher_task,
                               sid=stack_id)
 
         return dict(stack.identifier())
@@ -257,9 +252,7 @@ class EngineService(service.Service):
         if response:
             return {'Description': response}
 
-        self._start_in_thread(db_stack.id, db_stack.name,
-                              current_stack.update,
-                              updated_stack)
+        self._start_in_thread(db_stack.id, current_stack.update, updated_stack)
 
         return dict(current_stack.identifier())
 
index ac34a954cc5db7943869336a8964bfe9bb37c565..07f9914755fd8ecc61ed410daa417fe113158eac 100644 (file)
@@ -236,10 +236,11 @@ in order to support a common usage pattern in OpenStack::
 Positional command line arguments are supported via a 'positional' Opt
 constructor argument::
 
-    >>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True))
+    >>> conf = ConfigOpts()
+    >>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
     True
-    >>> CONF(['a', 'b'])
-    >>> CONF.bar
+    >>> conf(['a', 'b'])
+    >>> conf.bar
     ['a', 'b']
 
 It is also possible to use argparse "sub-parsers" to parse additional
@@ -249,10 +250,11 @@ command line arguments using the SubCommandOpt class:
     ...     list_action = subparsers.add_parser('list')
     ...     list_action.add_argument('id')
     ...
-    >>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
+    >>> conf = ConfigOpts()
+    >>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
     True
-    >>> CONF(['list', '10'])
-    >>> CONF.action.name, CONF.action.id
+    >>> conf(args=['list', '10'])
+    >>> conf.action.name, conf.action.id
     ('list', '10')
 
 """
@@ -480,6 +482,13 @@ def _is_opt_registered(opts, opt):
         return False
 
 
+def set_defaults(opts, **kwargs):
+    for opt in opts:
+        if opt.dest in kwargs:
+            opt.default = kwargs[opt.dest]
+            break
+
+
 class Opt(object):
 
     """Base class for all configuration options.
@@ -771,7 +780,7 @@ class ListOpt(Opt):
 
     def _get_from_config_parser(self, cparser, section):
         """Retrieve the opt value as a list from ConfigParser."""
-        return [v.split(',') for v in
+        return [[a.strip() for a in v.split(',')] for v in
                 self._cparser_get_with_deprecated(cparser, section)]
 
     def _get_argparse_kwargs(self, group, **kwargs):
@@ -1728,11 +1737,13 @@ class CommonConfigOpts(ConfigOpts):
         BoolOpt('debug',
                 short='d',
                 default=False,
-                help='Print debugging output'),
+                help='Print debugging output (set logging level to '
+                     'DEBUG instead of default WARNING level).'),
         BoolOpt('verbose',
                 short='v',
                 default=False,
-                help='Print more verbose output'),
+                help='Print more verbose output (set logging level to '
+                     'INFO instead of default WARNING level).'),
     ]
 
     logging_cli_opts = [
@@ -1756,11 +1767,13 @@ class CommonConfigOpts(ConfigOpts):
                     'Default: %(default)s'),
         StrOpt('log-file',
                metavar='PATH',
+               deprecated_name='logfile',
                help='(Optional) Name of log file to output to. '
                     'If not set, logging will go to stdout.'),
         StrOpt('log-dir',
+               deprecated_name='logdir',
                help='(Optional) The directory to keep log files in '
-                    '(will be prepended to --logfile)'),
+                    '(will be prepended to --log-file)'),
         BoolOpt('use-syslog',
                 default=False,
                 help='Use syslog for logging.'),
index c7777239790670b01dcb0912b37ce8552add6526..50adedbbc362351cd2c26a8e7ceefcb132dd906d 100644 (file)
@@ -49,19 +49,20 @@ from heat.openstack.common import notifier
 
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
-               default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
-                       '%(user)s %(tenant)s] %(instance)s'
+               default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
+                       '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
                        '%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
-               default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
-                       ' %(instance)s%(message)s',
+               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+                       '%(name)s [-] %(instance)s%(message)s',
                help='format string to use for log messages without context'),
     cfg.StrOpt('logging_debug_format_suffix',
                default='%(funcName)s %(pathname)s:%(lineno)d',
                help='data to append to log format when level is DEBUG'),
     cfg.StrOpt('logging_exception_prefix',
-               default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
+               default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
+               '%(instance)s',
                help='prefix each line of exception output with this format'),
     cfg.ListOpt('default_log_levels',
                 default=[
@@ -289,6 +290,12 @@ def setup(product_name):
         _setup_logging_from_conf(product_name)
 
 
+def set_defaults(logging_context_format_string):
+    cfg.set_defaults(log_opts,
+                     logging_context_format_string=
+                     logging_context_format_string)
+
+
 def _find_facility_from_conf():
     facility_names = logging.handlers.SysLogHandler.facility_names
     facility = getattr(logging.handlers.SysLogHandler,
@@ -354,10 +361,12 @@ def _setup_logging_from_conf(product_name):
                                                    datefmt=datefmt))
         handler.setFormatter(LegacyFormatter(datefmt=datefmt))
 
-    if CONF.verbose or CONF.debug:
+    if CONF.debug:
         log_root.setLevel(logging.DEBUG)
-    else:
+    elif CONF.verbose:
         log_root.setLevel(logging.INFO)
+    else:
+        log_root.setLevel(logging.WARNING)
 
     level = logging.NOTSET
     for pair in CONF.default_log_levels:
index 8046c216f8fc66ca51006ef560af68615fe04677..cee009818f7f1815b71379facd889e05b2cc8dc9 100644 (file)
@@ -50,25 +50,26 @@ rpc_opts = [
                 default=['heat.openstack.common.exception',
                          'nova.exception',
                          'cinder.exception',
+                         'exceptions',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
                      'upon receiving exception data from an rpc call.'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
-    #
-    # The following options are not registered here, but are expected to be
-    # present. The project using this library must register these options with
-    # the configuration so that project-specific defaults may be defined.
-    #
-    #cfg.StrOpt('control_exchange',
-    #           default='nova',
-    #           help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+    cfg.StrOpt('control_exchange',
+               default='openstack',
+               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
 ]
 
 cfg.CONF.register_opts(rpc_opts)
 
 
+def set_defaults(control_exchange):
+    cfg.set_defaults(rpc_opts,
+                     control_exchange=control_exchange)
+
+
 def create_connection(new=True):
     """Create a connection to the message bus used for rpc.
 
@@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
     return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
 
 
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
     """Send notification event.
 
     :param context: Information that identifies the user that has made this
                     request.
     :param topic: The topic to send the notification to.
     :param msg: This is a dict of content of event.
+    :param envelope: Set to True to enable message envelope for notifications.
 
     :returns: None
     """
-    return _get_impl().notify(cfg.CONF, context, topic, msg)
+    return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
 
 
 def cleanup():
index af661920569c6d7180fc40cc888a0ae8b6903fba..8da736c40d48b6d2d790250d7d04385242a8034f 100644 (file)
@@ -33,7 +33,6 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
 
-from heat.openstack.common import cfg
 from heat.openstack.common import excutils
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import local
@@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
 
 
 def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False):
+              ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
@@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
     """
     with ConnectionContext(conf, connection_pool) as conn:
         if failure:
-            failure = rpc_common.serialize_remote_exception(failure)
+            failure = rpc_common.serialize_remote_exception(failure,
+                                                            log_failure)
 
         try:
             msg = {'result': reply, 'failure': failure}
@@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, msg)
+        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
@@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
-              connection_pool=None):
+              connection_pool=None, log_failure=True):
         if self.msg_id:
             msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending)
+                      ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -282,11 +282,21 @@ class ProxyCallback(object):
                 ctxt.reply(rval, None, connection_pool=self.connection_pool)
             # This final None tells multicall that it is done.
             ctxt.reply(ending=True, connection_pool=self.connection_pool)
+        except rpc_common.ClientException as e:
+            LOG.debug(_('Expected exception during message handling (%s)') %
+                      e._exc_info[1])
+            ctxt.reply(None, e._exc_info,
+                       connection_pool=self.connection_pool,
+                       log_failure=False)
         except Exception:
             LOG.exception(_('Exception during message handling'))
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
 
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
@@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     # that will continue to use the connection.  When it's done,
     # connection.close() will get called which will put it back into
     # the pool
-    LOG.debug(_('Making asynchronous call on %s ...'), topic)
+    LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
@@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     conn = ConnectionContext(conf, connection_pool)
     wait_msg = MulticallWaiter(conf, conn, timeout)
     conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, msg)
+    conn.topic_send(topic, rpc_common.serialize_msg(msg))
     return wait_msg
 
 
@@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
-        conn.topic_send(topic, msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
 
 
 def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
     LOG.debug(_('Making asynchronous fanout cast...'))
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
-        conn.fanout_send(topic, msg)
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
 
 
 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
-        conn.topic_send(topic, msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
 
 
 def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -402,16 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
-        conn.fanout_send(topic, msg)
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
 
 
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
     """Sends a notification event on a topic."""
     LOG.debug(_('Sending %(event_type)s on %(topic)s'),
               dict(event_type=msg.get('event_type'),
                    topic=topic))
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
+        if envelope:
+            msg = rpc_common.serialize_msg(msg, force_envelope=True)
         conn.notify_send(topic, msg)
 
 
@@ -421,7 +433,4 @@ def cleanup(connection_pool):
 
 
 def get_control_exchange(conf):
-    try:
-        return conf.control_exchange
-    except cfg.NoSuchOptError:
-        return 'openstack'
+    return conf.control_exchange
index b56533dc3908e9ca3ef647655cca7ebad4eba227..6a3614542edf8ec085691edda472c216cccf99a2 100644 (file)
 #    under the License.
 
 import copy
+import sys
 import traceback
 
+from heat.openstack.common import cfg
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
@@ -27,9 +29,50 @@ from heat.openstack.common import local
 from heat.openstack.common import log as logging
 
 
+CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently.  For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc.  This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer.  See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple.  It is:
+
+    {
+        'heat.version': <RPC Envelope Version as a String>,
+        'heat.message': <Application Message Payload, JSON encoded>
+    }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version.  It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload.  The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'heat.version'
+_MESSAGE_KEY = 'heat.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
 class RPCException(Exception):
     message = _("An unknown RPC related exception occurred.")
 
@@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
                 "this endpoint.")
 
 
+class UnsupportedRpcEnvelopeVersion(RPCException):
+    message = _("Specified RPC envelope version, %(version)s, "
+                "not supported by this endpoint.")
+
+
 class Connection(object):
     """A connection, returned by rpc.create_connection().
 
@@ -164,8 +212,12 @@ class Connection(object):
 
 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': ('new_pass',),
-                'run_instance': ('admin_password',), }
+    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+                'run_instance': [('args', 'admin_password')],
+                'route_message': [('args', 'message', 'args', 'method_info',
+                                   'method_kwargs', 'password'),
+                                  ('args', 'message', 'args', 'method_info',
+                                   'method_kwargs', 'admin_password')]}
 
     has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
     has_context_token = '_context_auth_token' in msg_data
@@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
     msg_data = copy.deepcopy(msg_data)
 
     if has_method:
-        method = msg_data['method']
-        if method in SANITIZE:
-            args_to_sanitize = SANITIZE[method]
-            for arg in args_to_sanitize:
-                try:
-                    msg_data['args'][arg] = "<SANITIZED>"
-                except KeyError:
-                    pass
+        for arg in SANITIZE.get(msg_data['method'], []):
+            try:
+                d = msg_data
+                for elem in arg[:-1]:
+                    d = d[elem]
+                d[arg[-1]] = '<SANITIZED>'
+            except KeyError, e:
+                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+                         {'item': arg,
+                          'err': e})
 
     if has_context_token:
         msg_data['_context_auth_token'] = '<SANITIZED>'
@@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
     return log_func(msg, msg_data)
 
 
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
     """Prepares exception data to be sent over rpc.
 
     Failure_info should be a sys.exc_info() tuple.
@@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
     """
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
-    LOG.error(_("Returning exception %s to caller"), unicode(failure))
-    LOG.error(tb)
+    if log_failure:
+        LOG.error(_("Returning exception %s to caller"), unicode(failure))
+        LOG.error(tb)
 
     kwargs = {}
     if hasattr(failure, 'kwargs'):
@@ -309,3 +364,107 @@ class CommonRpcContext(object):
             context.values['read_deleted'] = read_deleted
 
         return context
+
+
+class ClientException(Exception):
+    """This encapsulates some actual exception that is expected to be
+    hit by an RPC proxy object. Merely instantiating it records the
+    current exception information, which will be passed back to the
+    RPC client without exceptional logging."""
+    def __init__(self):
+        self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+    try:
+        return func(*args, **kwargs)
+    except Exception, e:
+        if type(e) in exceptions:
+            raise ClientException()
+        else:
+            raise
+
+
+def client_exceptions(*exceptions):
+    """Decorator for manager methods that raise expected exceptions.
+    Marking a Manager method with this decorator allows the declaration
+    of expected exceptions that the RPC layer should not consider fatal,
+    and not log as if they were generated in a real error scenario. Note
+    that this will cause listed exceptions to be wrapped in a
+    ClientException, which is used internally by the RPC layer."""
+    def outer(func):
+        def inner(*args, **kwargs):
+            return catch_client_exception(exceptions, func, *args, **kwargs)
+        return inner
+    return outer
+
+
+def version_is_compatible(imp_version, version):
+    """Determine whether versions are compatible.
+
+    :param imp_version: The version implemented
+    :param version: The version requested by an incoming message.
+    """
+    version_parts = version.split('.')
+    imp_version_parts = imp_version.split('.')
+    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
+        return False
+    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
+        return False
+    return True
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+    if not _SEND_RPC_ENVELOPE and not force_envelope:
+        return raw_msg
+
+    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+    # information about this format.
+    msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+           _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+    return msg
+
+
+def deserialize_msg(msg):
+    # NOTE(russellb): Hang on to your hats, this road is about to
+    # get a little bumpy.
+    #
+    # Robustness Principle:
+    #    "Be strict in what you send, liberal in what you accept."
+    #
+    # At this point we have to do a bit of guessing about what it
+    # is we just received.  Here is the set of possibilities:
+    #
+    # 1) We received a dict.  This could be 2 things:
+    #
+    #   a) Inspect it to see if it looks like a standard message envelope.
+    #      If so, great!
+    #
+    #   b) If it doesn't look like a standard message envelope, it could either
+    #      be a notification, or a message from before we added a message
+    #      envelope (referred to as version 1.0).
+    #      Just return the message as-is.
+    #
+    # 2) It's any other non-dict type.  Just return it and hope for the best.
+    #    This case covers return values from rpc.call() from before message
+    #    envelopes were used.  (messages to call a method were always a dict)
+
+    if not isinstance(msg, dict):
+        # See #2 above.
+        return msg
+
+    base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+    if not all(map(lambda key: key in msg, base_envelope_keys)):
+        #  See #1.b above.
+        return msg
+
+    # At this point we think we have the message envelope
+    # format we were expecting. (#1.a above)
+
+    if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+        raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+    raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+    return raw_msg
index 72d2ae079999714c4a77a81d72a256c73a7fe720..52403b4371f264ffa7e35c22276d499d0bb0f7d8 100644 (file)
@@ -103,21 +103,6 @@ class RpcDispatcher(object):
         self.callbacks = callbacks
         super(RpcDispatcher, self).__init__()
 
-    @staticmethod
-    def _is_compatible(mversion, version):
-        """Determine whether versions are compatible.
-
-        :param mversion: The API version implemented by a callback.
-        :param version: The API version requested by an incoming message.
-        """
-        version_parts = version.split('.')
-        mversion_parts = mversion.split('.')
-        if int(version_parts[0]) != int(mversion_parts[0]):  # Major
-            return False
-        if int(version_parts[1]) > int(mversion_parts[1]):  # Minor
-            return False
-        return True
-
     def dispatch(self, ctxt, version, method, **kwargs):
         """Dispatch a message based on a requested version.
 
@@ -139,7 +124,8 @@ class RpcDispatcher(object):
                 rpc_api_version = proxyobj.RPC_API_VERSION
             else:
                 rpc_api_version = '1.0'
-            is_compatible = self._is_compatible(rpc_api_version, version)
+            is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+                                                             version)
             had_compatible = had_compatible or is_compatible
             if not hasattr(proxyobj, method):
                 continue
index d56665b492b8006292a5dacc42c669f870229f13..6c2e13098f7aacf0de3a19d1001bf4a1918f77ce 100644 (file)
@@ -79,6 +79,8 @@ class Consumer(object):
                     else:
                         res.append(rval)
                 done.send(res)
+            except rpc_common.ClientException as e:
+                done.send_exception(e._exc_info[1])
             except Exception as e:
                 done.send_exception(e)
 
@@ -165,7 +167,7 @@ def cast(conf, context, topic, msg):
         pass
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     check_serialize(msg)
 
 
index 8c0e5464304668d84e136c699dcab2002b909c6c..7865bf7703038d454ab6e67a12b8a761cc72f9c9 100644 (file)
@@ -162,7 +162,8 @@ class ConsumerBase(object):
         def _callback(raw_message):
             message = self.channel.message_to_python(raw_message)
             try:
-                callback(message.payload)
+                msg = rpc_common.deserialize_msg(message.payload)
+                callback(msg)
                 message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
@@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
         # Default options
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         exchange = kombu.entity.Exchange(name=msg_id,
                                          type='direct',
@@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
         options = {'durable': False,
                    'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
                                          durable=options['durable'],
@@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
 
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
                                               type='direct', **options)
@@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
         """
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
                                               None, type='fanout', **options)
@@ -387,6 +388,7 @@ class Connection(object):
     def __init__(self, conf, server_params=None):
         self.consumers = []
         self.consumer_thread = None
+        self.proxy_callbacks = []
         self.conf = conf
         self.max_retries = self.conf.rabbit_max_retries
         # Try forever?
@@ -469,7 +471,7 @@ class Connection(object):
             LOG.info(_("Reconnecting to AMQP server on "
                      "%(hostname)s:%(port)d") % params)
             try:
-                self.connection.close()
+                self.connection.release()
             except self.connection_errors:
                 pass
             # Setting this in case the next statement fails, though
@@ -573,12 +575,14 @@ class Connection(object):
     def close(self):
         """Close/release this connection"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.connection.release()
         self.connection = None
 
     def reset(self):
         """Reset a connection so it can be used again"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.channel.close()
         self.channel = self.connection.channel()
         # work around 'memory' transport bug in 1.1.3
@@ -644,6 +648,11 @@ class Connection(object):
                 pass
             self.consumer_thread = None
 
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
     def publisher_send(self, cls, topic, msg, **kwargs):
         """Send to a publisher based on the publisher class"""
 
@@ -719,6 +728,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         if fanout:
             self.declare_fanout_consumer(topic, proxy_cb)
@@ -730,6 +740,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
 
@@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
         rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     """Sends a notification event on a topic."""
     return rpc_amqp.notify(
         conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
+        rpc_amqp.get_connection_pool(conf, Connection),
+        envelope)
 
 
 def cleanup():
index 20715d3288560aab1c39e02c78bd3ae19a99b496..9c614f7783abb7ce60bd967ccff367a7e22ab367 100644 (file)
@@ -22,16 +22,18 @@ import uuid
 
 import eventlet
 import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
 
 from heat.openstack.common import cfg
 from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
 from heat.openstack.common import log as logging
 from heat.openstack.common.rpc import amqp as rpc_amqp
 from heat.openstack.common.rpc import common as rpc_common
 
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
 LOG = logging.getLogger(__name__)
 
 qpid_opts = [
@@ -124,7 +126,8 @@ class ConsumerBase(object):
         """Fetch the message and pass it to the callback object"""
         message = self.receiver.fetch()
         try:
-            self.callback(message.content)
+            msg = rpc_common.deserialize_msg(message.content)
+            self.callback(msg)
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
@@ -274,11 +277,22 @@ class Connection(object):
     pool = None
 
     def __init__(self, conf, server_params=None):
+        if not qpid_messaging:
+            raise ImportError("Failed to import qpid.messaging")
+
         self.session = None
         self.consumers = {}
         self.consumer_thread = None
+        self.proxy_callbacks = []
         self.conf = conf
 
+        if server_params and 'hostname' in server_params:
+            # NOTE(russellb) This enables support for cast_to_server.
+            server_params['qpid_hosts'] = [
+                '%s:%d' % (server_params['hostname'],
+                           server_params.get('port', 5672))
+            ]
+
         params = {
             'qpid_hosts': self.conf.qpid_hosts,
             'username': self.conf.qpid_username,
@@ -294,7 +308,7 @@ class Connection(object):
 
     def connection_create(self, broker):
         # Create the connection - this does not open the connection
-        self.connection = qpid.messaging.Connection(broker)
+        self.connection = qpid_messaging.Connection(broker)
 
         # Check if flags are set and if so set them for the connection
         # before we call open
@@ -319,7 +333,7 @@ class Connection(object):
         if self.connection.opened():
             try:
                 self.connection.close()
-            except qpid.messaging.exceptions.ConnectionError:
+            except qpid_exceptions.ConnectionError:
                 pass
 
         attempt = 0
@@ -331,7 +345,7 @@ class Connection(object):
             try:
                 self.connection_create(broker)
                 self.connection.open()
-            except qpid.messaging.exceptions.ConnectionError, e:
+            except qpid_exceptions.ConnectionError, e:
                 msg_dict = dict(e=e, delay=delay)
                 msg = _("Unable to connect to AMQP server: %(e)s. "
                         "Sleeping %(delay)s seconds") % msg_dict
@@ -358,8 +372,8 @@ class Connection(object):
         while True:
             try:
                 return method(*args, **kwargs)
-            except (qpid.messaging.exceptions.Empty,
-                    qpid.messaging.exceptions.ConnectionError), e:
+            except (qpid_exceptions.Empty,
+                    qpid_exceptions.ConnectionError), e:
                 if error_callback:
                     error_callback(e)
                 self.reconnect()
@@ -367,12 +381,14 @@ class Connection(object):
     def close(self):
         """Close/release this connection"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.connection.close()
         self.connection = None
 
     def reset(self):
         """Reset a connection so it can be used again"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.session.close()
         self.session = self.connection.session()
         self.consumers = {}
@@ -397,7 +413,7 @@ class Connection(object):
         """Return an iterator that will consume from all queues/consumers"""
 
         def _error_callback(exc):
-            if isinstance(exc, qpid.messaging.exceptions.Empty):
+            if isinstance(exc, qpid_exceptions.Empty):
                 LOG.exception(_('Timed out waiting for RPC response: %s') %
                               str(exc))
                 raise rpc_common.Timeout()
@@ -427,6 +443,11 @@ class Connection(object):
                 pass
             self.consumer_thread = None
 
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
     def publisher_send(self, cls, topic, msg):
         """Send to a publisher based on the publisher class"""
 
@@ -502,6 +523,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         if fanout:
             consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -517,6 +539,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
                                  name=pool_name)
@@ -575,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
         rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     """Sends a notification event on a topic."""
     return rpc_amqp.notify(conf, context, topic, msg,
-                           rpc_amqp.get_connection_pool(conf, Connection))
+                           rpc_amqp.get_connection_pool(conf, Connection),
+                           envelope)
 
 
 def cleanup():
index a4320e80cc99f08bd052b5a9803a4dec862b07ac..b2c16bfcf237b792b04d045962430c95f1d363a6 100644 (file)
@@ -14,6 +14,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import os
 import pprint
 import socket
 import string
@@ -22,15 +23,16 @@ import types
 import uuid
 
 import eventlet
-from eventlet.green import zmq
 import greenlet
 
 from heat.openstack.common import cfg
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
+from heat.openstack.common import processutils as utils
 from heat.openstack.common.rpc import common as rpc_common
 
+zmq = importutils.try_import('eventlet.green.zmq')
 
 # for convenience, are not modified.
 pformat = pprint.pformat
@@ -61,6 +63,10 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),
 
+    cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+               help='Maximum number of ingress messages to locally buffer '
+                    'per topic. Default is unlimited.'),
+
     cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
                help='Directory for holding IPC sockets'),
 
@@ -70,9 +76,9 @@ zmq_opts = [
 ]
 
 
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
 matchmaker = None  # memoized matchmaker object
 
@@ -107,7 +113,7 @@ class ZmqSocket(object):
     """
 
     def __init__(self, addr, zmq_type, bind=True, subscribe=None):
-        self.sock = ZMQ_CTX.socket(zmq_type)
+        self.sock = _get_ctxt().socket(zmq_type)
         self.addr = addr
         self.type = zmq_type
         self.subscriptions = []
@@ -181,11 +187,15 @@ class ZmqSocket(object):
                     pass
             self.subscriptions = []
 
-        # Linger -1 prevents lost/dropped messages
         try:
-            self.sock.close(linger=-1)
+            # Default is to linger
+            self.sock.close()
         except Exception:
-            pass
+            # While this is a bad thing to happen,
+            # it would be much worse if some of the code calling this
+            # were to fail. For now, lets log, and later evaluate
+            # if we can safely raise here.
+            LOG.error("ZeroMQ socket could not be closed.")
         self.sock = None
 
     def recv(self):
@@ -202,10 +212,14 @@ class ZmqSocket(object):
 class ZmqClient(object):
     """Client for ZMQ sockets."""
 
-    def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+    def __init__(self, addr, socket_type=None, bind=False):
+        if socket_type is None:
+            socket_type = zmq.PUSH
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
-    def cast(self, msg_id, topic, data):
+    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+        if serialize:
+            data = rpc_common.serialize_msg(data, force_envelope)
         self.outq.send([str(msg_id), str(topic), str('cast'),
                         _serialize(data)])
 
@@ -250,7 +264,7 @@ class InternalContext(object):
         """Process a curried message and cast the result to topic."""
         LOG.debug(_("Running func with context: %s"), ctx.to_dict())
         data.setdefault('version', None)
-        data.setdefault('args', [])
+        data.setdefault('args', {})
 
         try:
             result = proxy.dispatch(
@@ -259,7 +273,14 @@ class InternalContext(object):
         except greenlet.GreenletExit:
             # ignore these since they are just from shutdowns
             pass
+        except rpc_common.ClientException, e:
+            LOG.debug(_("Expected exception during message handling (%s)") %
+                      e._exc_info[1])
+            return {'exc':
+                    rpc_common.serialize_remote_exception(e._exc_info,
+                                                          log_failure=False)}
         except Exception:
+            LOG.error(_("Exception during message handling"))
             return {'exc':
                     rpc_common.serialize_remote_exception(sys.exc_info())}
 
@@ -314,7 +335,7 @@ class ConsumerBase(object):
             return
 
         data.setdefault('version', None)
-        data.setdefault('args', [])
+        data.setdefault('args', {})
         proxy.dispatch(ctx, data['version'],
                        data['method'], **data['args'])
 
@@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor):
         super(ZmqProxy, self).__init__(conf)
 
         self.topic_proxy = {}
-        ipc_dir = CONF.rpc_zmq_ipc_dir
-
-        self.topic_proxy['zmq_replies'] = \
-            ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
-                      zmq.PUB, bind=True)
-        self.sockets.append(self.topic_proxy['zmq_replies'])
 
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
@@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor):
             sock_type = zmq.PUB
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
-            inside = _deserialize(in_msg)
+            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
             msg_id = inside[-1]['args']['msg_id']
             response = inside[-1]['args']['response']
             LOG.debug(_("->response->%s"), response)
@@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor):
             sock_type = zmq.PUSH
 
         if not topic in self.topic_proxy:
-            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
-                             sock_type, bind=True)
-            self.topic_proxy[topic] = outq
-            self.sockets.append(outq)
-            LOG.info(_("Created topic proxy: %s"), topic)
+            def publisher(waiter):
+                LOG.info(_("Creating proxy for topic: %s"), topic)
 
-            # It takes some time for a pub socket to open,
-            # before we can have any faith in doing a send() to it.
-            if sock_type == zmq.PUB:
-                eventlet.sleep(.5)
+                try:
+                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+                                         (ipc_dir, topic),
+                                         sock_type, bind=True)
+                except RPCException:
+                    waiter.send_exception(*sys.exc_info())
+                    return
+
+                self.topic_proxy[topic] = eventlet.queue.LightQueue(
+                    CONF.rpc_zmq_topic_backlog)
+                self.sockets.append(out_sock)
+
+                # It takes some time for a pub socket to open,
+                # before we can have any faith in doing a send() to it.
+                if sock_type == zmq.PUB:
+                    eventlet.sleep(.5)
+
+                waiter.send(True)
+
+                while(True):
+                    data = self.topic_proxy[topic].get()
+                    out_sock.send(data)
+                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+                              {'data': data})
+
+            wait_sock_creation = eventlet.event.Event()
+            eventlet.spawn(publisher, wait_sock_creation)
+
+            try:
+                wait_sock_creation.wait()
+            except RPCException:
+                LOG.error(_("Topic socket file creation failed."))
+                return
 
-        LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
-        self.topic_proxy[topic].send(data)
-        LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+        try:
+            self.topic_proxy[topic].put_nowait(data)
+            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+                      {'data': data})
+        except eventlet.queue.Full:
+            LOG.error(_("Local per-topic backlog buffer full for topic "
+                        "%(topic)s. Dropping message.") % {'topic': topic})
+
+    def consume_in_thread(self):
+        """Runs the ZmqProxy service"""
+        ipc_dir = CONF.rpc_zmq_ipc_dir
+        consume_in = "tcp://%s:%s" % \
+            (CONF.rpc_zmq_bind_address,
+             CONF.rpc_zmq_port)
+        consumption_proxy = InternalContext(None)
+
+        if not os.path.isdir(ipc_dir):
+            try:
+                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+                              ipc_dir, run_as_root=True)
+                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+            except utils.ProcessExecutionError:
+                LOG.error(_("Could not create IPC directory %s") %
+                          (ipc_dir, ))
+                raise
+
+        try:
+            self.register(consumption_proxy,
+                          consume_in,
+                          zmq.PULL,
+                          out_bind=True)
+        except zmq.ZMQError:
+            LOG.error(_("Could not create ZeroMQ receiver daemon. "
+                        "Socket may already be in use."))
+            raise
+
+        super(ZmqProxy, self).consume_in_thread()
 
 
 class ZmqReactor(ZmqBaseReactor):
@@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor):
 
         msg_id, topic, style, in_msg = data
 
-        ctx, request = _deserialize(in_msg)
+        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
         ctx = RpcContext.unmarshal(ctx)
 
         proxy = self.proxies[sock]
@@ -524,7 +600,8 @@ class Connection(rpc_common.Connection):
         self.reactor.consume_in_thread()
 
 
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+          force_envelope=False):
     timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
@@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
             conn = ZmqClient(addr)
 
             # assumes cast can't return an exception
-            conn.cast(msg_id, topic, payload)
+            conn.cast(msg_id, topic, payload, serialize, force_envelope)
         except zmq.ZMQError:
             raise RPCException("Cast failed. ZMQ Socket Exception")
         finally:
@@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
                 conn.close()
 
 
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+          serialize=True, force_envelope=False):
     # timeout_response is how long we wait for a response
     timeout = timeout or CONF.rpc_response_timeout
 
@@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
             )
 
             LOG.debug(_("Sending cast"))
-            _cast(addr, context, msg_id, topic, payload)
+            _cast(addr, context, msg_id, topic, payload,
+                  serialize=serialize, force_envelope=force_envelope)
 
             LOG.debug(_("Cast sent; Waiting reply"))
             # Blocks until receives reply
@@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     return responses[-1]
 
 
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+                force_envelope=False):
     """
     Wraps the sending of messages,
     dispatches to the matchmaker and sends
@@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
     conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
-    queues = matchmaker.queues(topic)
+    queues = _get_matchmaker().queues(topic)
     LOG.debug(_("Sending message(s) to: %s"), queues)
 
     # Don't stack if we have no matchmaker results
@@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None):
 
         if method.__name__ == '_cast':
             eventlet.spawn_n(method, _addr, context,
-                             _topic, _topic, msg, timeout)
+                             _topic, _topic, msg, timeout, serialize,
+                             force_envelope)
             return
-        return method(_addr, context, _topic, _topic, msg, timeout)
+        return method(_addr, context, _topic, _topic, msg, timeout,
+                      serialize, force_envelope)
 
 
 def create_connection(conf, new=True):
@@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs):
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
     topic.replace('.', '-')
+    kwargs['serialize'] = kwargs.pop('envelope')
+    kwargs['force_envelope'] = True
     cast(conf, context, topic, msg, **kwargs)
 
 
 def cleanup():
     """Clean up resources in use by implementation."""
     global ZMQ_CTX
+    if ZMQ_CTX:
+        ZMQ_CTX.term()
+    ZMQ_CTX = None
+
     global matchmaker
     matchmaker = None
-    ZMQ_CTX.term()
-    ZMQ_CTX = None
 
 
-def register_opts(conf):
-    """Registration of options for this driver."""
-    #NOTE(ewindisch): ZMQ_CTX and matchmaker
-    # are initialized here as this is as good
-    # an initialization method as any.
+def _get_ctxt():
+    if not zmq:
+        raise ImportError("Failed to import eventlet.green.zmq")
 
-    # We memoize through these globals
     global ZMQ_CTX
-    global matchmaker
-    global CONF
-
-    if not CONF:
-        conf.register_opts(zmq_opts)
-        CONF = conf
-    # Don't re-set, if this method is called twice.
     if not ZMQ_CTX:
-        ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+    return ZMQ_CTX
+
+
+def _get_matchmaker():
+    global matchmaker
     if not matchmaker:
         # rpc_zmq_matchmaker should be set to a 'module.Class'
-        mm_path = conf.rpc_zmq_matchmaker.split('.')
+        mm_path = CONF.rpc_zmq_matchmaker.split('.')
         mm_module = '.'.join(mm_path[:-1])
         mm_class = mm_path[-1]
 
@@ -713,6 +794,4 @@ def register_opts(conf):
         mm_impl = importutils.import_module(mm_module)
         mm_constructor = getattr(mm_impl, mm_class)
         matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+    return matchmaker
index 4b3ef90119dc099b60a8d617a6c9548674eb4191..0906725da2cd9801b76a4da211f58fdd36ff7a7c 100644 (file)
@@ -51,7 +51,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = threadgroup.ThreadGroup('launcher')
+        self._services = threadgroup.ThreadGroup()
         eventlet_backdoor.initialize_if_enabled()
 
     @staticmethod
@@ -310,7 +310,7 @@ class Service(object):
     """Service object for binaries running on hosts."""
 
     def __init__(self, threads=1000):
-        self.tg = threadgroup.ThreadGroup('service', threads)
+        self.tg = threadgroup.ThreadGroup(threads)
 
     def start(self):
         pass
index e6f72f034ec06c176f1a82d691ed91f4c841c42d..7267a6a27fc24929d0b2e2eaec84305d7007bbc0 100644 (file)
@@ -1,6 +1,7 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
 # Copyright 2011 OpenStack LLC.
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
 # All Rights Reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -19,7 +20,7 @@
 Utilities with minimum-depends for use in setup.py
 """
 
-import datetime
+import email
 import os
 import re
 import subprocess
@@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
     if os.path.exists(mailmap):
         with open(mailmap, 'r') as fp:
             for l in fp:
-                l = l.strip()
-                if not l.startswith('#') and ' ' in l:
-                    canonical_email, alias = [x for x in l.split(' ')
-                                              if x.startswith('<')]
-                    mapping[alias] = canonical_email
+                try:
+                    canonical_email, alias = re.match(
+                        r'[^#]*?(<.+>).*(<.+>).*', l).groups()
+                except AttributeError:
+                    continue
+                mapping[alias] = canonical_email
     return mapping
 
 
@@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
     """Takes in a string and an email alias mapping and replaces all
        instances of the aliases in the string with their real email.
     """
-    for alias, email in mapping.iteritems():
-        changelog = changelog.replace(alias, email)
+    for alias, email_address in mapping.iteritems():
+        changelog = changelog.replace(alias, email_address)
     return changelog
 
 
@@ -106,16 +108,6 @@ def parse_dependency_links(requirements_files=['requirements.txt',
     return dependency_links
 
 
-def write_requirements():
-    venv = os.environ.get('VIRTUAL_ENV', None)
-    if venv is not None:
-        with open("requirements.txt", "w") as req_file:
-            output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
-                                      stdout=subprocess.PIPE)
-            requirements = output.communicate()[0].strip()
-            req_file.write(requirements)
-
-
 def _run_shell_command(cmd):
     if os.name == 'nt':
         output = subprocess.Popen(["cmd.exe", "/C", cmd],
@@ -131,57 +123,6 @@ def _run_shell_command(cmd):
     return out[0].strip()
 
 
-def _get_git_next_version_suffix(branch_name):
-    datestamp = datetime.datetime.now().strftime('%Y%m%d')
-    if branch_name == 'milestone-proposed':
-        revno_prefix = "r"
-    else:
-        revno_prefix = ""
-    _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
-    milestone_cmd = "git show meta/openstack/release:%s" % branch_name
-    milestonever = _run_shell_command(milestone_cmd)
-    if milestonever:
-        first_half = "%s~%s" % (milestonever, datestamp)
-    else:
-        first_half = datestamp
-
-    post_version = _get_git_post_version()
-    # post version should look like:
-    # 0.1.1.4.gcc9e28a
-    # where the bit after the last . is the short sha, and the bit between
-    # the last and second to last is the revno count
-    (revno, sha) = post_version.split(".")[-2:]
-    second_half = "%s%s.%s" % (revno_prefix, revno, sha)
-    return ".".join((first_half, second_half))
-
-
-def _get_git_current_tag():
-    return _run_shell_command("git tag --contains HEAD")
-
-
-def _get_git_tag_info():
-    return _run_shell_command("git describe --tags")
-
-
-def _get_git_post_version():
-    current_tag = _get_git_current_tag()
-    if current_tag is not None:
-        return current_tag
-    else:
-        tag_info = _get_git_tag_info()
-        if tag_info is None:
-            base_version = "0.0"
-            cmd = "git --no-pager log --oneline"
-            out = _run_shell_command(cmd)
-            revno = len(out.split("\n"))
-            sha = _run_shell_command("git describe --always")
-        else:
-            tag_infos = tag_info.split("-")
-            base_version = "-".join(tag_infos[:-2])
-            (revno, sha) = tag_infos[-2:]
-        return "%s.%s.%s" % (base_version, revno, sha)
-
-
 def write_git_changelog():
     """Write a changelog based on the git changelog."""
     new_changelog = 'ChangeLog'
@@ -227,26 +168,6 @@ _rst_template = """%(heading)s
 """
 
 
-def read_versioninfo(project):
-    """Read the versioninfo file. If it doesn't exist, we're in a github
-       zipball, and there's really no way to know what version we really
-       are, but that should be ok, because the utility of that should be
-       just about nil if this code path is in use in the first place."""
-    versioninfo_path = os.path.join(project, 'versioninfo')
-    if os.path.exists(versioninfo_path):
-        with open(versioninfo_path, 'r') as vinfo:
-            version = vinfo.read().strip()
-    else:
-        version = "0.0.0"
-    return version
-
-
-def write_versioninfo(project, version):
-    """Write a simple file containing the version of the package."""
-    with open(os.path.join(project, 'versioninfo'), 'w') as fil:
-        fil.write("%s\n" % version)
-
-
 def get_cmdclass():
     """Return dict of commands to run from setup.py."""
 
@@ -276,6 +197,9 @@ def get_cmdclass():
         from sphinx.setup_command import BuildDoc
 
         class LocalBuildDoc(BuildDoc):
+
+            builders = ['html', 'man']
+
             def generate_autoindex(self):
                 print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
                 modules = {}
@@ -311,56 +235,69 @@ def get_cmdclass():
                 if not os.getenv('SPHINX_DEBUG'):
                     self.generate_autoindex()
 
-                for builder in ['html', 'man']:
+                for builder in self.builders:
                     self.builder = builder
                     self.finalize_options()
                     self.project = self.distribution.get_name()
                     self.version = self.distribution.get_version()
                     self.release = self.distribution.get_version()
                     BuildDoc.run(self)
+
+        class LocalBuildLatex(LocalBuildDoc):
+            builders = ['latex']
+
         cmdclass['build_sphinx'] = LocalBuildDoc
+        cmdclass['build_sphinx_latex'] = LocalBuildLatex
     except ImportError:
         pass
 
     return cmdclass
 
 
-def get_git_branchname():
-    for branch in _run_shell_command("git branch --color=never").split("\n"):
-        if branch.startswith('*'):
-            _branch_name = branch.split()[1].strip()
-    if _branch_name == "(no":
-        _branch_name = "no-branch"
-    return _branch_name
-
-
-def get_pre_version(projectname, base_version):
-    """Return a version which is leading up to a version that will
-       be released in the future."""
-    if os.path.isdir('.git'):
-        current_tag = _get_git_current_tag()
-        if current_tag is not None:
-            version = current_tag
-        else:
-            branch_name = os.getenv('BRANCHNAME',
-                                    os.getenv('GERRIT_REFNAME',
-                                              get_git_branchname()))
-            version_suffix = _get_git_next_version_suffix(branch_name)
-            version = "%s~%s" % (base_version, version_suffix)
-        write_versioninfo(projectname, version)
-        return version
-    else:
-        version = read_versioninfo(projectname)
-    return version
-
-
-def get_post_version(projectname):
+def get_version_from_git():
     """Return a version which is equal to the tag that's on the current
     revision if there is one, or tag plus number of additional revisions
     if the current revision has no tag."""
 
     if os.path.isdir('.git'):
-        version = _get_git_post_version()
-        write_versioninfo(projectname, version)
+        return _run_shell_command(
+            "git describe --always").replace('-', '.')
+    return None
+
+
+def get_version_from_pkg_info(package_name):
+    """Get the version from PKG-INFO file if we can."""
+    try:
+        pkg_info_file = open('PKG-INFO', 'r')
+    except (IOError, OSError):
+        return None
+    try:
+        pkg_info = email.message_from_file(pkg_info_file)
+    except email.MessageError:
+        return None
+    # Check to make sure we're in our own dir
+    if pkg_info.get('Name', None) != package_name:
+        return None
+    return pkg_info.get('Version', None)
+
+
+def get_version(package_name):
+    """Get the version of the project. First, try getting it from PKG-INFO, if
+    it exists. If it does, that means we're in a distribution tarball or that
+    install has happened. Otherwise, if there is no PKG-INFO file, pull the
+    version from git.
+
+    We do not support setup.py version sanity in git archive tarballs, nor do
+    we support packagers directly sucking our git repo into theirs. We expect
+    that a source tarball be made from our git repo - or that if someone wants
+    to make a source tarball from a fork of our repo with additional tags in it
+    that they understand and desire the results of doing that.
+    """
+    version = get_version_from_pkg_info(package_name)
+    if version:
+        return version
+    version = get_version_from_git()
+    if version:
         return version
-    return read_versioninfo(projectname)
+    raise Exception("Versioning for this project requires either an sdist"
+                    " tarball, or access to an upstream git repository.")
index 618e3c706eec0d58058597be993b670da9b8b34d..fecefbc110d9ec58f2912527a504617615ddee96 100644 (file)
@@ -38,8 +38,7 @@ class Thread(object):
     :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
     it has done so it can be removed from the threads list.
     """
-    def __init__(self, name, thread, group):
-        self.name = name
+    def __init__(self, thread, group):
         self.thread = thread
         self.thread.link(_thread_done, group=group, thread=self)
 
@@ -57,8 +56,7 @@ class ThreadGroup(object):
       when need be).
     * provide an easy API to add timers.
     """
-    def __init__(self, name, thread_pool_size=10):
-        self.name = name
+    def __init__(self, thread_pool_size=10):
         self.pool = greenpool.GreenPool(thread_pool_size)
         self.threads = []
         self.timers = []
@@ -72,7 +70,7 @@ class ThreadGroup(object):
 
     def add_thread(self, callback, *args, **kwargs):
         gt = self.pool.spawn(callback, *args, **kwargs)
-        th = Thread(callback.__name__, gt, self)
+        th = Thread(gt, self)
         self.threads.append(th)
 
     def thread_done(self, thread):
index ea69164284a846935a5ceee13f37374a73edaabe..0f346087f78dbeae8f329efbfc9731c155f2d8ad 100644 (file)
@@ -71,11 +71,15 @@ def normalize_time(timestamp):
 
 def is_older_than(before, seconds):
     """Return True if before is older than seconds."""
+    if isinstance(before, basestring):
+        before = parse_strtime(before).replace(tzinfo=None)
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
 def is_newer_than(after, seconds):
     """Return True if after is newer than seconds."""
+    if isinstance(after, basestring):
+        after = parse_strtime(after).replace(tzinfo=None)
     return after - utcnow() > datetime.timedelta(seconds=seconds)
 
 
index dae88e37b5d08f6a42d1541db5ec1e02d97b5a4f..8f7e1a900253671f6040e12aaf1d78153233c815 100644 (file)
@@ -1,6 +1,6 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
 #    Copyright 2012 OpenStack LLC
+#    Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
 #    under the License.
 
 """
-Utilities for consuming the auto-generated versioninfo files.
+Utilities for consuming the version from pkg_resources.
 """
 
-import datetime
 import pkg_resources
 
-import setup
-
-
-class _deferred_version_string(str):
-    """Internal helper class which provides delayed version calculation."""
-
-    def __new__(cls, version_info, prefix):
-        new_obj = str.__new__(cls, "")
-        new_obj._version_info = version_info
-        new_obj._prefix = prefix
-        new_obj._cached_version = None
-        return new_obj
-
-    def _get_cached_version(self):
-        if not self._cached_version:
-            self._cached_version = \
-                "%s%s" % (self._prefix,
-                self._version_info.version_string())
-        return self._cached_version
-
-    def __len__(self):
-        return self._get_cached_version().__len__()
-
-    def __contains__(self, item):
-        return self._get_cached_version().__contains__(item)
-
-    def __getslice__(self, i, j):
-        return self._get_cached_version().__getslice__(i, j)
-
-    def __str__(self):
-        return self._get_cached_version()
-
-    def __repr__(self):
-        return self._get_cached_version()
-
 
 class VersionInfo(object):
 
-    def __init__(self, package, python_package=None, pre_version=None):
+    def __init__(self, package):
         """Object that understands versioning for a package
-        :param package: name of the top level python namespace. For glance,
-                        this would be "glance" for python-glanceclient, it
-                        would be "glanceclient"
-        :param python_package: optional name of the project name. For
-                               glance this can be left unset. For
-                               python-glanceclient, this would be
-                               "python-glanceclient"
-        :param pre_version: optional version that the project is working to
+        :param package: name of the python package, such as glance, or
+                        python-glanceclient
         """
         self.package = package
-        if python_package is None:
-            self.python_package = package
-        else:
-            self.python_package = python_package
-        self.pre_version = pre_version
         self.version = None
+        self._cached_version = None
 
-    def _generate_version(self):
-        """Defer to the openstack.common.setup routines for making a
-        version from git."""
-        if self.pre_version is None:
-            return setup.get_post_version(self.python_package)
-        else:
-            return setup.get_pre_version(self.python_package, self.pre_version)
-
-    def _newer_version(self, pending_version):
-        """Check to see if we're working with a stale version or not.
-        We expect a version string that either looks like:
-          2012.2~f3~20120708.10.4426392
-        which is an unreleased version of a pre-version, or:
-          0.1.1.4.gcc9e28a
-        which is an unreleased version of a post-version, or:
-          0.1.1
-        Which is a release and which should match tag.
-        For now, if we have a date-embedded version, check to see if it's
-        old, and if so re-generate. Otherwise, just deal with it.
-        """
-        try:
-            version_date = int(self.version.split("~")[-1].split('.')[0])
-            if version_date < int(datetime.date.today().strftime('%Y%m%d')):
-                return self._generate_version()
-            else:
-                return pending_version
-        except Exception:
-            return pending_version
+    def _get_version_from_pkg_resources(self):
+        """Get the version of the package from the pkg_resources record
+        associated with the package."""
+        requirement = pkg_resources.Requirement.parse(self.package)
+        provider = pkg_resources.get_provider(requirement)
+        return provider.version
 
-    def version_string_with_vcs(self, always=False):
+    def version_string(self):
         """Return the full version of the package including suffixes indicating
         VCS status.
-
-        For instance, if we are working towards the 2012.2 release,
-        canonical_version_string should return 2012.2 if this is a final
-        release, or else something like 2012.2~f1~20120705.20 if it's not.
-
-        :param always: if true, skip all version caching
         """
-        if always:
-            self.version = self._generate_version()
-
         if self.version is None:
-
-            requirement = pkg_resources.Requirement.parse(self.python_package)
-            versioninfo = "%s/versioninfo" % self.package
-            try:
-                raw_version = pkg_resources.resource_string(requirement,
-                                                            versioninfo)
-                self.version = self._newer_version(raw_version.strip())
-            except (IOError, pkg_resources.DistributionNotFound):
-                self.version = self._generate_version()
+            self.version = self._get_version_from_pkg_resources()
 
         return self.version
 
-    def canonical_version_string(self, always=False):
-        """Return the simple version of the package excluding any suffixes.
-
-        For instance, if we are working towards the 2012.2 release,
-        canonical_version_string should return 2012.2 in all cases.
-
-        :param always: if true, skip all version caching
-        """
-        return self.version_string_with_vcs(always).split('~')[0]
+    # Compatibility functions
+    canonical_version_string = version_string
+    version_string_with_vcs = version_string
 
-    def version_string(self, always=False):
-        """Return the base version of the package.
-
-        For instance, if we are working towards the 2012.2 release,
-        version_string should return 2012.2 if this is a final release, or
-        2012.2-dev if it is not.
-
-        :param always: if true, skip all version caching
-        """
-        version_parts = self.version_string_with_vcs(always).split('~')
-        if len(version_parts) == 1:
-            return version_parts[0]
-        else:
-            return '%s-dev' % (version_parts[0],)
-
-    def deferred_version_string(self, prefix=""):
+    def cached_version_string(self, prefix=""):
         """Generate an object which will expand in a string context to
         the results of version_string(). We do this so that don't
         call into pkg_resources every time we start up a program when
         passing version information into the CONF constructor, but
         rather only do the calculation when and if a version is requested
         """
-        return _deferred_version_string(self, prefix)
+        if not self._cached_version:
+            self._cached_version = "%s%s" % (prefix,
+                                             self.version_string())
+        return self._cached_version
index 7d1b7601ac645c3bedcf4c39581feb2bfb4b33c1..5a7c7ba335698c6a455f9af95b4b2500c13c7161 100644 (file)
@@ -172,8 +172,7 @@ class stackServiceCreateUpdateDeleteTest(unittest.TestCase):
         stack.validate().AndReturn(None)
 
         self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
-        name_match = mox.StrContains(stack.name)
-        threadgroup.ThreadGroup(name_match).AndReturn(DummyThreadGroup())
+        threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
 
         self.m.ReplayAll()
 
index cfb415bea3a575aed5aec928b77d756350782f46..fd7525444e9e823f93ef0e9d9f0f0c0c7c4c3e93 100644 (file)
@@ -16,5 +16,4 @@
 
 from heat.openstack.common import version as common_version
 
-NEXT_VERSION = '2013.1'
-version_info = common_version.VersionInfo('heat', pre_version=NEXT_VERSION)
+version_info = common_version.VersionInfo('heat')
index eb0066fb3f7953e555769d82a3919b5951e723e1..05ccad1edb5fb53e1f5a74819d38b076c14e9fbf 100755 (executable)
--- a/setup.py
+++ b/setup.py
 import setuptools
 
 from heat.openstack.common import setup
-from heat.version import version_info as version
 
 requires = setup.parse_requirements()
+project = 'heat'
+
 
 setuptools.setup(
-    name='heat',
-    version=version.canonical_version_string(always=True),
+    name=project,
+    version=setup.get_version(project),
     description='The heat project provides services for provisioning '
                 'virtual machines',
     license='Apache License (2.0)',