]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Support for several HA RabbitMQ servers.
authorEmilien Macchi <emilien.macchi@stackops.com>
Thu, 27 Sep 2012 14:20:29 +0000 (16:20 +0200)
committerEmilien Macchi <emilien.macchi@stackops.com>
Fri, 28 Sep 2012 16:14:21 +0000 (18:14 +0200)
Change-Id: I37f73867ff469133c39bf5f1a7fd48f48b0704d4

openstack-common.conf
quantum/openstack/common/network_utils.py [new file with mode: 0644]
quantum/openstack/common/rpc/impl_kombu.py

index e85fe9b02c61484e11ebc330702b47cc22aa94c2..c8aee3ac1db808ce47070796a6b2c27a5f55ef11 100644 (file)
@@ -1,5 +1,5 @@
 [DEFAULT]
 # The list of modules to copy from openstack-common
-modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,notifier,timeutils,log,context,local,rpc,gettextutils,excutils
+modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,network_utils,notifier,timeutils,log,context,local,rpc,gettextutils,excutils
 # The base module to hold the copy of openstack.common
 base=quantum
diff --git a/quantum/openstack/common/network_utils.py b/quantum/openstack/common/network_utils.py
new file mode 100644 (file)
index 0000000..69f6732
--- /dev/null
@@ -0,0 +1,68 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+    """
+    Interpret a string as a host:port pair.
+    An IPv6 address MUST be escaped if accompanied by a port,
+    because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+    means both [2001:db8:85a3::8a2e:370:7334] and
+    [2001:db8:85a3::8a2e:370]:7334.
+
+    >>> parse_host_port('server01:80')
+    ('server01', 80)
+    >>> parse_host_port('server01')
+    ('server01', None)
+    >>> parse_host_port('server01', default_port=1234)
+    ('server01', 1234)
+    >>> parse_host_port('[::1]:80')
+    ('::1', 80)
+    >>> parse_host_port('[::1]')
+    ('::1', None)
+    >>> parse_host_port('[::1]', default_port=1234)
+    ('::1', 1234)
+    >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+    ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+    """
+    if address[0] == '[':
+        # Escaped ipv6
+        _host, _port = address[1:].split(']')
+        host = _host
+        if ':' in _port:
+            port = _port.split(':')[1]
+        else:
+            port = default_port
+    else:
+        if address.count(':') == 1:
+            host, port = address.split(':')
+        else:
+            # 0 means ipv4, >1 means ipv6.
+            # We prohibit unescaped ipv6 addresses with port.
+            host = address
+            port = default_port
+
+    return (host, None if port is None else int(port))
index 4d5d533d1d217d2b9890536ed2b3b439299addba..64e671818b03a85a4e3a1c86a46092e50511fe24 100644 (file)
@@ -29,10 +29,11 @@ import kombu.connection
 import kombu.entity
 import kombu.messaging
 
-from quantum.openstack.common import cfg
-from quantum.openstack.common.gettextutils import _
-from quantum.openstack.common.rpc import amqp as rpc_amqp
-from quantum.openstack.common.rpc import common as rpc_common
+from openstack.common import cfg
+from openstack.common.gettextutils import _
+from openstack.common.rpc import amqp as rpc_amqp
+from openstack.common.rpc import common as rpc_common
+from openstack.common import network_utils
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
@@ -50,10 +51,13 @@ kombu_opts = [
                      '(valid only if SSL enabled)')),
     cfg.StrOpt('rabbit_host',
                default='localhost',
-               help='the RabbitMQ host'),
+               help='The RabbitMQ broker address where a single node is used'),
     cfg.IntOpt('rabbit_port',
                default=5672,
-               help='the RabbitMQ port'),
+               help='The RabbitMQ broker port where a single node is used'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
                 help='connect over SSL for RabbitMQ'),
@@ -80,6 +84,11 @@ kombu_opts = [
     cfg.BoolOpt('rabbit_durable_queues',
                 default=False,
                 help='use durable queues in RabbitMQ'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+                     'You need to wipe RabbitMQ database when '
+                     'changing this option.'),
 
 ]
 
@@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
 LOG = rpc_common.LOG
 
 
+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
 class ConsumerBase(object):
     """Consumer base class."""
 
