]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Update openstack-common
authorAngus Salkeld <asalkeld@redhat.com>
Thu, 1 Nov 2012 03:00:41 +0000 (14:00 +1100)
committerAngus Salkeld <asalkeld@redhat.com>
Thu, 1 Nov 2012 10:09:01 +0000 (21:09 +1100)
Change-Id: I3abd011729f413dbd20806f54c9ae6963641d59a

heat/openstack/common/cfg.py
heat/openstack/common/gettextutils.py
heat/openstack/common/log.py
heat/openstack/common/notifier/api.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/rpc/service.py [new file with mode: 0644]
heat/openstack/common/setup.py
heat/openstack/common/timeutils.py
heat/openstack/common/utils.py

index 53ed9ce2128b8d89defe085e314863ee9522b0c7..2f19677697e0c59ddba56a534f5369dd7707bd68 100644 (file)
@@ -236,10 +236,10 @@ log files:
 This module also contains a global instance of the CommonConfigOpts class
 in order to support a common usage pattern in OpenStack:
 
-  from openstack.common import cfg
+  from heat.openstack.common import cfg
 
   opts = [
-    cfg.StrOpt('bind_host' default='0.0.0.0'),
+    cfg.StrOpt('bind_host', default='0.0.0.0'),
     cfg.IntOpt('bind_port', default=9292),
   ]
 
@@ -1507,7 +1507,7 @@ class ConfigOpts(collections.Mapping):
                 if ('default' in info or 'override' in info):
                     continue
 
-                if self._get(opt.name, group) is None:
+                if self._get(opt.dest, group) is None:
                     raise RequiredOptError(opt.name, group)
 
     def _parse_cli_opts(self, args):
index 235350cc498b9abd3cbe3a5a75ca974fc283a881..681b5a88a8ffd71671e455469d159958111f1db0 100644 (file)
@@ -20,7 +20,7 @@ gettext for openstack-common modules.
 
 Usual usage in an openstack.common module:
 
-    from openstack.common.gettextutils import _
+    from heat.openstack.common.gettextutils import _
 """
 
 import gettext
index 41e3781ce8b62a667fb2cd56bb3aaeb7042f7432..723ab54ddaeba36cfd3db074686b756389ec86c1 100644 (file)
@@ -76,6 +76,9 @@ log_opts = [
     cfg.BoolOpt('publish_errors',
                 default=False,
                 help='publish error events'),
+    cfg.BoolOpt('fatal_deprecations',
+                default=False,
+                help='make deprecations fatal'),
 
     # NOTE(mikal): there are two options here because sometimes we are handed
     # a full instance (and could include more information), and other times we
@@ -170,6 +173,14 @@ class ContextAdapter(logging.LoggerAdapter):
     def audit(self, msg, *args, **kwargs):
         self.log(logging.AUDIT, msg, *args, **kwargs)
 
+    def deprecated(self, msg, *args, **kwargs):
+        stdmsg = _("Deprecated Config: %s") % msg
+        if CONF.fatal_deprecations:
+            self.critical(stdmsg, *args, **kwargs)
+            raise DeprecatedConfig(msg=stdmsg)
+        else:
+            self.warn(stdmsg, *args, **kwargs)
+
     def process(self, msg, kwargs):
         if 'extra' not in kwargs:
             kwargs['extra'] = {}
@@ -450,3 +461,10 @@ class ColorHandler(logging.StreamHandler):
     def format(self, record):
         record.color = self.LEVEL_COLORS[record.levelno]
         return logging.StreamHandler.format(self, record)
+
+
+class DeprecatedConfig(Exception):
+    message = _("Fatal call to deprecated config: %(msg)s")
+
+    def __init__(self, msg):
+        super(Exception, self).__init__(self.message % dict(msg=msg))
index e6383f610ecf5e6044b1fcc1574465ae4ad92306..d1ea0f2f087494f9b7829df56e2148f13430ae0f 100644 (file)
@@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload):
             driver.notify(context, msg)
         except Exception, e:
             LOG.exception(_("Problem '%(e)s' attempting to "
-              "send to notification system. Payload=%(payload)s") %
-                            locals())
+                            "send to notification system. "
+                            "Payload=%(payload)s") % locals())
 
 
 _drivers = None
@@ -169,7 +169,7 @@ def add_driver(notification_driver):
         except ImportError as e:
             LOG.exception(_("Failed to load notifier %s. "
                             "These notifications will not be sent.") %
-                            notification_driver)
+                          notification_driver)
     else:
         # Driver is already loaded; just add the object.
         _drivers[notification_driver] = notification_driver
index b5209a407f57a696928c85bda24adba488aaaa98..41fec9a2a48a4e017ec36496a434b4e1c93fabb0 100644 (file)
@@ -31,6 +31,7 @@ import kombu.messaging
 
 from heat.openstack.common import cfg
 from heat.openstack.common.gettextutils import _
+from heat.openstack.common import network_utils
 from heat.openstack.common.rpc import amqp as rpc_amqp
 from heat.openstack.common.rpc import common as rpc_common
 
@@ -50,10 +51,13 @@ kombu_opts = [
                      '(valid only if SSL enabled)')),
     cfg.StrOpt('rabbit_host',
                default='localhost',
-               help='the RabbitMQ host'),
+               help='The RabbitMQ broker address where a single node is used'),
     cfg.IntOpt('rabbit_port',
                default=5672,
-               help='the RabbitMQ port'),
+               help='The RabbitMQ broker port where a single node is used'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
                 help='connect over SSL for RabbitMQ'),
@@ -80,6 +84,11 @@ kombu_opts = [
     cfg.BoolOpt('rabbit_durable_queues',
                 default=False,
                 help='use durable queues in RabbitMQ'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+                     'You need to wipe RabbitMQ database when '
+                     'changing this option.'),
 
 ]
 
@@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
 LOG = rpc_common.LOG
 
 
+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
 class ConsumerBase(object):
     """Consumer base class."""
 
@@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
     def __init__(self, conf, channel, topic, callback, tag, name=None,
-                 **kwargs):
+                 exchange_name=None, **kwargs):
         """Init a 'topic' queue.
 
         :param channel: the amqp channel to use
