This module also contains a global instance of the CommonConfigOpts class
in order to support a common usage pattern in OpenStack:
- from openstack.common import cfg
+ from heat.openstack.common import cfg
opts = [
- cfg.StrOpt('bind_host' default='0.0.0.0'),
+ cfg.StrOpt('bind_host', default='0.0.0.0'),
cfg.IntOpt('bind_port', default=9292),
]
if ('default' in info or 'override' in info):
continue
- if self._get(opt.name, group) is None:
+ if self._get(opt.dest, group) is None:
raise RequiredOptError(opt.name, group)
def _parse_cli_opts(self, args):
Usual usage in an openstack.common module:
- from openstack.common.gettextutils import _
+ from heat.openstack.common.gettextutils import _
"""
import gettext
cfg.BoolOpt('publish_errors',
default=False,
help='publish error events'),
+ cfg.BoolOpt('fatal_deprecations',
+ default=False,
+ help='make deprecations fatal'),
# NOTE(mikal): there are two options here because sometimes we are handed
# a full instance (and could include more information), and other times we
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
+ def deprecated(self, msg, *args, **kwargs):
+ stdmsg = _("Deprecated Config: %s") % msg
+ if CONF.fatal_deprecations:
+ self.critical(stdmsg, *args, **kwargs)
+ raise DeprecatedConfig(msg=stdmsg)
+ else:
+ self.warn(stdmsg, *args, **kwargs)
+
def process(self, msg, kwargs):
if 'extra' not in kwargs:
kwargs['extra'] = {}
def format(self, record):
record.color = self.LEVEL_COLORS[record.levelno]
return logging.StreamHandler.format(self, record)
+
+
+class DeprecatedConfig(Exception):
+ message = _("Fatal call to deprecated config: %(msg)s")
+
+ def __init__(self, msg):
+ super(Exception, self).__init__(self.message % dict(msg=msg))
driver.notify(context, msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
- "send to notification system. Payload=%(payload)s") %
- locals())
+ "send to notification system. "
+ "Payload=%(payload)s") % locals())
_drivers = None
except ImportError as e:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
- notification_driver)
+ notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
+from heat.openstack.common import network_utils
from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
- help='the RabbitMQ host'),
+ help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port',
default=5672,
- help='the RabbitMQ port'),
+ help='The RabbitMQ broker port where a single node is used'),
+ cfg.ListOpt('rabbit_hosts',
+ default=['$rabbit_host:$rabbit_port'],
+ help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
+ cfg.BoolOpt('rabbit_ha_queues',
+ default=False,
+ help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+ 'You need to wipe RabbitMQ database when '
+ 'changing this option.'),
]
LOG = rpc_common.LOG
+def _get_queue_arguments(conf):
+ """Construct the arguments for declaring a queue.
+
+ If the rabbit_ha_queues option is set, we declare a mirrored queue
+ as described here:
+
+ http://www.rabbitmq.com/ha.html
+
+ Setting x-ha-policy to all means that the queue will be mirrored
+ to all nodes in the cluster.
+ """
+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
class ConsumerBase(object):
"""Consumer base class."""
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
- **kwargs):
+ exchange_name=None, **kwargs):
"""Init a 'topic' queue.
:param channel: the amqp channel to use
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=rpc_amqp.get_control_exchange(conf),
- type='topic', durable=options['durable'],
- auto_delete=options['auto_delete'])
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+ exchange = kombu.entity.Exchange(name=exchange_name,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel,
callback,
tag,
# Default options
options = {'durable': False,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(channel,
- rpc_amqp.get_control_exchange(conf), topic,
- type='topic', **options)
+ exchange_name,
+ topic,
+ type='topic',
+ **options)
class FanoutPublisher(Publisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
- routing_key=self.routing_key)
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments)
queue.declare()
if server_params is None:
server_params = {}
-
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
- params = {}
- for sp_key, value in server_params.iteritems():
- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
- params[p_key] = value
+ ssl_params = self._fetch_ssl_params()
+ params_list = []
+ for adr in self.conf.rabbit_hosts:
+ hostname, port = network_utils.parse_host_port(
+ adr, default_port=self.conf.rabbit_port)
- params.setdefault('hostname', self.conf.rabbit_host)
- params.setdefault('port', self.conf.rabbit_port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+ params = {}
- self.params = params
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
- if self.conf.fake_rabbit:
- self.params['transport'] = 'memory'
- self.memory_transport = True
- else:
- self.memory_transport = False
+ params.setdefault('hostname', hostname)
+ params.setdefault('port', port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+ if self.conf.fake_rabbit:
+ params['transport'] = 'memory'
+ if self.conf.rabbit_use_ssl:
+ params['ssl'] = ssl_params
- if self.conf.rabbit_use_ssl:
- self.params['ssl'] = self._fetch_ssl_params()
+ params_list.append(params)
+
+ self.params_list = params_list
+
+ self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.reconnect()
# Return the extended behavior
return ssl_params
- def _connect(self):
+ def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % params)
try:
self.connection.close()
except self.connection_errors:
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(**self.params)
+ self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
- self.params)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+ params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
attempt = 0
while True:
+ params = self.params_list[attempt % len(self.params_list)]
attempt += 1
try:
- self._connect()
+ self._connect(params)
return
- except (self.connection_errors, IOError), e:
+ except (IOError, self.connection_errors) as e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
- log_info.update(self.params)
+ log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ LOG.error(_('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
- LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+ 'unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e:
- pass
+ if error_callback:
+ error_callback(e)
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
- if error_callback:
- error_callback(e)
+ if error_callback:
+ error_callback(e)
self.reconnect()
def get_channel(self):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
- default=5,
+ default=60,
help='Seconds between connection keepalive heartbeats'),
cfg.StrOpt('qpid_protocol',
default='tcp',
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
:param name: optional queue name, defaults to topic
"""
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
- {}, name or topic, {})
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
+ "%s/%s" % (exchange_name, topic))
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
- {"durable": True})
+ "%s/%s" % (exchange_name, topic),
+ {"durable": True})
class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import rpc
+from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
+from heat.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+ """Service object for binaries running on hosts.
+
+ A service enables rpc by listening to queues based on topic and host."""
+ def __init__(self, host, topic, manager=None):
+ super(Service, self).__init__()
+ self.host = host
+ self.topic = topic
+ if manager is None:
+ self.manager = self
+ else:
+ self.manager = manager
+
+ def start(self):
+ super(Service, self).start()
+
+ self.conn = rpc.create_connection(new=True)
+ LOG.debug(_("Creating Consumer connection for Service %s") %
+ self.topic)
+
+ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+
+ # Share this same connection for these Consumers
+ self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+
+ node_topic = '%s.%s' % (self.topic, self.host)
+ self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+
+ self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
+
+ def stop(self):
+ # Try to shut the connection down, but if we get any sort of
+ # errors, go ahead and ignore them.. as we're shutting down anyway
+ try:
+ self.conn.close()
+ except Exception:
+ pass
+ super(Service, self).stop()
def parse_mailmap(mailmap='.mailmap'):
mapping = {}
if os.path.exists(mailmap):
- fp = open(mailmap, 'r')
- for l in fp:
- l = l.strip()
- if not l.startswith('#') and ' ' in l:
- canonical_email, alias = [x for x in l.split(' ')
- if x.startswith('<')]
- mapping[alias] = canonical_email
+ with open(mailmap, 'r') as fp:
+ for l in fp:
+ l = l.strip()
+ if not l.startswith('#') and ' ' in l:
+ canonical_email, alias = [x for x in l.split(' ')
+ if x.startswith('<')]
+ mapping[alias] = canonical_email
return mapping
def get_reqs_from_files(requirements_files):
for requirements_file in requirements_files:
if os.path.exists(requirements_file):
- return open(requirements_file, 'r').read().split('\n')
+ with open(requirements_file, 'r') as fil:
+ return fil.read().split('\n')
return []
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
- if not milestonever:
- milestonever = ""
+ if milestonever:
+ first_half = "%s~%s" % (milestonever, datestamp)
+ else:
+ first_half = datestamp
+
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
- first_half = "%s~%s" % (milestonever, datestamp)
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))
def generate_authors():
"""Create AUTHORS file using git commits."""
- jenkins_email = 'jenkins@review.openstack.org'
+ jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if os.path.isdir('.git'):
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
- "grep -v " + jenkins_email)
+ "egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
with open(new_authors, 'w') as new_authors_fh:
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
- open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version)
+ with open(os.path.join(project, 'versioninfo'), 'w') as fil:
+ fil.write("%s\n" % version)
def get_cmdclass():
def normalize_time(timestamp):
- """Normalize time in arbitrary timezone to UTC"""
+ """Normalize time in arbitrary timezone to UTC naive object"""
offset = timestamp.utcoffset()
- return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+ if offset is None:
+ return timestamp
+ return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
return utcnow() - before > datetime.timedelta(seconds=seconds)
+def is_newer_than(after, seconds):
+ """Return True if after is newer than seconds."""
+ return after - utcnow() > datetime.timedelta(seconds=seconds)
+
+
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
- return datetime.datetime(day=tyme['day'], month=tyme['month'],
- year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
- second=tyme['second'], microsecond=tyme['microsecond'])
+ return datetime.datetime(day=tyme['day'],
+ month=tyme['month'],
+ year=tyme['year'],
+ hour=tyme['hour'],
+ minute=tyme['minute'],
+ second=tyme['second'],
+ microsecond=tyme['microsecond'])
import random
import shlex
-from eventlet import greenthread
from eventlet.green import subprocess
+from eventlet import greenthread
from heat.openstack.common import exception
from heat.openstack.common.gettextutils import _
return False
-def parse_host_port(address, default_port=None):
- """
- Interpret a string as a host:port pair.
- An IPv6 address MUST be escaped if accompanied by a port,
- because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
- means both [2001:db8:85a3::8a2e:370:7334] and
- [2001:db8:85a3::8a2e:370]:7334.
-
- >>> parse_host_port('server01:80')
- ('server01', 80)
- >>> parse_host_port('server01')
- ('server01', None)
- >>> parse_host_port('server01', default_port=1234)
- ('server01', 1234)
- >>> parse_host_port('[::1]:80')
- ('::1', 80)
- >>> parse_host_port('[::1]')
- ('::1', None)
- >>> parse_host_port('[::1]', default_port=1234)
- ('::1', 1234)
- >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
- ('2001:db8:85a3::8a2e:370:7334', 1234)
-
- """
- if address[0] == '[':
- # Escaped ipv6
- _host, _port = address[1:].split(']')
- host = _host
- if ':' in _port:
- port = _port.split(':')[1]
- else:
- port = default_port
- else:
- if address.count(':') == 1:
- host, port = address.split(':')
- else:
- # 0 means ipv4, >1 means ipv6.
- # We prohibit unescaped ipv6 addresses with port.
- host = address
- port = default_port
-
- return (host, None if port is None else int(port))
-
-
def execute(*cmd, **kwargs):
"""
Helper method to execute command with optional retry.