@@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
     def __init__(self, conf, channel, topic, callback, tag, name=None,
-                 **kwargs):
+                 exchange_name=None, **kwargs):
         """Init a 'topic' queue.
 
         :param channel: the amqp channel to use
@@ -207,10 +230,12 @@ class TopicConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': conf.rabbit_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(name=conf.control_exchange,
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
                                          type='topic',
                                          durable=options['durable'],
                                          auto_delete=options['auto_delete'])
@@ -307,8 +332,12 @@ class TopicPublisher(Publisher):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        super(TopicPublisher, self).__init__(channel, conf.control_exchange,
-                                             topic, type='topic', **options)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(TopicPublisher, self).__init__(channel,
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)
 
 
 class FanoutPublisher(Publisher):
@@ -331,6 +360,7 @@ class NotifyPublisher(TopicPublisher):
 
     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
     def reconnect(self, channel):
@@ -343,7 +373,8 @@ class NotifyPublisher(TopicPublisher):
                                    exchange=self.exchange,
                                    durable=self.durable,
                                    name=self.routing_key,
-                                   routing_key=self.routing_key)
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)
         queue.declare()
 
 
@@ -368,31 +399,37 @@ class Connection(object):
 
         if server_params is None:
             server_params = {}
-
         # Keys to translate from server_params to kombu params
         server_params_to_kombu_params = {'username': 'userid'}
 
-        params = {}
-        for sp_key, value in server_params.iteritems():
-            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
-            params[p_key] = value
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
 
-        params.setdefault('hostname', self.conf.rabbit_host)
-        params.setdefault('port', self.conf.rabbit_port)
-        params.setdefault('userid', self.conf.rabbit_userid)
-        params.setdefault('password', self.conf.rabbit_password)
-        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+            params = {}
 
-        self.params = params
+            for sp_key, value in server_params.iteritems():
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
 
-        if self.conf.fake_rabbit:
-            self.params['transport'] = 'memory'
-            self.memory_transport = True
-        else:
-            self.memory_transport = False
+            params.setdefault('hostname', hostname)
+            params.setdefault('port', port)
+            params.setdefault('userid', self.conf.rabbit_userid)
+            params.setdefault('password', self.conf.rabbit_password)
+            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
+
+            params_list.append(params)
 
-        if self.conf.rabbit_use_ssl:
-            self.params['ssl'] = self._fetch_ssl_params()
+        self.params_list = params_list
+
+        self.memory_transport = self.conf.fake_rabbit
 
         self.connection = None
         self.reconnect()
@@ -422,14 +459,14 @@ class Connection(object):
             # Return the extended behavior
             return ssl_params
 
-    def _connect(self):
+    def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
         been declared before if we are reconnecting.  Exceptions should
         be handled by the caller.
         """
         if self.connection:
             LOG.info(_("Reconnecting to AMQP server on "
-                     "%(hostname)s:%(port)d") % self.params)
+                     "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.close()
             except self.connection_errors:
@@ -437,7 +474,7 @@ class Connection(object):
             # Setting this in case the next statement fails, though
             # it shouldn't be doing any network operations, yet.
             self.connection = None
-        self.connection = kombu.connection.BrokerConnection(**self.params)
+        self.connection = kombu.connection.BrokerConnection(**params)
         self.connection_errors = self.connection.connection_errors
         if self.memory_transport:
             # Kludge to speed up tests.
@@ -450,8 +487,8 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
-                 self.params)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)
 
     def reconnect(self):
         """Handles reconnecting and re-establishing queues.
@@ -464,11 +501,12 @@ class Connection(object):
 
         attempt = 0
         while True:
+            params = self.params_list[attempt % len(self.params_list)]
             attempt += 1
             try:
-                self._connect()
+                self._connect(params)
                 return
-            except (self.connection_errors, IOError), e:
+            except (IOError, self.connection_errors) as e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
@@ -483,12 +521,12 @@ class Connection(object):
             log_info = {}
             log_info['err_str'] = str(e)
             log_info['max_retries'] = self.max_retries
-            log_info.update(self.params)
+            log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.exception(_('Unable to connect to AMQP server on '
-                              '%(hostname)s:%(port)d after %(max_retries)d '
-                              'tries: %(err_str)s') % log_info)
+                LOG.error(_('Unable to connect to AMQP server on '
+                            '%(hostname)s:%(port)d after %(max_retries)d '
+                            'tries: %(err_str)s') % log_info)
                 # NOTE(comstud): Copied from original code.  There's
                 # really no better recourse because if this was a queue we
                 # need to consume on, we have no way to consume anymore.
@@ -502,9 +540,9 @@ class Connection(object):
                 sleep_time = min(sleep_time, self.interval_max)
 
             log_info['sleep_time'] = sleep_time
-            LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
-                          ' unreachable: %(err_str)s. Trying again in '
-                          '%(sleep_time)d seconds.') % log_info)
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
@@ -512,7 +550,8 @@ class Connection(object):
             try:
                 return method(*args, **kwargs)
             except (self.connection_errors, socket.timeout, IOError), e:
-                pass
+                if error_callback:
+                    error_callback(e)
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport
@@ -522,8 +561,8 @@ class Connection(object):
                 # and try to reconnect in this case.
                 if 'timeout' not in str(e):
                     raise
-            if error_callback:
-                error_callback(e)
+                if error_callback:
+                    error_callback(e)
             self.reconnect()
 
     def get_channel(self):
@@ -625,10 +664,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
@@ -749,3 +790,4 @@ def notify(conf, context, topic, msg):
 
 def cleanup():
     return rpc_amqp.cleanup(Connection.pool)
+