@@ -207,13 +230,15 @@ class TopicConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': conf.rabbit_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-                name=rpc_amqp.get_control_exchange(conf),
-                type='topic', durable=options['durable'],
-                auto_delete=options['auto_delete'])
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
         super(TopicConsumer, self).__init__(channel,
                                             callback,
                                             tag,
@@ -242,6 +267,7 @@ class FanoutConsumer(ConsumerBase):
 
         # Default options
         options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': True,
                    'exclusive': True}
         options.update(kwargs)
@@ -307,9 +333,12 @@ class TopicPublisher(Publisher):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(channel,
-                rpc_amqp.get_control_exchange(conf), topic,
-                type='topic', **options)
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)
 
 
 class FanoutPublisher(Publisher):
@@ -332,6 +361,7 @@ class NotifyPublisher(TopicPublisher):
 
     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
     def reconnect(self, channel):
@@ -344,7 +374,8 @@ class NotifyPublisher(TopicPublisher):
                                    exchange=self.exchange,
                                    durable=self.durable,
                                    name=self.routing_key,
-                                   routing_key=self.routing_key)
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)
         queue.declare()
 
 
@@ -369,31 +400,37 @@ class Connection(object):
 
         if server_params is None:
             server_params = {}
-
         # Keys to translate from server_params to kombu params
         server_params_to_kombu_params = {'username': 'userid'}
 
-        params = {}
-        for sp_key, value in server_params.iteritems():
-            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
-            params[p_key] = value
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
 
-        params.setdefault('hostname', self.conf.rabbit_host)
-        params.setdefault('port', self.conf.rabbit_port)
-        params.setdefault('userid', self.conf.rabbit_userid)
-        params.setdefault('password', self.conf.rabbit_password)
-        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+            params = {}
 
-        self.params = params
+            for sp_key, value in server_params.iteritems():
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
 
-        if self.conf.fake_rabbit:
-            self.params['transport'] = 'memory'
-            self.memory_transport = True
-        else:
-            self.memory_transport = False
+            params.setdefault('hostname', hostname)
+            params.setdefault('port', port)
+            params.setdefault('userid', self.conf.rabbit_userid)
+            params.setdefault('password', self.conf.rabbit_password)
+            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
 
