]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Sync openstack common and add policy
authorClay Gerrard <clay.gerrard@gmail.com>
Fri, 28 Sep 2012 21:27:09 +0000 (16:27 -0500)
committerClay Gerrard <clay.gerrard@gmail.com>
Mon, 1 Oct 2012 15:17:46 +0000 (10:17 -0500)
openstack-common.conf now includes network_utils and policy

The default kombu imple for openstack.common.rpc seems to depends on
network_utils.

Fixes: bug #1058353
Change-Id: I2526c7355e7f2be67b25bf22854c56ec65741d9a

cinder/openstack/common/network_utils.py [new file with mode: 0644]
cinder/openstack/common/policy.py [moved from cinder/common/policy.py with 51% similarity]
cinder/openstack/common/rpc/impl_kombu.py
cinder/openstack/common/rpc/impl_qpid.py
cinder/openstack/common/rpc/impl_zmq.py
cinder/openstack/common/rpc/matchmaker.py
cinder/openstack/common/rpc/service.py [new file with mode: 0644]
cinder/openstack/common/timeutils.py
cinder/policy.py
cinder/tests/test_policy.py
openstack-common.conf

diff --git a/cinder/openstack/common/network_utils.py b/cinder/openstack/common/network_utils.py
new file mode 100644 (file)
index 0000000..69f6732
--- /dev/null
@@ -0,0 +1,68 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+    """
+    Interpret a string as a host:port pair.
+    An IPv6 address MUST be escaped if accompanied by a port,
+    because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+    means both [2001:db8:85a3::8a2e:370:7334] and
+    [2001:db8:85a3::8a2e:370]:7334.
+
+    >>> parse_host_port('server01:80')
+    ('server01', 80)
+    >>> parse_host_port('server01')
+    ('server01', None)
+    >>> parse_host_port('server01', default_port=1234)
+    ('server01', 1234)
+    >>> parse_host_port('[::1]:80')
+    ('::1', 80)
+    >>> parse_host_port('[::1]')
+    ('::1', None)
+    >>> parse_host_port('[::1]', default_port=1234)
+    ('::1', 1234)
+    >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+    ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+    """
+    if address[0] == '[':
+        # Escaped ipv6
+        _host, _port = address[1:].split(']')
+        host = _host
+        if ':' in _port:
+            port = _port.split(':')[1]
+        else:
+            port = default_port
+    else:
+        if address.count(':') == 1:
+            host, port = address.split(':')
+        else:
+            # 0 means ipv4, >1 means ipv6.
+            # We prohibit unescaped ipv6 addresses with port.
+            host = address
+            port = default_port
+
+    return (host, None if port is None else int(port))
similarity index 51%
rename from cinder/common/policy.py
rename to cinder/openstack/common/policy.py
index 49ecbdaa927c53bcba0cee045c75e7535054da89..0ca48ce9e89c509d6f8fde0eac2e3a1aca0d0480 100644 (file)
 
 """Common Policy Engine Implementation"""
 
+import logging
 import urllib
 import urllib2
 
+from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import jsonutils
 
 
-class NotAuthorized(Exception):
-    pass
+LOG = logging.getLogger(__name__)
 
 
 _BRAIN = None
@@ -46,7 +47,8 @@ def reset():
     _BRAIN = None
 
 
-def enforce(match_list, target_dict, credentials_dict):
+def enforce(match_list, target_dict, credentials_dict, exc=None,
+            *args, **kwargs):
     """Enforces authorization of some rules against credentials.
 
     :param match_list: nested tuples of data to match against
@@ -107,18 +109,35 @@ def enforce(match_list, target_dict, credentials_dict):
       Credentials dicts contain as much information as we can about the user
       performing the action.
 
-    :raises NotAuthorized: if the check fails
+    :param exc: exception to raise
+
+      Class of the exception to raise if the check fails.  Any remaining
+      arguments passed to enforce() (both positional and keyword arguments)
+      will be passed to the exception class.  If exc is not provided, returns
+      False.
 
+    :return: True if the policy allows the action
+    :return: False if the policy does not allow the action and exc is not set
     """
     global _BRAIN
     if not _BRAIN:
         _BRAIN = Brain()
     if not _BRAIN.check(match_list, target_dict, credentials_dict):
-        raise NotAuthorized()
+        if exc:
+            raise exc(*args, **kwargs)
+        return False
+    return True
 
 
 class Brain(object):
     """Implements policy checking."""
