From: Steven Hardy Date: Tue, 13 Nov 2012 08:47:44 +0000 (+0000) Subject: heat align openstack/common with latest oslo-incubator X-Git-Tag: 2014.1~1225 X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=9096b586e2246cbc13b13141216a775720770c80;p=openstack-build%2Fheat-build.git heat align openstack/common with latest oslo-incubator Pull in the latest oslo changes, we need the threadgroup fix Fixes 1078064 Change-Id: I58cc94979558663d101a1a7b42446d1a9face6cf Signed-off-by: Steven Hardy --- diff --git a/heat/openstack/common/eventlet_backdoor.py b/heat/openstack/common/eventlet_backdoor.py index c2e73ddf..589d2ee4 100644 --- a/heat/openstack/common/eventlet_backdoor.py +++ b/heat/openstack/common/eventlet_backdoor.py @@ -73,6 +73,6 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - eventlet.spawn(eventlet.backdoor.backdoor_server, - eventlet.listen(('localhost', CONF.backdoor_port)), - locals=backdoor_locals) + eventlet.spawn_n(eventlet.backdoor.backdoor_server, + eventlet.listen(('localhost', CONF.backdoor_port)), + locals=backdoor_locals) diff --git a/heat/openstack/common/loopingcall.py b/heat/openstack/common/loopingcall.py index bc7de8c7..a58b02f2 100644 --- a/heat/openstack/common/loopingcall.py +++ b/heat/openstack/common/loopingcall.py @@ -78,7 +78,7 @@ class LoopingCall(object): self.done = done - greenthread.spawn(_inner) + greenthread.spawn_n(_inner) return self.done def stop(self): diff --git a/heat/openstack/common/notifier/rabbit_notifier.py b/heat/openstack/common/notifier/rabbit_notifier.py index 2ddb3215..74785c07 100644 --- a/heat/openstack/common/notifier/rabbit_notifier.py +++ b/heat/openstack/common/notifier/rabbit_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2012 Red Hat, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,33 +14,16 @@ # under the License. -from heat.openstack.common import cfg -from heat.openstack.common import context as req_context from heat.openstack.common.gettextutils import _ from heat.openstack.common import log as logging -from heat.openstack.common import rpc +from heat.openstack.common.notifier import rpc_notifier LOG = logging.getLogger(__name__) -notification_topic_opt = cfg.ListOpt( - 'notification_topics', default=['notifications', ], - help='AMQP topic used for openstack notifications') - -CONF = cfg.CONF -CONF.register_opt(notification_topic_opt) - def notify(context, message): - """Sends a notification to the RabbitMQ""" - if not context: - context = req_context.get_admin_context() - priority = message.get('priority', - CONF.default_notification_level) - priority = priority.lower() - for topic in CONF.notification_topics: - topic = '%s.%s' % (topic, priority) - try: - rpc.notify(context, topic, message) - except Exception, e: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) + """Deprecated in Grizzly. Please use rpc_notifier instead.""" + + LOG.deprecated(_("The rabbit_notifier is now deprecated." + " Please use rpc_notifier instead.")) + rpc_notifier.notify(context, message) diff --git a/heat/openstack/common/rpc/__init__.py b/heat/openstack/common/rpc/__init__.py index 2eb5f0a7..8046c216 100644 --- a/heat/openstack/common/rpc/__init__.py +++ b/heat/openstack/common/rpc/__init__.py @@ -250,7 +250,7 @@ def queue_get_for(context, topic, host): Messages sent to the 'foo.' topic are sent to the nova-foo service on . """ - return '%s.%s' % (topic, host) + return '%s.%s' % (topic, host) if host else topic _RPCIMPL = None diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py index d3f0107f..8c0e5464 100644 --- a/heat/openstack/common/rpc/impl_kombu.py +++ b/heat/openstack/common/rpc/impl_kombu.py @@ -409,18 +409,18 @@ class Connection(object): hostname, port = network_utils.parse_host_port( adr, default_port=self.conf.rabbit_port) - params = {} + params = { + 'hostname': hostname, + 'port': port, + 'userid': self.conf.rabbit_userid, + 'password': self.conf.rabbit_password, + 'virtual_host': self.conf.rabbit_virtual_host, + } for sp_key, value in server_params.iteritems(): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value - params.setdefault('hostname', hostname) - params.setdefault('port', port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) - if self.conf.fake_rabbit: params['transport'] = 'memory' if self.conf.rabbit_use_ssl: diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index b47f25b3..c7afcc48 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -50,24 +50,6 @@ qpid_opts = [ cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', default=60, help='Seconds between connection keepalive heartbeats'), @@ -294,50 +276,36 @@ class Connection(object): self.consumer_thread = None self.conf = conf - if server_params is None: - server_params = {} - - default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) - - params = server_params - for key in default_params.keys(): - params.setdefault(key, default_params[key]) + params = { + 'hostname': self.conf.qpid_hostname, + 'port': self.conf.qpid_port, + 'username': self.conf.qpid_username, + 'password': self.conf.qpid_password, + } + params.update(server_params or {}) self.broker = params['hostname'] + ":" + str(params['port']) + self.username = params['username'] + self.password = params['password'] + self.connection_create() + self.reconnect() + + def connection_create(self): # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = params['username'] - self.connection.password = params['password'] + self.connection.username = self.username + self.connection.password = self.password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms - self.connection.reconnect = self.conf.qpid_reconnect - if self.conf.qpid_reconnect_timeout: - self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) - if self.conf.qpid_reconnect_limit: - self.connection.reconnect_limit = self.conf.qpid_reconnect_limit - if self.conf.qpid_reconnect_interval_max: - self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) - if self.conf.qpid_reconnect_interval_min: - self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) - if self.conf.qpid_reconnect_interval: - self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + # Reconnection is done by self.reconnect() + self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay - # Open is part of reconnect - - # NOTE(WGH) not sure we need this with the reconnect flags - self.reconnect() - def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -352,12 +320,18 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + delay = 1 while True: try: + self.connection_create() self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s'), e) - time.sleep(self.conf.qpid_reconnect_interval or 1) + msg_dict = dict(e=e, delay=delay) + msg = _("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(2 * delay, 60) else: break @@ -365,10 +339,14 @@ class Connection(object): self.session = self.connection.session() - for consumer in self.consumers.itervalues(): - consumer.reconnect(self.session) - if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in consumers.itervalues(): + consumer.reconnect(self.session) + self._register_consumer(consumer) + LOG.debug(_("Re-established AMQP queues")) def ensure(self, error_callback, method, *args, **kwargs): diff --git a/heat/openstack/common/service.py b/heat/openstack/common/service.py index 913a4c79..e2fbae04 100644 --- a/heat/openstack/common/service.py +++ b/heat/openstack/common/service.py @@ -191,7 +191,7 @@ class ProcessLauncher(object): # Close write to ensure only parent has it open os.close(self.writepipe) # Create greenthread to watch for parent to close pipe - eventlet.spawn(self._pipe_watcher) + eventlet.spawn_n(self._pipe_watcher) # Reseed random number generator random.seed() @@ -275,6 +275,10 @@ class ProcessLauncher(object): def wait(self): """Loop waiting on children to die and respawning as necessary""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + while self.running: wrap = self._wait_child() if not wrap: diff --git a/heat/openstack/common/setup.py b/heat/openstack/common/setup.py index 83eef07a..e6f72f03 100644 --- a/heat/openstack/common/setup.py +++ b/heat/openstack/common/setup.py @@ -117,8 +117,12 @@ def write_requirements(): def _run_shell_command(cmd): - output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE) + if os.name == 'nt': + output = subprocess.Popen(["cmd.exe", "/C", cmd], + stdout=subprocess.PIPE) + else: + output = subprocess.Popen(["/bin/sh", "-c", cmd], + stdout=subprocess.PIPE) out = output.communicate() if len(out) == 0: return None diff --git a/heat/openstack/common/threadgroup.py b/heat/openstack/common/threadgroup.py index feee2a0e..09d9b398 100644 --- a/heat/openstack/common/threadgroup.py +++ b/heat/openstack/common/threadgroup.py @@ -47,7 +47,7 @@ class Thread(object): self.thread.link(_thread_done, group=group, thread=self) def stop(self): - self.thread.cancel() + self.thread.kill() def wait(self): return self.thread.wait()