-        if self.conf.rabbit_use_ssl:
-            self.params['ssl'] = self._fetch_ssl_params()
+            params_list.append(params)
+
+        self.params_list = params_list
+
+        self.memory_transport = self.conf.fake_rabbit
 
         self.connection = None
         self.reconnect()
@@ -423,14 +460,14 @@ class Connection(object):
             # Return the extended behavior
             return ssl_params
 
-    def _connect(self):
+    def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
         been declared before if we are reconnecting.  Exceptions should
         be handled by the caller.
         """
         if self.connection:
             LOG.info(_("Reconnecting to AMQP server on "
-                     "%(hostname)s:%(port)d") % self.params)
+                     "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.close()
             except self.connection_errors:
@@ -438,7 +475,7 @@ class Connection(object):
             # Setting this in case the next statement fails, though
             # it shouldn't be doing any network operations, yet.
             self.connection = None
-        self.connection = kombu.connection.BrokerConnection(**self.params)
+        self.connection = kombu.connection.BrokerConnection(**params)
         self.connection_errors = self.connection.connection_errors
         if self.memory_transport:
             # Kludge to speed up tests.
@@ -451,8 +488,8 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
-                 self.params)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)
 
     def reconnect(self):
         """Handles reconnecting and re-establishing queues.
@@ -465,11 +502,12 @@ class Connection(object):
 
         attempt = 0
         while True:
+            params = self.params_list[attempt % len(self.params_list)]
             attempt += 1
             try:
-                self._connect()
+                self._connect(params)
                 return
-            except (self.connection_errors, IOError), e:
+            except (IOError, self.connection_errors) as e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
@@ -484,12 +522,12 @@ class Connection(object):
             log_info = {}
             log_info['err_str'] = str(e)
             log_info['max_retries'] = self.max_retries
-            log_info.update(self.params)
+            log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.exception(_('Unable to connect to AMQP server on '
-                              '%(hostname)s:%(port)d after %(max_retries)d '
-                              'tries: %(err_str)s') % log_info)
+                LOG.error(_('Unable to connect to AMQP server on '
+                            '%(hostname)s:%(port)d after %(max_retries)d '
+                            'tries: %(err_str)s') % log_info)
                 # NOTE(comstud): Copied from original code.  There's
                 # really no better recourse because if this was a queue we
                 # need to consume on, we have no way to consume anymore.
@@ -503,9 +541,9 @@ class Connection(object):
                 sleep_time = min(sleep_time, self.interval_max)
 
             log_info['sleep_time'] = sleep_time
-            LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
-                          ' unreachable: %(err_str)s. Trying again in '
-                          '%(sleep_time)d seconds.') % log_info)
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
@@ -513,7 +551,8 @@ class Connection(object):
             try:
                 return method(*args, **kwargs)
             except (self.connection_errors, socket.timeout, IOError), e:
-                pass
+                if error_callback:
+                    error_callback(e)
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport
@@ -523,8 +562,8 @@ class Connection(object):
                 # and try to reconnect in this case.
                 if 'timeout' not in str(e):
                     raise
-            if error_callback:
-                error_callback(e)
+                if error_callback:
+                    error_callback(e)
             self.reconnect()
 
     def get_channel(self):
@@ -626,10 +665,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
index 697f4d8c5d129f9f6d20a426108e7af0081b2c5a..b47f25b3fea3ab53b9d9308a0a79eb38c8201b68 100644 (file)
@@ -69,7 +69,7 @@ qpid_opts = [
                default=0,
                help='Equivalent to setting max and min to the same value'),
     cfg.IntOpt('qpid_heartbeat',
-               default=5,
+               default=60,
                help='Seconds between connection keepalive heartbeats'),
     cfg.StrOpt('qpid_protocol',
                default='tcp',
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
-    def __init__(self, conf, session, topic, callback, name=None):
+    def __init__(self, conf, session, topic, callback, name=None,
+                 exchange_name=None):
         """Init a 'topic' queue.
 
         :param session: the amqp session to use
@@ -180,9 +181,10 @@ class TopicConsumer(ConsumerBase):
         :param name: optional queue name, defaults to topic
         """
 
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
         super(TopicConsumer, self).__init__(session, callback,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
-                {}, name or topic, {})
+                                            "%s/%s" % (exchange_name, topic),
+                                            {}, name or topic, {})
 
 
 class FanoutConsumer(ConsumerBase):