+
+    _checks = {}
+
+    @classmethod
+    def _register(cls, name, func):
+        cls._checks[name] = func
+
     @classmethod
     def load_json(cls, data, default_rule=None):
         """Init a brain using json instead of a rules dictionary."""
@@ -126,6 +145,11 @@ class Brain(object):
         return cls(rules=rules_dict, default_rule=default_rule)
 
     def __init__(self, rules=None, default_rule=None):
+        if self.__class__ != Brain:
+            LOG.warning(_("Inheritance-based rules are deprecated; use "
+                          "the default brain instead of %s.") %
+                        self.__class__.__name__)
+
         self.rules = rules or {}
         self.default_rule = default_rule
 
@@ -133,16 +157,31 @@ class Brain(object):
         self.rules[key] = match
 
     def _check(self, match, target_dict, cred_dict):
-        match_kind, match_value = match.split(':', 1)
         try:
-            f = getattr(self, '_check_%s' % match_kind)
+            match_kind, match_value = match.split(':', 1)
+        except Exception:
+            LOG.exception(_("Failed to understand rule %(match)r") % locals())
+            # If the rule is invalid, fail closed
+            return False
+
+        func = None
+        try:
+            old_func = getattr(self, '_check_%s' % match_kind)
         except AttributeError:
-            if not self._check_generic(match, target_dict, cred_dict):
-                return False
+            func = self._checks.get(match_kind, self._checks.get(None, None))
         else:
-            if not f(match_value, target_dict, cred_dict):
-                return False
-        return True
+            LOG.warning(_("Inheritance-based rules are deprecated; update "
+                          "_check_%s") % match_kind)
+            func = lambda brain, kind, value, target, cred: old_func(value,
+                                                                     target,
+                                                                     cred)
+
+        if not func:
+            LOG.error(_("No handler for matches of kind %s") % match_kind)
+            # Fail closed
+            return False
+
+        return func(self, match_kind, match_value, target_dict, cred_dict)
 
     def check(self, match_list, target_dict, cred_dict):
         """Checks authorization of some rules against credentials.
@@ -166,58 +205,97 @@ class Brain(object):
                 return True
         return False
 
-    def _check_rule(self, match, target_dict, cred_dict):
-        """Recursively checks credentials based on the brains rules."""
-        try:
-            new_match_list = self.rules[match]
-        except KeyError:
-            if self.default_rule and match != self.default_rule:
-                new_match_list = ('rule:%s' % self.default_rule,)
-            else:
-                return False
 
-        return self.check(new_match_list, target_dict, cred_dict)
+class HttpBrain(Brain):
+    """A brain that can check external urls for policy.
+
+    Posts json blobs for target and credentials.
+
+    Note that this brain is deprecated; the http check is registered
+    by default.
+    """
 
-    def _check_role(self, match, target_dict, cred_dict):
-        """Check that there is a matching role in the cred dict."""
-        return match.lower() in [x.lower() for x in cred_dict['roles']]
+    pass
 
-    def _check_generic(self, match, target_dict, cred_dict):
-        """Check an individual match.
 
-        Matches look like:
+def register(name, func=None):
+    """
+    Register a function as a policy check.
+
+    :param name: Gives the name of the check type, e.g., 'rule',
+                 'role', etc.  If name is None, a default function
+                 will be registered.
+    :param func: If given, provides the function to register.  If not
+                 given, returns a function taking one argument to
+                 specify the function to register, allowing use as a
+                 decorator.
+    """
 
-            tenant:%(tenant_id)s
-            role:compute:admin
+    # Perform the actual decoration by registering the function.
+    # Returns the function for compliance with the decorator
+    # interface.
+    def decorator(func):
+        # Register the function
+        Brain._register(name, func)
+        return func
+
+    # If the function is given, do the registration
+    if func:
+        return decorator(func)
+
+    return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+    """Recursively checks credentials based on the brains rules."""
+    try:
+        new_match_list = brain.rules[match]
+    except KeyError:
+        if brain.default_rule and match != brain.default_rule:
+            new_match_list = ('rule:%s' % brain.default_rule,)
+        else:
+            return False
 
