]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update latest openstack-common code
authorGary Kotton <gkotton@redhat.com>
Thu, 8 Nov 2012 21:16:01 +0000 (21:16 +0000)
committerGary Kotton <gkotton@redhat.com>
Thu, 8 Nov 2012 21:16:01 +0000 (21:16 +0000)
This fixes bug 1073999 (quantum/openstack/common/rpc/impl_qpid.py)

In addition to this the common code is updated.

Change-Id: I41223963baf34772edcd0d6d7ef5686a5fad1035

quantum/openstack/common/cfg.py
quantum/openstack/common/exception.py
quantum/openstack/common/gettextutils.py
quantum/openstack/common/log.py
quantum/openstack/common/rpc/__init__.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/threadgroup.py
quantum/openstack/common/uuidutils.py

index 4fcd242517c91965ab15bce4551fa08193e458b0..c760ede2a0b86ebddcf6afa8c9cbb6e18bfc7d0b 100644 (file)
@@ -236,7 +236,7 @@ log files:
 This module also contains a global instance of the CommonConfigOpts class
 in order to support a common usage pattern in OpenStack:
 
-  from openstack.common import cfg
+  from quantum.openstack.common import cfg
 
   opts = [
     cfg.StrOpt('bind_host', default='0.0.0.0'),
index ba32da550b01a1b353becc043b544fecde474eb7..4866de2fd2beaeee9b8227d7cd3e2227a14eacd6 100644 (file)
@@ -22,18 +22,6 @@ Exceptions common to OpenStack projects
 import logging
 
 
-class ProcessExecutionError(IOError):
-    def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
-                 description=None):
-        if description is None:
-            description = "Unexpected error while running command."
-        if exit_code is None:
-            exit_code = '-'
-        message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
-                  description, cmd, exit_code, stdout, stderr)
-        IOError.__init__(self, message)
-
-
 class Error(Exception):
     def __init__(self, message=None):
         super(Error, self).__init__(message)
index 235350cc498b9abd3cbe3a5a75ca974fc283a881..81076c1f3bf22bb45dbbb35225a33d29431ccedc 100644 (file)
@@ -20,7 +20,7 @@ gettext for openstack-common modules.
 
 Usual usage in an openstack.common module:
 
-    from openstack.common.gettextutils import _
+    from quantum.openstack.common.gettextutils import _
 """
 
 import gettext
index acdf96def0eba786355d59b12723460506e1d07e..9a9dfc577fa085753ebce06127be0c2fdb1a08f9 100644 (file)
@@ -257,7 +257,7 @@ class JSONFormatter(logging.Formatter):
 
 class PublishErrorsHandler(logging.Handler):
     def emit(self, record):
-        if ('openstack.common.notifier.log_notifier' in
+        if ('quantum.openstack.common.notifier.log_notifier' in
             CONF.notification_driver):
             return
         notifier.api.notify(None, 'error.publisher',
index 89940ef1aa97c63f4d6e9f5700d9b1fbe64b5b22..f26919306d5b9f7c969d1dff014c914bd6bb7050 100644 (file)
@@ -250,7 +250,7 @@ def queue_get_for(context, topic, host):
     Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
     <host>.
     """
-    return '%s.%s' % (topic, host)
+    return '%s.%s' % (topic, host) if host else topic
 
 
 _RPCIMPL = None
index facf898866326f919fee6bb754e9f761f86eea9c..b0b292794c31ced5dda6885444031b5d9791577f 100644 (file)
@@ -267,6 +267,7 @@ class FanoutConsumer(ConsumerBase):
 
         # Default options
         options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': True,
                    'exclusive': True}
         options.update(kwargs)
@@ -408,18 +409,18 @@ class Connection(object):
             hostname, port = network_utils.parse_host_port(
                 adr, default_port=self.conf.rabbit_port)
 
-            params = {}
+            params = {
+                'hostname': hostname,
+                'port': port,
+                'userid': self.conf.rabbit_userid,
+                'password': self.conf.rabbit_password,
+                'virtual_host': self.conf.rabbit_virtual_host,
+            }
 
             for sp_key, value in server_params.iteritems():
                 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
                 params[p_key] = value
 
-            params.setdefault('hostname', hostname)
-            params.setdefault('port', port)
-            params.setdefault('userid', self.conf.rabbit_userid)
-            params.setdefault('password', self.conf.rabbit_password)
-            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
-
             if self.conf.fake_rabbit:
                 params['transport'] = 'memory'
             if self.conf.rabbit_use_ssl:
@@ -776,7 +777,7 @@ def cast_to_server(conf, context, server_params, topic, msg):
 
 def fanout_cast_to_server(conf, context, server_params, topic, msg):
     """Sends a message on a fanout exchange to a specific server."""