@@ -255,8 +257,9 @@ class TopicPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(session,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
+                                             "%s/%s" % (exchange_name, topic))
 
 
 class FanoutPublisher(Publisher):
@@ -274,9 +277,10 @@ class NotifyPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(NotifyPublisher, self).__init__(session,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
-                {"durable": True})
+                                              "%s/%s" % (exchange_name, topic),
+                                              {"durable": True})
 
 
 class Connection(object):
@@ -461,10 +465,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
diff --git a/heat/openstack/common/rpc/service.py b/heat/openstack/common/rpc/service.py
new file mode 100644 (file)
index 0000000..44a9cdc
--- /dev/null
@@ -0,0 +1,70 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import rpc
+from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
+from heat.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+    """Service object for binaries running on hosts.
+
+    A service enables rpc by listening to queues based on topic and host."""
+    def __init__(self, host, topic, manager=None):
+        super(Service, self).__init__()
+        self.host = host
+        self.topic = topic
+        if manager is None:
+            self.manager = self
+        else:
+            self.manager = manager
+
+    def start(self):
+        super(Service, self).start()
+
+        self.conn = rpc.create_connection(new=True)
+        LOG.debug(_("Creating Consumer connection for Service %s") %
+                  self.topic)
+
+        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+
+        # Share this same connection for these Consumers
+        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+
+        node_topic = '%s.%s' % (self.topic, self.host)
+        self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+
+        self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+
+    def stop(self):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them.. as we're shutting down anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        super(Service, self).stop()
index 628f5e3c9b2f68e2b930982b470786dbdb4c6340..83eef07a7be54ea67c3975fa1d7f050de23dcf87 100644 (file)
@@ -31,13 +31,13 @@ from setuptools.command import sdist
 def parse_mailmap(mailmap='.mailmap'):
     mapping = {}
     if os.path.exists(mailmap):
-        fp = open(mailmap, 'r')
-        for l in fp:
-            l = l.strip()
-            if not l.startswith('#') and ' ' in l:
-                canonical_email, alias = [x for x in l.split(' ')
-                                          if x.startswith('<')]
-                mapping[alias] = canonical_email
+        with open(mailmap, 'r') as fp:
+            for l in fp:
+                l = l.strip()
+                if not l.startswith('#') and ' ' in l:
+                    canonical_email, alias = [x for x in l.split(' ')
+                                              if x.startswith('<')]
+                    mapping[alias] = canonical_email
     return mapping
 
 
@@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping):
 def get_reqs_from_files(requirements_files):
     for requirements_file in requirements_files:
         if os.path.exists(requirements_file):
-            return open(requirements_file, 'r').read().split('\n')
+            with open(requirements_file, 'r') as fil:
+                return fil.read().split('\n')
     return []
 
 
@@ -135,15 +136,17 @@ def _get_git_next_version_suffix(branch_name):
     _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
     milestone_cmd = "git show meta/openstack/release:%s" % branch_name
     milestonever = _run_shell_command(milestone_cmd)
-    if not milestonever:
-        milestonever = ""
+    if milestonever:
+        first_half = "%s~%s" % (milestonever, datestamp)
+    else:
+        first_half = datestamp
+
     post_version = _get_git_post_version()
     # post version should look like:
     # 0.1.1.4.gcc9e28a
     # where the bit after the last . is the short sha, and the bit between
     # the last and second to last is the revno count
     (revno, sha) = post_version.split(".")[-2:]
-    first_half = "%s~%s" % (milestonever, datestamp)
     second_half = "%s%s.%s" % (revno_prefix, revno, sha)
     return ".".join((first_half, second_half))
 
@@ -191,14 +194,14 @@ def write_git_changelog():
 
 def generate_authors():
     """Create AUTHORS file using git commits."""
-    jenkins_email = 'jenkins@review.openstack.org'
+    jenkins_email = 'jenkins@review.(openstack|stackforge).org'
     old_authors = 'AUTHORS.in'
     new_authors = 'AUTHORS'
     if not os.getenv('SKIP_GENERATE_AUTHORS'):
         if os.path.isdir('.git'):
             # don't include jenkins email address in AUTHORS file
             git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