-        """
+    return brain.check(new_match_list, target_dict, cred_dict)
 
-        # TODO(termie): do dict inspection via dot syntax
-        match = match % target_dict
-        key, value = match.split(':', 1)
-        if key in cred_dict:
-            return value == cred_dict[key]
-        return False
 
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+    """Check that there is a matching role in the cred dict."""
+    return match.lower() in [x.lower() for x in cred_dict['roles']]
 
-class HttpBrain(Brain):
-    """A brain that can check external urls for policy.
 
-    Posts json blobs for target and credentials.
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+    """Check http: rules by calling to a remote server.
+
+    This example implementation simply verifies that the response is
+    exactly 'True'. A custom brain using response codes could easily
+    be implemented.
 
     """
+    url = 'http:' + (match % target_dict)
+    data = {'target': jsonutils.dumps(target_dict),
+            'credentials': jsonutils.dumps(cred_dict)}
+    post_data = urllib.urlencode(data)
+    f = urllib2.urlopen(url, post_data)
+    return f.read() == "True"
 
-    def _check_http(self, match, target_dict, cred_dict):
-        """Check http: rules by calling to a remote server.
 
-        This example implementation simply verifies that the response is
-        exactly 'True'. A custom brain using response codes could easily
-        be implemented.
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+    """Check an individual match.
 
-        """
-        url = match % target_dict
-        data = {'target': jsonutils.dumps(target_dict),
-                'credentials': jsonutils.dumps(cred_dict)}
-        post_data = urllib.urlencode(data)
-        f = urllib2.urlopen(url, post_data)
-        return f.read() == "True"
+    Matches look like:
+
+        tenant:%(tenant_id)s
+        role:compute:admin
+
+    """
+
+    # TODO(termie): do dict inspection via dot syntax
+    match = match % target_dict
+    if match_kind in cred_dict:
+        return match == unicode(cred_dict[match_kind])
+    return False
index faf6c138423d0aaa5fb41741d13bb000a6be760a..01f67defd510d0af853851aa561f3140e8068e4c 100644 (file)
@@ -33,6 +33,7 @@ from cinder.openstack.common import cfg
 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common.rpc import amqp as rpc_amqp
 from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common import network_utils
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
@@ -50,10 +51,13 @@ kombu_opts = [
                      '(valid only if SSL enabled)')),
     cfg.StrOpt('rabbit_host',
                default='localhost',
-               help='the RabbitMQ host'),
+               help='The RabbitMQ broker address where a single node is used'),
     cfg.IntOpt('rabbit_port',
                default=5672,
-               help='the RabbitMQ port'),
+               help='The RabbitMQ broker port where a single node is used'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
                 help='connect over SSL for RabbitMQ'),
@@ -80,6 +84,11 @@ kombu_opts = [
     cfg.BoolOpt('rabbit_durable_queues',
                 default=False,
                 help='use durable queues in RabbitMQ'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+                     'You need to wipe RabbitMQ database when '
+                     'changing this option.'),
 
 ]
 
@@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
 LOG = rpc_common.LOG
 
 
+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
 class ConsumerBase(object):
     """Consumer base class."""
 
@@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
     def __init__(self, conf, channel, topic, callback, tag, name=None,
-                 **kwargs):
+                 exchange_name=None, **kwargs):
         """Init a 'topic' queue.
 
         :param channel: the amqp channel to use
@@ -207,13 +230,15 @@ class TopicConsumer(ConsumerBase):
         """
         # Default options
         options = {'durable': conf.rabbit_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(
-                name=rpc_amqp.get_control_exchange(conf),
-                type='topic', durable=options['durable'],
-                auto_delete=options['auto_delete'])
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
         super(TopicConsumer, self).__init__(channel,
                                             callback,
                                             tag,
@@ -307,9 +332,12 @@ class TopicPublisher(Publisher):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(channel,
-                rpc_amqp.get_control_exchange(conf), topic,
-                type='topic', **options)
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)
 
 
 class FanoutPublisher(Publisher):
@@ -332,6 +360,7 @@ class NotifyPublisher(TopicPublisher):
 
     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
     def reconnect(self, channel):
@@ -344,7 +373,8 @@ class NotifyPublisher(TopicPublisher):
                                    exchange=self.exchange,
                                    durable=self.durable,
                                    name=self.routing_key,
-                                   routing_key=self.routing_key)
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)
         queue.declare()
 
 
@@ -369,31 +399,37 @@ class Connection(object):
 
         if server_params is None:
             server_params = {}
-
         # Keys to translate from server_params to kombu params
         server_params_to_kombu_params = {'username': 'userid'}
 
