]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
heat align openstack/common with latest oslo-incubator
authorSteven Hardy <shardy@redhat.com>
Tue, 13 Nov 2012 08:47:44 +0000 (08:47 +0000)
committerSteven Hardy <shardy@redhat.com>
Tue, 13 Nov 2012 08:47:44 +0000 (08:47 +0000)
Pull in the latest oslo changes, we need the threadgroup fix
Fixes 1078064

Change-Id: I58cc94979558663d101a1a7b42446d1a9face6cf
Signed-off-by: Steven Hardy <shardy@redhat.com>
heat/openstack/common/eventlet_backdoor.py
heat/openstack/common/loopingcall.py
heat/openstack/common/notifier/rabbit_notifier.py
heat/openstack/common/rpc/__init__.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/service.py
heat/openstack/common/setup.py
heat/openstack/common/threadgroup.py

index c2e73ddfd41bfb822dc98022df64156187ea930c..589d2ee47a8f8ec9b245e3fb480722613d6a3913 100644 (file)
@@ -73,6 +73,6 @@ def initialize_if_enabled():
             pprint.pprint(val)
     sys.displayhook = displayhook
 
-    eventlet.spawn(eventlet.backdoor.backdoor_server,
-                   eventlet.listen(('localhost', CONF.backdoor_port)),
-                   locals=backdoor_locals)
+    eventlet.spawn_n(eventlet.backdoor.backdoor_server,
+                     eventlet.listen(('localhost', CONF.backdoor_port)),
+                     locals=backdoor_locals)
index bc7de8c7b1522bf12d6d5d8d630b8a11a677f4b0..a58b02f27c30a61c929d78b72823c6f04770ee6f 100644 (file)
@@ -78,7 +78,7 @@ class LoopingCall(object):
 
         self.done = done
 
-        greenthread.spawn(_inner)
+        greenthread.spawn_n(_inner)
         return self.done
 
     def stop(self):
index 2ddb32156462c69747a8a6c0c4e25230f86a1bc0..74785c071dde3e5c592838027a09def36445a663 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011 OpenStack LLC.
+# Copyright 2012 Red Hat, Inc.
 # All Rights Reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    under the License.
 
 
-from heat.openstack.common import cfg
-from heat.openstack.common import context as req_context
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import log as logging
-from heat.openstack.common import rpc
+from heat.openstack.common.notifier import rpc_notifier
 
 LOG = logging.getLogger(__name__)
 
-notification_topic_opt = cfg.ListOpt(
-    'notification_topics', default=['notifications', ],
-    help='AMQP topic used for openstack notifications')
-
-CONF = cfg.CONF
-CONF.register_opt(notification_topic_opt)
-
 
 def notify(context, message):