-                           "grep -v " + jenkins_email)
+                           "egrep -v '" + jenkins_email + "'")
             changelog = _run_shell_command(git_log_cmd)
             mailmap = parse_mailmap()
             with open(new_authors, 'w') as new_authors_fh:
@@ -236,7 +239,8 @@ def read_versioninfo(project):
 
 def write_versioninfo(project, version):
     """Write a simple file containing the version of the package."""
-    open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version)
+    with open(os.path.join(project, 'versioninfo'), 'w') as fil:
+        fil.write("%s\n" % version)
 
 
 def get_cmdclass():
index c4f6cf0497a5573270f104e98061ff69587213e6..86004391de06a7144b6dce653344880bc93ccbf0 100644 (file)
@@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
 
 
 def normalize_time(timestamp):
-    """Normalize time in arbitrary timezone to UTC"""
+    """Normalize time in arbitrary timezone to UTC naive object"""
     offset = timestamp.utcoffset()
-    return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+    if offset is None:
+        return timestamp
+    return timestamp.replace(tzinfo=None) - offset
 
 
 def is_older_than(before, seconds):
@@ -72,6 +74,11 @@ def is_older_than(before, seconds):
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
+def is_newer_than(after, seconds):
+    """Return True if after is newer than seconds."""
+    return after - utcnow() > datetime.timedelta(seconds=seconds)
+
+
 def utcnow_ts():
     """Timestamp version of our utcnow function."""
     return calendar.timegm(utcnow().timetuple())
@@ -121,6 +128,10 @@ def marshall_now(now=None):
 
 def unmarshall_time(tyme):
     """Unmarshall a datetime dict."""
-    return datetime.datetime(day=tyme['day'], month=tyme['month'],
-                 year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
-                 second=tyme['second'], microsecond=tyme['microsecond'])
+    return datetime.datetime(day=tyme['day'],
+                             month=tyme['month'],
+                             year=tyme['year'],
+                             hour=tyme['hour'],
+                             minute=tyme['minute'],
+                             second=tyme['second'],
+                             microsecond=tyme['microsecond'])
index e1d847ccb7af7f2122ae61050b01554e8a60d312..ce6916811c882a4e6ca7549610f433b9e6a3bf9e 100644 (file)
@@ -23,8 +23,8 @@ import logging
 import random
 import shlex
 
-from eventlet import greenthread
 from eventlet.green import subprocess
+from eventlet import greenthread
 
 from heat.openstack.common import exception
 from heat.openstack.common.gettextutils import _
@@ -64,50 +64,6 @@ def bool_from_string(subject):
     return False
 
 
-def parse_host_port(address, default_port=None):
-    """
-    Interpret a string as a host:port pair.
-    An IPv6 address MUST be escaped if accompanied by a port,
-    because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
-    means both [2001:db8:85a3::8a2e:370:7334] and
-    [2001:db8:85a3::8a2e:370]:7334.
-
-    >>> parse_host_port('server01:80')
-    ('server01', 80)
-    >>> parse_host_port('server01')
-    ('server01', None)
-    >>> parse_host_port('server01', default_port=1234)
-    ('server01', 1234)
-    >>> parse_host_port('[::1]:80')
-    ('::1', 80)
-    >>> parse_host_port('[::1]')
-    ('::1', None)
-    >>> parse_host_port('[::1]', default_port=1234)
-    ('::1', 1234)
-    >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
-    ('2001:db8:85a3::8a2e:370:7334', 1234)
-
-    """
-    if address[0] == '[':
-        # Escaped ipv6
-        _host, _port = address[1:].split(']')
-        host = _host
-        if ':' in _port:
-            port = _port.split(':')[1]
-        else:
-            port = default_port
-    else:
-        if address.count(':') == 1:
-            host, port = address.split(':')
-        else:
-            # 0 means ipv4, >1 means ipv6.
-            # We prohibit unescaped ipv6 addresses with port.
-            host = address
-            port = default_port
-
-    return (host, None if port is None else int(port))
-
-
 def execute(*cmd, **kwargs):
     """
     Helper method to execute command with optional retry.