-        params = {}
-        for sp_key, value in server_params.iteritems():
-            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
-            params[p_key] = value
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
 
-        params.setdefault('hostname', self.conf.rabbit_host)
-        params.setdefault('port', self.conf.rabbit_port)
-        params.setdefault('userid', self.conf.rabbit_userid)
-        params.setdefault('password', self.conf.rabbit_password)
-        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+            params = {}
 
-        self.params = params
+            for sp_key, value in server_params.iteritems():
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
 
-        if self.conf.fake_rabbit:
-            self.params['transport'] = 'memory'
-            self.memory_transport = True
-        else:
-            self.memory_transport = False
+            params.setdefault('hostname', hostname)
+            params.setdefault('port', port)
+            params.setdefault('userid', self.conf.rabbit_userid)
+            params.setdefault('password', self.conf.rabbit_password)
+            params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
 
-        if self.conf.rabbit_use_ssl:
-            self.params['ssl'] = self._fetch_ssl_params()
+            params_list.append(params)
+
+        self.params_list = params_list
+
+        self.memory_transport = self.conf.fake_rabbit
 
         self.connection = None
         self.reconnect()
@@ -423,14 +459,14 @@ class Connection(object):
             # Return the extended behavior
             return ssl_params
 
-    def _connect(self):
+    def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
         been declared before if we are reconnecting.  Exceptions should
         be handled by the caller.
         """
         if self.connection:
             LOG.info(_("Reconnecting to AMQP server on "
-                     "%(hostname)s:%(port)d") % self.params)
+                     "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.close()
             except self.connection_errors:
@@ -438,7 +474,7 @@ class Connection(object):
             # Setting this in case the next statement fails, though
             # it shouldn't be doing any network operations, yet.
             self.connection = None
-        self.connection = kombu.connection.BrokerConnection(**self.params)
+        self.connection = kombu.connection.BrokerConnection(**params)
         self.connection_errors = self.connection.connection_errors
         if self.memory_transport:
             # Kludge to speed up tests.
@@ -451,8 +487,8 @@ class Connection(object):
             self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
-                 self.params)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)
 
     def reconnect(self):
         """Handles reconnecting and re-establishing queues.
@@ -465,11 +501,12 @@ class Connection(object):
 
         attempt = 0
         while True:
+            params = self.params_list[attempt % len(self.params_list)]
             attempt += 1
             try:
-                self._connect()
+                self._connect(params)
                 return
-            except (self.connection_errors, IOError), e:
+            except (IOError, self.connection_errors) as e:
                 pass
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
@@ -484,12 +521,12 @@ class Connection(object):
             log_info = {}
             log_info['err_str'] = str(e)
             log_info['max_retries'] = self.max_retries
-            log_info.update(self.params)
+            log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.exception(_('Unable to connect to AMQP server on '
-                              '%(hostname)s:%(port)d after %(max_retries)d '
-                              'tries: %(err_str)s') % log_info)
+                LOG.error(_('Unable to connect to AMQP server on '
+                            '%(hostname)s:%(port)d after %(max_retries)d '
+                            'tries: %(err_str)s') % log_info)
                 # NOTE(comstud): Copied from original code.  There's
                 # really no better recourse because if this was a queue we
                 # need to consume on, we have no way to consume anymore.
@@ -503,9 +540,9 @@ class Connection(object):
                 sleep_time = min(sleep_time, self.interval_max)
 
             log_info['sleep_time'] = sleep_time
-            LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
-                          ' unreachable: %(err_str)s. Trying again in '
-                          '%(sleep_time)d seconds.') % log_info)
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
@@ -513,7 +550,8 @@ class Connection(object):
             try:
                 return method(*args, **kwargs)
             except (self.connection_errors, socket.timeout, IOError), e:
-                pass
+                if error_callback:
+                    error_callback(e)
             except Exception, e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport
@@ -523,8 +561,8 @@ class Connection(object):
                 # and try to reconnect in this case.
                 if 'timeout' not in str(e):
                     raise
-            if error_callback:
-                error_callback(e)
+                if error_callback:
+                    error_callback(e)
             self.reconnect()
 
     def get_channel(self):
@@ -626,10 +664,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
index d9fc8210f7500b019c009b52c7f16610dcc45d97..01f8a22c233b3219bea29ede6756ede637ff8923 100644 (file)
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
 class TopicConsumer(ConsumerBase):
     """Consumer class for 'topic'"""
 
