From c6a624f3ab73a5b1770e69d35713c92a82ecb54c Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Thu, 1 Nov 2012 14:00:41 +1100 Subject: [PATCH] Update openstack-common Change-Id: I3abd011729f413dbd20806f54c9ae6963641d59a --- heat/openstack/common/cfg.py | 6 +- heat/openstack/common/gettextutils.py | 2 +- heat/openstack/common/log.py | 18 ++++ heat/openstack/common/notifier/api.py | 6 +- heat/openstack/common/rpc/impl_kombu.py | 133 ++++++++++++++++-------- heat/openstack/common/rpc/impl_qpid.py | 22 ++-- heat/openstack/common/rpc/service.py | 70 +++++++++++++ heat/openstack/common/setup.py | 32 +++--- heat/openstack/common/timeutils.py | 21 +++- heat/openstack/common/utils.py | 46 +------- 10 files changed, 231 insertions(+), 125 deletions(-) create mode 100644 heat/openstack/common/rpc/service.py diff --git a/heat/openstack/common/cfg.py b/heat/openstack/common/cfg.py index 53ed9ce2..2f196776 100644 --- a/heat/openstack/common/cfg.py +++ b/heat/openstack/common/cfg.py @@ -236,10 +236,10 @@ log files: This module also contains a global instance of the CommonConfigOpts class in order to support a common usage pattern in OpenStack: - from openstack.common import cfg + from heat.openstack.common import cfg opts = [ - cfg.StrOpt('bind_host' default='0.0.0.0'), + cfg.StrOpt('bind_host', default='0.0.0.0'), cfg.IntOpt('bind_port', default=9292), ] @@ -1507,7 +1507,7 @@ class ConfigOpts(collections.Mapping): if ('default' in info or 'override' in info): continue - if self._get(opt.name, group) is None: + if self._get(opt.dest, group) is None: raise RequiredOptError(opt.name, group) def _parse_cli_opts(self, args): diff --git a/heat/openstack/common/gettextutils.py b/heat/openstack/common/gettextutils.py index 235350cc..681b5a88 100644 --- a/heat/openstack/common/gettextutils.py +++ b/heat/openstack/common/gettextutils.py @@ -20,7 +20,7 @@ gettext for openstack-common modules. Usual usage in an openstack.common module: - from openstack.common.gettextutils import _ + from heat.openstack.common.gettextutils import _ """ import gettext diff --git a/heat/openstack/common/log.py b/heat/openstack/common/log.py index 41e3781c..723ab54d 100644 --- a/heat/openstack/common/log.py +++ b/heat/openstack/common/log.py @@ -76,6 +76,9 @@ log_opts = [ cfg.BoolOpt('publish_errors', default=False, help='publish error events'), + cfg.BoolOpt('fatal_deprecations', + default=False, + help='make deprecations fatal'), # NOTE(mikal): there are two options here because sometimes we are handed # a full instance (and could include more information), and other times we @@ -170,6 +173,14 @@ class ContextAdapter(logging.LoggerAdapter): def audit(self, msg, *args, **kwargs): self.log(logging.AUDIT, msg, *args, **kwargs) + def deprecated(self, msg, *args, **kwargs): + stdmsg = _("Deprecated Config: %s") % msg + if CONF.fatal_deprecations: + self.critical(stdmsg, *args, **kwargs) + raise DeprecatedConfig(msg=stdmsg) + else: + self.warn(stdmsg, *args, **kwargs) + def process(self, msg, kwargs): if 'extra' not in kwargs: kwargs['extra'] = {} @@ -450,3 +461,10 @@ class ColorHandler(logging.StreamHandler): def format(self, record): record.color = self.LEVEL_COLORS[record.levelno] return logging.StreamHandler.format(self, record) + + +class DeprecatedConfig(Exception): + message = _("Fatal call to deprecated config: %(msg)s") + + def __init__(self, msg): + super(Exception, self).__init__(self.message % dict(msg=msg)) diff --git a/heat/openstack/common/notifier/api.py b/heat/openstack/common/notifier/api.py index e6383f61..d1ea0f2f 100644 --- a/heat/openstack/common/notifier/api.py +++ b/heat/openstack/common/notifier/api.py @@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload): driver.notify(context, msg) except Exception, e: LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system. Payload=%(payload)s") % - locals()) + "send to notification system. " + "Payload=%(payload)s") % locals()) _drivers = None @@ -169,7 +169,7 @@ def add_driver(notification_driver): except ImportError as e: LOG.exception(_("Failed to load notifier %s. " "These notifications will not be sent.") % - notification_driver) + notification_driver) else: # Driver is already loaded; just add the object. _drivers[notification_driver] = notification_driver diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py index b5209a40..41fec9a2 100644 --- a/heat/openstack/common/rpc/impl_kombu.py +++ b/heat/openstack/common/rpc/impl_kombu.py @@ -31,6 +31,7 @@ import kombu.messaging from heat.openstack.common import cfg from heat.openstack.common.gettextutils import _ +from heat.openstack.common import network_utils from heat.openstack.common.rpc import amqp as rpc_amqp from heat.openstack.common.rpc import common as rpc_common @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='The RabbitMQ broker address where a single node is used'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='The RabbitMQ broker port where a single node is used'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" def __init__(self, conf, channel, topic, callback, tag, name=None, - **kwargs): + exchange_name=None, **kwargs): """Init a 'topic' queue. :param channel: the amqp channel to use @@ -207,13 +230,15 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=rpc_amqp.get_control_exchange(conf), - type='topic', durable=options['durable'], - auto_delete=options['auto_delete']) + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) + exchange = kombu.entity.Exchange(name=exchange_name, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) super(TopicConsumer, self).__init__(channel, callback, tag, @@ -242,6 +267,7 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': True} options.update(kwargs) @@ -307,9 +333,12 @@ class TopicPublisher(Publisher): 'auto_delete': False, 'exclusive': False} options.update(kwargs) + exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(channel, - rpc_amqp.get_control_exchange(conf), topic, - type='topic', **options) + exchange_name, + topic, + type='topic', + **options) class FanoutPublisher(Publisher): @@ -332,6 +361,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -344,7 +374,8 @@ class NotifyPublisher(TopicPublisher): exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) queue.declare() @@ -369,31 +400,37 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = network_utils.parse_host_port( + adr, default_port=self.conf.rabbit_port) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + params_list.append(params) + + self.params_list = params_list + + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -423,14 +460,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -438,7 +475,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -451,8 +488,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -465,11 +502,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -484,12 +522,12 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + LOG.error(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -503,9 +541,9 @@ class Connection(object): sleep_time = min(sleep_time, self.interval_max) log_info['sleep_time'] = sleep_time - LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -513,7 +551,8 @@ class Connection(object): try: return method(*args, **kwargs) except (self.connection_errors, socket.timeout, IOError), e: - pass + if error_callback: + error_callback(e) except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport @@ -523,8 +562,8 @@ class Connection(object): # and try to reconnect in this case. if 'timeout' not in str(e): raise - if error_callback: - error_callback(e) + if error_callback: + error_callback(e) self.reconnect() def get_channel(self): @@ -626,10 +665,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index 697f4d8c..b47f25b3 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -69,7 +69,7 @@ qpid_opts = [ default=0, help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', - default=5, + default=60, help='Seconds between connection keepalive heartbeats'), cfg.StrOpt('qpid_protocol', default='tcp', @@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback, name=None): + def __init__(self, conf, session, topic, callback, name=None, + exchange_name=None): """Init a 'topic' queue. :param session: the amqp session to use @@ -180,9 +181,10 @@ class TopicConsumer(ConsumerBase): :param name: optional queue name, defaults to topic """ + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), - {}, name or topic, {}) + "%s/%s" % (exchange_name, topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -255,8 +257,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ + exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(session, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic)) + "%s/%s" % (exchange_name, topic)) class FanoutPublisher(Publisher): @@ -274,9 +277,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ + exchange_name = rpc_amqp.get_control_exchange(conf) super(NotifyPublisher, self).__init__(session, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), - {"durable": True}) + "%s/%s" % (exchange_name, topic), + {"durable": True}) class Connection(object): @@ -461,10 +465,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/heat/openstack/common/rpc/service.py b/heat/openstack/common/rpc/service.py new file mode 100644 index 00000000..44a9cdc8 --- /dev/null +++ b/heat/openstack/common/rpc/service.py @@ -0,0 +1,70 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.openstack.common.gettextutils import _ +from heat.openstack.common import log as logging +from heat.openstack.common import rpc +from heat.openstack.common.rpc import dispatcher as rpc_dispatcher +from heat.openstack.common import service + + +LOG = logging.getLogger(__name__) + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host.""" + def __init__(self, host, topic, manager=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = rpc.create_connection(new=True) + LOG.debug(_("Creating Consumer connection for Service %s") % + self.topic) + + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, dispatcher, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) + + self.conn.create_consumer(self.topic, dispatcher, fanout=True) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop() diff --git a/heat/openstack/common/setup.py b/heat/openstack/common/setup.py index 628f5e3c..83eef07a 100644 --- a/heat/openstack/common/setup.py +++ b/heat/openstack/common/setup.py @@ -31,13 +31,13 @@ from setuptools.command import sdist def parse_mailmap(mailmap='.mailmap'): mapping = {} if os.path.exists(mailmap): - fp = open(mailmap, 'r') - for l in fp: - l = l.strip() - if not l.startswith('#') and ' ' in l: - canonical_email, alias = [x for x in l.split(' ') - if x.startswith('<')] - mapping[alias] = canonical_email + with open(mailmap, 'r') as fp: + for l in fp: + l = l.strip() + if not l.startswith('#') and ' ' in l: + canonical_email, alias = [x for x in l.split(' ') + if x.startswith('<')] + mapping[alias] = canonical_email return mapping @@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping): def get_reqs_from_files(requirements_files): for requirements_file in requirements_files: if os.path.exists(requirements_file): - return open(requirements_file, 'r').read().split('\n') + with open(requirements_file, 'r') as fil: + return fil.read().split('\n') return [] @@ -135,15 +136,17 @@ def _get_git_next_version_suffix(branch_name): _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*") milestone_cmd = "git show meta/openstack/release:%s" % branch_name milestonever = _run_shell_command(milestone_cmd) - if not milestonever: - milestonever = "" + if milestonever: + first_half = "%s~%s" % (milestonever, datestamp) + else: + first_half = datestamp + post_version = _get_git_post_version() # post version should look like: # 0.1.1.4.gcc9e28a # where the bit after the last . is the short sha, and the bit between # the last and second to last is the revno count (revno, sha) = post_version.split(".")[-2:] - first_half = "%s~%s" % (milestonever, datestamp) second_half = "%s%s.%s" % (revno_prefix, revno, sha) return ".".join((first_half, second_half)) @@ -191,14 +194,14 @@ def write_git_changelog(): def generate_authors(): """Create AUTHORS file using git commits.""" - jenkins_email = 'jenkins@review.openstack.org' + jenkins_email = 'jenkins@review.(openstack|stackforge).org' old_authors = 'AUTHORS.in' new_authors = 'AUTHORS' if not os.getenv('SKIP_GENERATE_AUTHORS'): if os.path.isdir('.git'): # don't include jenkins email address in AUTHORS file git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | " - "grep -v " + jenkins_email) + "egrep -v '" + jenkins_email + "'") changelog = _run_shell_command(git_log_cmd) mailmap = parse_mailmap() with open(new_authors, 'w') as new_authors_fh: @@ -236,7 +239,8 @@ def read_versioninfo(project): def write_versioninfo(project, version): """Write a simple file containing the version of the package.""" - open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version) + with open(os.path.join(project, 'versioninfo'), 'w') as fil: + fil.write("%s\n" % version) def get_cmdclass(): diff --git a/heat/openstack/common/timeutils.py b/heat/openstack/common/timeutils.py index c4f6cf04..86004391 100644 --- a/heat/openstack/common/timeutils.py +++ b/heat/openstack/common/timeutils.py @@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): def normalize_time(timestamp): - """Normalize time in arbitrary timezone to UTC""" + """Normalize time in arbitrary timezone to UTC naive object""" offset = timestamp.utcoffset() - return timestamp.replace(tzinfo=None) - offset if offset else timestamp + if offset is None: + return timestamp + return timestamp.replace(tzinfo=None) - offset def is_older_than(before, seconds): @@ -72,6 +74,11 @@ def is_older_than(before, seconds): return utcnow() - before > datetime.timedelta(seconds=seconds) +def is_newer_than(after, seconds): + """Return True if after is newer than seconds.""" + return after - utcnow() > datetime.timedelta(seconds=seconds) + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return calendar.timegm(utcnow().timetuple()) @@ -121,6 +128,10 @@ def marshall_now(now=None): def unmarshall_time(tyme): """Unmarshall a datetime dict.""" - return datetime.datetime(day=tyme['day'], month=tyme['month'], - year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], - second=tyme['second'], microsecond=tyme['microsecond']) + return datetime.datetime(day=tyme['day'], + month=tyme['month'], + year=tyme['year'], + hour=tyme['hour'], + minute=tyme['minute'], + second=tyme['second'], + microsecond=tyme['microsecond']) diff --git a/heat/openstack/common/utils.py b/heat/openstack/common/utils.py index e1d847cc..ce691681 100644 --- a/heat/openstack/common/utils.py +++ b/heat/openstack/common/utils.py @@ -23,8 +23,8 @@ import logging import random import shlex -from eventlet import greenthread from eventlet.green import subprocess +from eventlet import greenthread from heat.openstack.common import exception from heat.openstack.common.gettextutils import _ @@ -64,50 +64,6 @@ def bool_from_string(subject): return False -def parse_host_port(address, default_port=None): - """ - Interpret a string as a host:port pair. - An IPv6 address MUST be escaped if accompanied by a port, - because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334 - means both [2001:db8:85a3::8a2e:370:7334] and - [2001:db8:85a3::8a2e:370]:7334. - - >>> parse_host_port('server01:80') - ('server01', 80) - >>> parse_host_port('server01') - ('server01', None) - >>> parse_host_port('server01', default_port=1234) - ('server01', 1234) - >>> parse_host_port('[::1]:80') - ('::1', 80) - >>> parse_host_port('[::1]') - ('::1', None) - >>> parse_host_port('[::1]', default_port=1234) - ('::1', 1234) - >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) - ('2001:db8:85a3::8a2e:370:7334', 1234) - - """ - if address[0] == '[': - # Escaped ipv6 - _host, _port = address[1:].split(']') - host = _host - if ':' in _port: - port = _port.split(':')[1] - else: - port = default_port - else: - if address.count(':') == 1: - host, port = address.split(':') - else: - # 0 means ipv4, >1 means ipv6. - # We prohibit unescaped ipv6 addresses with port. - host = address - port = default_port - - return (host, None if port is None else int(port)) - - def execute(*cmd, **kwargs): """ Helper method to execute command with optional retry. -- 2.45.2