-    return rpc_amqp.cast_to_server(
+    return rpc_amqp.fanout_cast_to_server(
         conf, context, server_params, topic, msg,
         rpc_amqp.get_connection_pool(conf, Connection))
 
index efdf21211d9a8e4e3ecdeeaffd9d694f2b5c3fad..d8813bbfdf838daa58859616459c50c3e02cf860 100644 (file)
@@ -50,24 +50,6 @@ qpid_opts = [
     cfg.StrOpt('qpid_sasl_mechanisms',
                default='',
                help='Space separated list of SASL mechanisms to use for auth'),
-    cfg.BoolOpt('qpid_reconnect',
-                default=True,
-                help='Automatically reconnect'),
-    cfg.IntOpt('qpid_reconnect_timeout',
-               default=0,
-               help='Reconnection timeout in seconds'),
-    cfg.IntOpt('qpid_reconnect_limit',
-               default=0,
-               help='Max reconnections before giving up'),
-    cfg.IntOpt('qpid_reconnect_interval_min',
-               default=0,
-               help='Minimum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval_max',
-               default=0,
-               help='Maximum seconds between reconnection attempts'),
-    cfg.IntOpt('qpid_reconnect_interval',
-               default=0,
-               help='Equivalent to setting max and min to the same value'),
     cfg.IntOpt('qpid_heartbeat',
                default=60,
                help='Seconds between connection keepalive heartbeats'),
@@ -294,50 +276,36 @@ class Connection(object):
         self.consumer_thread = None
         self.conf = conf
 
-        if server_params is None:
-            server_params = {}
-
-        default_params = dict(hostname=self.conf.qpid_hostname,
-                              port=self.conf.qpid_port,
-                              username=self.conf.qpid_username,
-                              password=self.conf.qpid_password)
-
-        params = server_params
-        for key in default_params.keys():
-            params.setdefault(key, default_params[key])
+        params = {
+            'hostname': self.conf.qpid_hostname,
+            'port': self.conf.qpid_port,
+            'username': self.conf.qpid_username,
+            'password': self.conf.qpid_password,
+        }
+        params.update(server_params or {})
 
         self.broker = params['hostname'] + ":" + str(params['port'])
+        self.username = params['username']
+        self.password = params['password']
+        self.connection_create()
+        self.reconnect()
+
+    def connection_create(self):
         # Create the connection - this does not open the connection
         self.connection = qpid.messaging.Connection(self.broker)
 
         # Check if flags are set and if so set them for the connection
         # before we call open
-        self.connection.username = params['username']
-        self.connection.password = params['password']
+        self.connection.username = self.username
+        self.connection.password = self.password
+
         self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
-        self.connection.reconnect = self.conf.qpid_reconnect
-        if self.conf.qpid_reconnect_timeout:
-            self.connection.reconnect_timeout = (
-                self.conf.qpid_reconnect_timeout)
-        if self.conf.qpid_reconnect_limit:
-            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
-        if self.conf.qpid_reconnect_interval_max:
-            self.connection.reconnect_interval_max = (
-                self.conf.qpid_reconnect_interval_max)
-        if self.conf.qpid_reconnect_interval_min:
-            self.connection.reconnect_interval_min = (
-                self.conf.qpid_reconnect_interval_min)
-        if self.conf.qpid_reconnect_interval:
-            self.connection.reconnect_interval = (
-                self.conf.qpid_reconnect_interval)
+        # Reconnection is done by self.reconnect()
+        self.connection.reconnect = False
         self.connection.heartbeat = self.conf.qpid_heartbeat
         self.connection.protocol = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
-        # Open is part of reconnect -
-        # NOTE(WGH) not sure we need this with the reconnect flags
-        self.reconnect()
-
     def _register_consumer(self, consumer):
         self.consumers[str(consumer.get_receiver())] = consumer
 
@@ -352,12 +320,18 @@ class Connection(object):
             except qpid.messaging.exceptions.ConnectionError:
                 pass
 
+        delay = 1
         while True:
             try:
+                self.connection_create()
                 self.connection.open()
             except qpid.messaging.exceptions.ConnectionError, e:
-                LOG.error(_('Unable to connect to AMQP server: %s'), e)
-                time.sleep(self.conf.qpid_reconnect_interval or 1)
+                msg_dict = dict(e=e, delay=delay)
+                msg = _("Unable to connect to AMQP server: %(e)s. "
+                        "Sleeping %(delay)s seconds") % msg_dict
+                LOG.error(msg)
+                time.sleep(delay)
+                delay = min(2 * delay, 60)
             else:
                 break
 
@@ -365,10 +339,14 @@ class Connection(object):
 
         self.session = self.connection.session()
 
-        for consumer in self.consumers.itervalues():
-            consumer.reconnect(self.session)
-
         if self.consumers:
+            consumers = self.consumers
+            self.consumers = {}
+
+            for consumer in consumers.itervalues():
+                consumer.reconnect(self.session)
+                self._register_consumer(consumer)
+
             LOG.debug(_("Re-established AMQP queues"))
 
     def ensure(self, error_callback, method, *args, **kwargs):
index f4fcce0389a2f38528bdde1295dbd925b7f8e406..02a44d21ba01c34d4361d1f70abf79b87664b597 100644 (file)
@@ -546,7 +546,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     timeout = timeout or CONF.rpc_response_timeout
 
     # The msg_id is used to track replies.
-    msg_id = str(uuid.uuid4().hex)
+    msg_id = uuid.uuid4().hex
 
     # Replies always come into the reply service.
     reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
index 9d39d0498342e9102197c37bc058b34be01b2305..0aea30ff62731d57e29fbc80abe8e77eea29f174 100644 (file)
@@ -47,7 +47,7 @@ class Thread(object):
         self.thread.link(_thread_done, group=group, thread=self)
 
     def stop(self):
-        self.thread.cancel()
+        self.thread.kill()
 
     def wait(self):
         return self.thread.wait()
index 51042a798dfae767235541c0652227488c7c7526..7608acb9421fe93b28d8a0fffab2d66190548149 100644 (file)
@@ -22,6 +22,10 @@ UUID related utilities and helper functions.
 import uuid
 
 
+def generate_uuid():
+    return str(uuid.uuid4())
+
+
 def is_uuid_like(val):
     """Returns validation of a value as a UUID.