-    def __init__(self, conf, session, topic, callback, name=None):
+    def __init__(self, conf, session, topic, callback, name=None,
+                 exchange_name=None):
         """Init a 'topic' queue.
 
         :param session: the amqp session to use
@@ -180,9 +181,10 @@ class TopicConsumer(ConsumerBase):
         :param name: optional queue name, defaults to topic
         """
 
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
         super(TopicConsumer, self).__init__(session, callback,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
-                {}, name or topic, {})
+                                            "%s/%s" % (exchange_name, topic),
+                                            {}, name or topic, {})
 
 
 class FanoutConsumer(ConsumerBase):
@@ -255,8 +257,9 @@ class TopicPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(TopicPublisher, self).__init__(session,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
+                                             "%s/%s" % (exchange_name, topic))
 
 
 class FanoutPublisher(Publisher):
@@ -274,9 +277,10 @@ class NotifyPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
         super(NotifyPublisher, self).__init__(session,
-                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
-                {"durable": True})
+                                              "%s/%s" % (exchange_name, topic),
+                                              {"durable": True})
 
 
 class Connection(object):
@@ -461,10 +465,12 @@ class Connection(object):
         """
         self.declare_consumer(DirectConsumer, topic, callback)
 
-    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
+                                                exchange_name=exchange_name,
                                                 ),
                               topic, callback)
 
index f3f3b9e90ce38e2f839f7d8993bccf2b3ce03ed2..570431778a061a9393b8ac92aad36d3cfafe22b8 100644 (file)
@@ -58,6 +58,9 @@ zmq_opts = [
     cfg.IntOpt('rpc_zmq_port', default=9501,
                help='ZeroMQ receiver listening port'),
 
+    cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+               help='ZeroMQ fanout publisher port'),
+
     cfg.IntOpt('rpc_zmq_contexts', default=1,
                help='Number of ZeroMQ contexts, defaults to 1'),
 
@@ -206,7 +209,7 @@ class ZmqClient(object):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
     def cast(self, msg_id, topic, data):
-        self.outq.send([str(msg_id), str(topic), str('cast'),
+        self.outq.send([str(topic), str(msg_id), str('cast'),
                         _serialize(data)])
 
     def close(self):
@@ -299,6 +302,9 @@ class ConsumerBase(object):
         else:
             return [result]
 
+    def consume(self, sock):
+        raise NotImplementedError()
+
     def process(self, style, target, proxy, ctx, data):
         # Method starting with - are
         # processed internally. (non-valid method name)
@@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
                       zmq.PUB, bind=True)
         self.sockets.append(self.topic_proxy['zmq_replies'])
 
+        self.topic_proxy['fanout~'] = \
+            ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+                      CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+        self.sockets.append(self.topic_proxy['fanout~'])
+
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data
         topic = topic.split('.', 1)[0]
 
         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
         # Handle zmq_replies magic
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
+
+            # This doesn't change what is in the message,
+            # it only specifies that these messages go to
+            # the generic fanout topic.
+            topic = 'fanout~'
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
             inside = _deserialize(in_msg)
@@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor):
         else:
             sock_type = zmq.PUSH
 
-        if not topic in self.topic_proxy:
-            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
-                             sock_type, bind=True)
-            self.topic_proxy[topic] = outq
-            self.sockets.append(outq)
-            LOG.info(_("Created topic proxy: %s"), topic)
-
-            # It takes some time for a pub socket to open,
-            # before we can have any faith in doing a send() to it.
-            if sock_type == zmq.PUB:
-                eventlet.sleep(.5)
+            if not topic in self.topic_proxy:
+                outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+                                 sock_type, bind=True)
+                self.topic_proxy[topic] = outq
+                self.sockets.append(outq)
+                LOG.info(_("Created topic proxy: %s"), topic)
 
         LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
         self.topic_proxy[topic].send(data)
         LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
 
 
+class CallbackReactor(ZmqBaseReactor):
+    """
+    A consumer class passing messages to a callback
+    """
+
+    def __init__(self, conf, callback):
+        self._cb = callback
+        super(CallbackReactor, self).__init__(conf)
+
+    def consume(self, sock):
+        data = sock.recv()
+        self._cb(data[3])
+
+
 class ZmqReactor(ZmqBaseReactor):
     """
     A consumer class implementing a
@@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor):
             self.mapping[sock].send(data)
             return
 