-    """Sends a notification to the RabbitMQ"""
-    if not context:
-        context = req_context.get_admin_context()
-    priority = message.get('priority',
-                           CONF.default_notification_level)
-    priority = priority.lower()
-    for topic in CONF.notification_topics:
-        topic = '%s.%s' % (topic, priority)
-        try:
-            rpc.notify(context, topic, message)
-        except Exception, e:
-            LOG.exception(_("Could not send notification to %(topic)s. "
-                            "Payload=%(message)s"), locals())
+    """Deprecated in Grizzly. Please use rpc_notifier instead."""
+
+    LOG.deprecated(_("The rabbit_notifier is now deprecated."
+                     " Please use rpc_notifier instead."))
+    rpc_notifier.notify(context, message)
index 2eb5f0a7da6057725fb289f63e0fc5ce07323bfa..8046c216f8fc66ca51006ef560af68615fe04677 100644 (file)
@@ -250,7 +250,7 @@ def queue_get_for(context, topic, host):
     Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
     <host>.
     """
-    return '%s.%s' % (topic, host)
+    return '%s.%s' % (topic, host) if host else topic
 
 
 _RPCIMPL = None
index d3f0107fb487c0fb0d201ddd48392fd763198e7b..8c0e5464304668d84e136c699dcab2002b909c6c 100644 (file)
@@ -409,18 +409,18 @@ class Connection(object):
             hostname, port = network_utils.parse_host_port(
                 adr, default_port=self.conf.rabbit_port)
 
-            params = {}
+            params = {
+                'hostname': hostname,
+                'port': port,
+                'userid': self.conf.rabbit_userid,
+                'password': self.conf.rabbit_password,
+                'virtual_host': self.conf.rabbit_virtual_host,
+            }
 
             for sp_key, value in server_params.iteritems():
                 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
                 params[p_key] = value
 
-            params.setdefault('hostname', hostname)
-            params.setdefault('port', port)
-            params.setdefault('userid', self.conf.rabbit_userid)
-            params.setdefault('password', self.conf.rabbit_password)
-            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
-
             if self.conf.fake_rabbit:
                 params['transport'] = 'memory'
             if self.conf.rabbit_use_ssl:
index b47f25b3fea3ab53b9d9308a0a79eb38c8201b68..c7afcc48d42e4d1eaab7a56e4cf01fe5c4ba18dd 100644 (file)
@@ -50,24 +50,6 @@ qpid_opts = [
     cfg.StrOpt('qpid_sasl_mechanisms',
                default='',
                help='Space separated list of SASL mechanisms to use for auth'),
-    cfg.BoolOpt('qpid_reconnect',
-                default=True,
-                help='Automatically reconnect'),
-    cfg.IntOpt('qpid_reconnect_timeout',
-               default=0,
-               help='Reconnection timeout in seconds'),
-    cfg.IntOpt('qpid_reconnect_limit',
-               default=0,
-               help='Max reconnections before giving up'),
-    cfg.IntOpt('qpid_reconnect_interval_min',
-               default=0,
-               help='Minimum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval_max',
-               default=0,
-               help='Maximum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval',
-               default=0,
-               help='Equivalent to setting max and min to the same value'),
     cfg.IntOpt('qpid_heartbeat',
                default=60,
                help='Seconds between connection keepalive heartbeats'),
@@ -294,50 +276,36 @@ class Connection(object):
         self.consumer_thread = None
         self.conf = conf
 
-        if server_params is None:
-            server_params = {}
-
-        default_params = dict(hostname=self.conf.qpid_hostname,
-                              port=self.conf.qpid_port,
-                              username=self.conf.qpid_username,
-                              password=self.conf.qpid_password)
-
-        params = server_params
-        for key in default_params.keys():
-            params.setdefault(key, default_params[key])
+        params = {
+            'hostname': self.conf.qpid_hostname,
+            'port': self.conf.qpid_port,
+            'username': self.conf.qpid_username,
+            'password': self.conf.qpid_password,
+        }
+        params.update(server_params or {})
 
         self.broker = params['hostname'] + ":" + str(params['port'])
+        self.username = params['username']
+        self.password = params['password']
+        self.connection_create()
+        self.reconnect()
+
+    def connection_create(self):
         # Create the connection - this does not open the connection
         self.connection = qpid.messaging.Connection(self.broker)
 
         # Check if flags are set and if so set them for the connection
         # before we call open
-        self.connection.username = params['username']
-        self.connection.password = params['password']
+        self.connection.username = self.username
+        self.connection.password = self.password
+
         self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
-        self.connection.reconnect = self.conf.qpid_reconnect
-        if self.conf.qpid_reconnect_timeout:
-            self.connection.reconnect_timeout = (
-                self.conf.qpid_reconnect_timeout)
-        if self.conf.qpid_reconnect_limit:
-            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
-        if self.conf.qpid_reconnect_interval_max:
-            self.connection.reconnect_interval_max = (
-                self.conf.qpid_reconnect_interval_max)
-        if self.conf.qpid_reconnect_interval_min:
-            self.connection.reconnect_interval_min = (
-                self.conf.qpid_reconnect_interval_min)
-        if self.conf.qpid_reconnect_interval:
-            self.connection.reconnect_interval = (
-                self.conf.qpid_reconnect_interval)
+        # Reconnection is done by self.reconnect()
+        self.connection.reconnect = False
         self.connection.heartbeat = self.conf.qpid_heartbeat
         self.connection.protocol = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
-        # Open is part of reconnect -
-        # NOTE(WGH) not sure we need this with the reconnect flags
-        self.reconnect()
-
     def _register_consumer(self, consumer):
         self.consumers[str(consumer.get_receiver())] = consumer
 
@@ -352,12 +320,18 @@ class Connection(object):
             except qpid.messaging.exceptions.ConnectionError:
                 pass
 
+        delay = 1
         while True:
             try:
+                self.connection_create()
                 self.connection.open()
             except qpid.messaging.exceptions.ConnectionError, e:
-                LOG.error(_('Unable to connect to AMQP server: %s'), e)
-                time.sleep(self.conf.qpid_reconnect_interval or 1)
+                msg_dict = dict(e=e, delay=delay)
+                msg = _("Unable to connect to AMQP server: %(e)s. "
+                        "Sleeping %(delay)s seconds") % msg_dict
+                LOG.error(msg)
+                time.sleep(delay)
+                delay = min(2 * delay, 60)
             else:
                 break
 
@@ -365,10 +339,14 @@ class Connection(object):
 
         self.session = self.connection.session()
 
-        for consumer in self.consumers.itervalues():
-            consumer.reconnect(self.session)
-
         if self.consumers:
+            consumers = self.consumers
+            self.consumers = {}
+
+            for consumer in consumers.itervalues():
+                consumer.reconnect(self.session)
+                self._register_consumer(consumer)
+
             LOG.debug(_("Re-established AMQP queues"))
 
     def ensure(self, error_callback, method, *args, **kwargs):
index 913a4c79657dbf43eaac7629772421ab662baaa7..e2fbae04732b085121facf91d4402be1ed26c027 100644 (file)
@@ -191,7 +191,7 @@ class ProcessLauncher(object):
         # Close write to ensure only parent has it open
         os.close(self.writepipe)
         # Create greenthread to watch for parent to close pipe
-        eventlet.spawn(self._pipe_watcher)
+        eventlet.spawn_n(self._pipe_watcher)
 
         # Reseed random number generator
         random.seed()
@@ -275,6 +275,10 @@ class ProcessLauncher(object):
 
     def wait(self):
         """Loop waiting on children to die and respawning as necessary"""
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
         while self.running:
             wrap = self._wait_child()
             if not wrap:
index 83eef07a7be54ea67c3975fa1d7f050de23dcf87..e6f72f034ec06c176f1a82d691ed91f4c841c42d 100644 (file)
@@ -117,8 +117,12 @@ def write_requirements():
 
 
 def _run_shell_command(cmd):
-    output = subprocess.Popen(["/bin/sh", "-c", cmd],
-                              stdout=subprocess.PIPE)
+    if os.name == 'nt':
+        output = subprocess.Popen(["cmd.exe", "/C", cmd],
+                                  stdout=subprocess.PIPE)
+    else:
+        output = subprocess.Popen(["/bin/sh", "-c", cmd],
+                                  stdout=subprocess.PIPE)
     out = output.communicate()
     if len(out) == 0:
         return None
index feee2a0eb671f6c7a6a4afea7ea7ab6a14e4c07d..09d9b398ec5fcd335f3ebc758a53b82816a22b4e 100644 (file)
@@ -47,7 +47,7 @@ class Thread(object):
         self.thread.link(_thread_done, group=group, thread=self)
 
     def stop(self):
-        self.thread.cancel()
+        self.thread.kill()
 
     def wait(self):
         return self.thread.wait()