-        msg_id, topic, style, in_msg = data
+        topic, msg_id, style, in_msg = data
 
         ctx, request = _deserialize(in_msg)
         ctx = RpcContext.unmarshal(ctx)
@@ -488,6 +513,26 @@ class Connection(rpc_common.Connection):
     def __init__(self, conf):
         self.reactor = ZmqReactor(conf)
 
+    def _consume_fanout(self, reactor, topic, proxy, bind=False):
+        for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+            inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+            reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
+    def declare_topic_consumer(self, topic, callback=None,
+                               queue_name=None):
+        """declare_topic_consumer is a private method, but
+           it is being used by Quantum (Folsom).
+           This has been added compatibility.
+        """
+        # Only consume on the base topic name.
+        topic = topic.split('.', 1)[0]
+
+        if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
+            return
+
+        reactor = CallbackReactor(CONF, callback)
+        self._consume_fanout(reactor, topic, None, bind=False)
+
     def create_consumer(self, topic, proxy, fanout=False):
         # Only consume on the base topic name.
         topic = topic.split('.', 1)[0]
@@ -495,22 +540,35 @@ class Connection(rpc_common.Connection):
         LOG.info(_("Create Consumer for topic (%(topic)s)") %
                  {'topic': topic})
 
-        # Subscription scenarios
+        # Consume direct-push fanout messages (relay to local consumers)
         if fanout:
-            subscribe = ('', fanout)[type(fanout) == str]
+            # If we're not in here, we can't receive direct fanout messages
+            if CONF.rpc_zmq_host in matchmaker.queues(topic):
+                # Consume from all remote publishers.
+                self._consume_fanout(self.reactor, topic, proxy)
+            else:
+                LOG.warn("This service cannot receive direct PUSH fanout "
+                         "messages without being known by the matchmaker.")
+                return
+
+            # Configure consumer for direct pushes.
+            subscribe = (topic, fanout)[type(fanout) == str]
             sock_type = zmq.SUB
             topic = 'fanout~' + topic
+
+            inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
         else:
             sock_type = zmq.PULL
             subscribe = None
 
-        # Receive messages from (local) proxy
-        inaddr = "ipc://%s/zmq_topic_%s" % \
-            (CONF.rpc_zmq_ipc_dir, topic)
+            # Receive messages from (local) proxy
+            inaddr = "ipc://%s/zmq_topic_%s" % \
+                (CONF.rpc_zmq_ipc_dir, topic)
 
         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
+        # Consume messages from local rpc-zmq-receiver daemon.
         self.reactor.register(proxy, inaddr, sock_type,
                               subscribe=subscribe, in_bind=False)
 
index 2c0aac5bb6b5b3ae534dd54e2c996b0b4f0b54eb..ffe4870aa4a0c12df6e7336fe076d2d3af53e1e6 100644 (file)
@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
         return False
 
 
+class PublisherBinding(Binding):
+    """Match on publishers keys, where key starts with 'publishers.' string."""
+    def test(self, key):
+        if key.startswith('publishers~'):
+            return True
+        return False
+
+
 class StubExchange(Exchange):
     """Exchange that does nothing."""
     def run(self, key):
@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
         return [(key + '.' + host, host)]
 
 
+class PublisherRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(PublisherRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "publishers~", strip it for lookup.
+        nkey = key.split('publishers~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
 class FanoutRingExchange(RingExchange):
     """Fanout Exchange based on a hashmap."""
     def __init__(self, ring=None):
@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
                   "see ringfile") % (nkey, )
             )
             return []
-        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+                   ['localhost'])
 
 
 class LocalhostExchange(Exchange):
@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
     """
     def __init__(self, ring=None):
         super(MatchMakerRing, self).__init__()
+        self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
         self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
     """
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(PublisherBinding(), LocalhostExchange())
         self.add_binding(FanoutBinding(), LocalhostExchange())
         self.add_binding(DirectBinding(), DirectExchange())
         self.add_binding(TopicBinding(), LocalhostExchange())
@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
     def __init__(self):
         super(MatchMakerLocalhost, self).__init__()
 
+        self.add_binding(PublisherBinding(), StubExchange())
         self.add_binding(FanoutBinding(), StubExchange())
         self.add_binding(DirectBinding(), StubExchange())
         self.add_binding(TopicBinding(), StubExchange())
diff --git a/cinder/openstack/common/rpc/service.py b/cinder/openstack/common/rpc/service.py
new file mode 100644 (file)
index 0000000..a35fd6a
--- /dev/null
@@ -0,0 +1,69 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import rpc
+from cinder.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+    """Service object for binaries running on hosts.
+
+    A service enables rpc by listening to queues based on topic and host."""
+    def __init__(self, host, topic, manager=None):
+        super(Service, self).__init__()
+        self.host = host
+        self.topic = topic
+        if manager is None:
+            self.manager = self
+        else:
+            self.manager = manager
+
+    def start(self):
+        super(Service, self).start()
+
+        self.conn = rpc.create_connection(new=True)
+        LOG.debug(_("Creating Consumer connection for Service %s") %
+                  self.topic)
+
+        rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
+
+        # Share this same connection for these Consumers
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
+
+        node_topic = '%s.%s' % (self.topic, self.host)
+        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
+
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
+
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+
+    def stop(self):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them.. as we're shutting down anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        super(Service, self).stop()
index c4f6cf0497a5573270f104e98061ff69587213e6..93b34fc5b1d4b424a8503ac35f0f63b1109b1b0c 100644 (file)
@@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
 
 
 def normalize_time(timestamp):
-    """Normalize time in arbitrary timezone to UTC"""
+    """Normalize time in arbitrary timezone to UTC naive object"""
     offset = timestamp.utcoffset()
-    return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+    if offset is None:
+        return timestamp
+    return timestamp.replace(tzinfo=None) - offset
 
 
 def is_older_than(before, seconds):
@@ -121,6 +123,10 @@ def marshall_now(now=None):
 
 def unmarshall_time(tyme):
     """Unmarshall a datetime dict."""
-    return datetime.datetime(day=tyme['day'], month=tyme['month'],
-                 year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
-                 second=tyme['second'], microsecond=tyme['microsecond'])
+    return datetime.datetime(day=tyme['day'],
+                             month=tyme['month'],
+                             year=tyme['year'],
+                             hour=tyme['hour'],
+                             minute=tyme['minute'],
+                             second=tyme['second'],
+                             microsecond=tyme['microsecond'])
index 646530385dcae3bb7e1f5504ac7287f869d06928..1a7e9ccd94c5e3a926e9653487ceaae91861589a 100644 (file)
 
 """Policy Engine For Cinder"""
 
-from cinder.common import policy
 from cinder import exception
 from cinder import flags
 from cinder.openstack.common import cfg
+from cinder.openstack.common import policy
 from cinder import utils
 
 
@@ -84,7 +84,5 @@ def enforce(context, action, target):
     match_list = ('rule:%s' % action,)
     credentials = context.to_dict()
 
-    try:
-        policy.enforce(match_list, target, credentials)
-    except policy.NotAuthorized:
-        raise exception.PolicyNotAuthorized(action=action)
+    policy.enforce(match_list, target, credentials,
+                   exception.PolicyNotAuthorized, action=action)
index 4cb66f088113bf1deb34c0f3b726aada8529fca9..3e339f755002230a2b243599212cddab60c3a852 100644 (file)
@@ -21,11 +21,11 @@ import os.path
 import StringIO
 import urllib2
 
-from cinder.common import policy as common_policy
 from cinder import context
 from cinder import exception
 from cinder import flags
-import cinder.common.policy
+import cinder.openstack.common.policy
+from cinder.openstack.common import policy as common_policy
 from cinder import policy
 from cinder import test
 from cinder import utils
@@ -169,8 +169,9 @@ class DefaultPolicyTestCase(test.TestCase):
         self.context = context.RequestContext('fake', 'fake')
 
     def _set_brain(self, default_rule):
-        brain = cinder.common.policy.HttpBrain(self.rules, default_rule)
-        cinder.common.policy.set_brain(brain)
+        brain = cinder.openstack.common.policy.HttpBrain(self.rules,
+                                                         default_rule)
+        cinder.openstack.common.policy.set_brain(brain)
 
     def tearDown(self):
         super(DefaultPolicyTestCase, self).tearDown()
index 091cab342049f7616384af14e9a24ed8fff20839..e31fb60cbde68c1efd744f4759fe8c444c8da766 100644 (file)
@@ -1,7 +1,7 @@
 [DEFAULT]
 
 # The list of modules to copy from openstack-common
-modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context
+modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy
 
 # The base module to hold the copy of openstack.common
 base=cinder