From 4cfb90a9b82c52c476187204f8b0a83d9ba7c186 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Fri, 28 Sep 2012 16:27:09 -0500 Subject: [PATCH] Sync openstack common and add policy openstack-common.conf now includes network_utils and policy The default kombu imple for openstack.common.rpc seems to depends on network_utils. Fixes: bug #1058353 Change-Id: I2526c7355e7f2be67b25bf22854c56ec65741d9a --- cinder/openstack/common/network_utils.py | 68 ++++++++ cinder/{ => openstack}/common/policy.py | 182 +++++++++++++++------- cinder/openstack/common/rpc/impl_kombu.py | 132 ++++++++++------ cinder/openstack/common/rpc/impl_qpid.py | 20 ++- cinder/openstack/common/rpc/impl_zmq.py | 96 +++++++++--- cinder/openstack/common/rpc/matchmaker.py | 31 +++- cinder/openstack/common/rpc/service.py | 69 ++++++++ cinder/openstack/common/timeutils.py | 16 +- cinder/policy.py | 8 +- cinder/tests/test_policy.py | 9 +- openstack-common.conf | 2 +- 11 files changed, 493 insertions(+), 140 deletions(-) create mode 100644 cinder/openstack/common/network_utils.py rename cinder/{ => openstack}/common/policy.py (51%) create mode 100644 cinder/openstack/common/rpc/service.py diff --git a/cinder/openstack/common/network_utils.py b/cinder/openstack/common/network_utils.py new file mode 100644 index 000000000..69f673216 --- /dev/null +++ b/cinder/openstack/common/network_utils.py @@ -0,0 +1,68 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network-related utilities and helper functions. +""" + +import logging + +LOG = logging.getLogger(__name__) + + +def parse_host_port(address, default_port=None): + """ + Interpret a string as a host:port pair. + An IPv6 address MUST be escaped if accompanied by a port, + because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334 + means both [2001:db8:85a3::8a2e:370:7334] and + [2001:db8:85a3::8a2e:370]:7334. + + >>> parse_host_port('server01:80') + ('server01', 80) + >>> parse_host_port('server01') + ('server01', None) + >>> parse_host_port('server01', default_port=1234) + ('server01', 1234) + >>> parse_host_port('[::1]:80') + ('::1', 80) + >>> parse_host_port('[::1]') + ('::1', None) + >>> parse_host_port('[::1]', default_port=1234) + ('::1', 1234) + >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) + ('2001:db8:85a3::8a2e:370:7334', 1234) + + """ + if address[0] == '[': + # Escaped ipv6 + _host, _port = address[1:].split(']') + host = _host + if ':' in _port: + port = _port.split(':')[1] + else: + port = default_port + else: + if address.count(':') == 1: + host, port = address.split(':') + else: + # 0 means ipv4, >1 means ipv6. + # We prohibit unescaped ipv6 addresses with port. + host = address + port = default_port + + return (host, None if port is None else int(port)) diff --git a/cinder/common/policy.py b/cinder/openstack/common/policy.py similarity index 51% rename from cinder/common/policy.py rename to cinder/openstack/common/policy.py index 49ecbdaa9..0ca48ce9e 100644 --- a/cinder/common/policy.py +++ b/cinder/openstack/common/policy.py @@ -17,14 +17,15 @@ """Common Policy Engine Implementation""" +import logging import urllib import urllib2 +from cinder.openstack.common.gettextutils import _ from cinder.openstack.common import jsonutils -class NotAuthorized(Exception): - pass +LOG = logging.getLogger(__name__) _BRAIN = None @@ -46,7 +47,8 @@ def reset(): _BRAIN = None -def enforce(match_list, target_dict, credentials_dict): +def enforce(match_list, target_dict, credentials_dict, exc=None, + *args, **kwargs): """Enforces authorization of some rules against credentials. :param match_list: nested tuples of data to match against @@ -107,18 +109,35 @@ def enforce(match_list, target_dict, credentials_dict): Credentials dicts contain as much information as we can about the user performing the action. - :raises NotAuthorized: if the check fails + :param exc: exception to raise + + Class of the exception to raise if the check fails. Any remaining + arguments passed to enforce() (both positional and keyword arguments) + will be passed to the exception class. If exc is not provided, returns + False. + :return: True if the policy allows the action + :return: False if the policy does not allow the action and exc is not set """ global _BRAIN if not _BRAIN: _BRAIN = Brain() if not _BRAIN.check(match_list, target_dict, credentials_dict): - raise NotAuthorized() + if exc: + raise exc(*args, **kwargs) + return False + return True class Brain(object): """Implements policy checking.""" + + _checks = {} + + @classmethod + def _register(cls, name, func): + cls._checks[name] = func + @classmethod def load_json(cls, data, default_rule=None): """Init a brain using json instead of a rules dictionary.""" @@ -126,6 +145,11 @@ class Brain(object): return cls(rules=rules_dict, default_rule=default_rule) def __init__(self, rules=None, default_rule=None): + if self.__class__ != Brain: + LOG.warning(_("Inheritance-based rules are deprecated; use " + "the default brain instead of %s.") % + self.__class__.__name__) + self.rules = rules or {} self.default_rule = default_rule @@ -133,16 +157,31 @@ class Brain(object): self.rules[key] = match def _check(self, match, target_dict, cred_dict): - match_kind, match_value = match.split(':', 1) try: - f = getattr(self, '_check_%s' % match_kind) + match_kind, match_value = match.split(':', 1) + except Exception: + LOG.exception(_("Failed to understand rule %(match)r") % locals()) + # If the rule is invalid, fail closed + return False + + func = None + try: + old_func = getattr(self, '_check_%s' % match_kind) except AttributeError: - if not self._check_generic(match, target_dict, cred_dict): - return False + func = self._checks.get(match_kind, self._checks.get(None, None)) else: - if not f(match_value, target_dict, cred_dict): - return False - return True + LOG.warning(_("Inheritance-based rules are deprecated; update " + "_check_%s") % match_kind) + func = lambda brain, kind, value, target, cred: old_func(value, + target, + cred) + + if not func: + LOG.error(_("No handler for matches of kind %s") % match_kind) + # Fail closed + return False + + return func(self, match_kind, match_value, target_dict, cred_dict) def check(self, match_list, target_dict, cred_dict): """Checks authorization of some rules against credentials. @@ -166,58 +205,97 @@ class Brain(object): return True return False - def _check_rule(self, match, target_dict, cred_dict): - """Recursively checks credentials based on the brains rules.""" - try: - new_match_list = self.rules[match] - except KeyError: - if self.default_rule and match != self.default_rule: - new_match_list = ('rule:%s' % self.default_rule,) - else: - return False - return self.check(new_match_list, target_dict, cred_dict) +class HttpBrain(Brain): + """A brain that can check external urls for policy. + + Posts json blobs for target and credentials. + + Note that this brain is deprecated; the http check is registered + by default. + """ - def _check_role(self, match, target_dict, cred_dict): - """Check that there is a matching role in the cred dict.""" - return match.lower() in [x.lower() for x in cred_dict['roles']] + pass - def _check_generic(self, match, target_dict, cred_dict): - """Check an individual match. - Matches look like: +def register(name, func=None): + """ + Register a function as a policy check. + + :param name: Gives the name of the check type, e.g., 'rule', + 'role', etc. If name is None, a default function + will be registered. + :param func: If given, provides the function to register. If not + given, returns a function taking one argument to + specify the function to register, allowing use as a + decorator. + """ - tenant:%(tenant_id)s - role:compute:admin + # Perform the actual decoration by registering the function. + # Returns the function for compliance with the decorator + # interface. + def decorator(func): + # Register the function + Brain._register(name, func) + return func + + # If the function is given, do the registration + if func: + return decorator(func) + + return decorator + + +@register("rule") +def _check_rule(brain, match_kind, match, target_dict, cred_dict): + """Recursively checks credentials based on the brains rules.""" + try: + new_match_list = brain.rules[match] + except KeyError: + if brain.default_rule and match != brain.default_rule: + new_match_list = ('rule:%s' % brain.default_rule,) + else: + return False - """ + return brain.check(new_match_list, target_dict, cred_dict) - # TODO(termie): do dict inspection via dot syntax - match = match % target_dict - key, value = match.split(':', 1) - if key in cred_dict: - return value == cred_dict[key] - return False +@register("role") +def _check_role(brain, match_kind, match, target_dict, cred_dict): + """Check that there is a matching role in the cred dict.""" + return match.lower() in [x.lower() for x in cred_dict['roles']] -class HttpBrain(Brain): - """A brain that can check external urls for policy. - Posts json blobs for target and credentials. +@register('http') +def _check_http(brain, match_kind, match, target_dict, cred_dict): + """Check http: rules by calling to a remote server. + + This example implementation simply verifies that the response is + exactly 'True'. A custom brain using response codes could easily + be implemented. """ + url = 'http:' + (match % target_dict) + data = {'target': jsonutils.dumps(target_dict), + 'credentials': jsonutils.dumps(cred_dict)} + post_data = urllib.urlencode(data) + f = urllib2.urlopen(url, post_data) + return f.read() == "True" - def _check_http(self, match, target_dict, cred_dict): - """Check http: rules by calling to a remote server. - This example implementation simply verifies that the response is - exactly 'True'. A custom brain using response codes could easily - be implemented. +@register(None) +def _check_generic(brain, match_kind, match, target_dict, cred_dict): + """Check an individual match. - """ - url = match % target_dict - data = {'target': jsonutils.dumps(target_dict), - 'credentials': jsonutils.dumps(cred_dict)} - post_data = urllib.urlencode(data) - f = urllib2.urlopen(url, post_data) - return f.read() == "True" + Matches look like: + + tenant:%(tenant_id)s + role:compute:admin + + """ + + # TODO(termie): do dict inspection via dot syntax + match = match % target_dict + if match_kind in cred_dict: + return match == unicode(cred_dict[match_kind]) + return False diff --git a/cinder/openstack/common/rpc/impl_kombu.py b/cinder/openstack/common/rpc/impl_kombu.py index faf6c1384..01f67defd 100644 --- a/cinder/openstack/common/rpc/impl_kombu.py +++ b/cinder/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from cinder.openstack.common import cfg from cinder.openstack.common.gettextutils import _ from cinder.openstack.common.rpc import amqp as rpc_amqp from cinder.openstack.common.rpc import common as rpc_common +from cinder.openstack.common import network_utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='The RabbitMQ broker address where a single node is used'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='The RabbitMQ broker port where a single node is used'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" def __init__(self, conf, channel, topic, callback, tag, name=None, - **kwargs): + exchange_name=None, **kwargs): """Init a 'topic' queue. :param channel: the amqp channel to use @@ -207,13 +230,15 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=rpc_amqp.get_control_exchange(conf), - type='topic', durable=options['durable'], - auto_delete=options['auto_delete']) + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) + exchange = kombu.entity.Exchange(name=exchange_name, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) super(TopicConsumer, self).__init__(channel, callback, tag, @@ -307,9 +332,12 @@ class TopicPublisher(Publisher): 'auto_delete': False, 'exclusive': False} options.update(kwargs) + exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(channel, - rpc_amqp.get_control_exchange(conf), topic, - type='topic', **options) + exchange_name, + topic, + type='topic', + **options) class FanoutPublisher(Publisher): @@ -332,6 +360,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -344,7 +373,8 @@ class NotifyPublisher(TopicPublisher): exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) queue.declare() @@ -369,31 +399,37 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = network_utils.parse_host_port( + adr, default_port=self.conf.rabbit_port) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + params_list.append(params) + + self.params_list = params_list + + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -423,14 +459,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -438,7 +474,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -451,8 +487,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -465,11 +501,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -484,12 +521,12 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + LOG.error(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -503,9 +540,9 @@ class Connection(object): sleep_time = min(sleep_time, self.interval_max) log_info['sleep_time'] = sleep_time - LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -513,7 +550,8 @@ class Connection(object): try: return method(*args, **kwargs) except (self.connection_errors, socket.timeout, IOError), e: - pass + if error_callback: + error_callback(e) except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport @@ -523,8 +561,8 @@ class Connection(object): # and try to reconnect in this case. if 'timeout' not in str(e): raise - if error_callback: - error_callback(e) + if error_callback: + error_callback(e) self.reconnect() def get_channel(self): @@ -626,10 +664,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/cinder/openstack/common/rpc/impl_qpid.py b/cinder/openstack/common/rpc/impl_qpid.py index d9fc8210f..01f8a22c2 100644 --- a/cinder/openstack/common/rpc/impl_qpid.py +++ b/cinder/openstack/common/rpc/impl_qpid.py @@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback, name=None): + def __init__(self, conf, session, topic, callback, name=None, + exchange_name=None): """Init a 'topic' queue. :param session: the amqp session to use @@ -180,9 +181,10 @@ class TopicConsumer(ConsumerBase): :param name: optional queue name, defaults to topic """ + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), - {}, name or topic, {}) + "%s/%s" % (exchange_name, topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -255,8 +257,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ + exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(session, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic)) + "%s/%s" % (exchange_name, topic)) class FanoutPublisher(Publisher): @@ -274,9 +277,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ + exchange_name = rpc_amqp.get_control_exchange(conf) super(NotifyPublisher, self).__init__(session, - "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), - {"durable": True}) + "%s/%s" % (exchange_name, topic), + {"durable": True}) class Connection(object): @@ -461,10 +465,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py index f3f3b9e90..570431778 100644 --- a/cinder/openstack/common/rpc/impl_zmq.py +++ b/cinder/openstack/common/rpc/impl_zmq.py @@ -58,6 +58,9 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), + cfg.IntOpt('rpc_zmq_port_pub', default=9502, + help='ZeroMQ fanout publisher port'), + cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -206,7 +209,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), + self.outq.send([str(topic), str(msg_id), str('cast'), _serialize(data)]) def close(self): @@ -299,6 +302,9 @@ class ConsumerBase(object): else: return [result] + def consume(self, sock): + raise NotImplementedError() + def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) + self.topic_proxy['fanout~'] = \ + ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['fanout~']) + def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + + # This doesn't change what is in the message, + # it only specifies that these messages go to + # the generic fanout topic. + topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) +class CallbackReactor(ZmqBaseReactor): + """ + A consumer class passing messages to a callback + """ + + def __init__(self, conf, callback): + self._cb = callback + super(CallbackReactor, self).__init__(conf) + + def consume(self, sock): + data = sock.recv() + self._cb(data[3]) + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -488,6 +513,26 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) + def _consume_fanout(self, reactor, topic, proxy, bind=False): + for topic, host in matchmaker.queues("publishers~%s" % (topic, )): + inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) + reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + + def declare_topic_consumer(self, topic, callback=None, + queue_name=None): + """declare_topic_consumer is a private method, but + it is being used by Quantum (Folsom). + This has been added compatibility. + """ + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): + return + + reactor = CallbackReactor(CONF, callback) + self._consume_fanout(reactor, topic, None, bind=False) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -495,22 +540,35 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Subscription scenarios + # Consume direct-push fanout messages (relay to local consumers) if fanout: - subscribe = ('', fanout)[type(fanout) == str] + # If we're not in here, we can't receive direct fanout messages + if CONF.rpc_zmq_host in matchmaker.queues(topic): + # Consume from all remote publishers. + self._consume_fanout(self.reactor, topic, proxy) + else: + LOG.warn("This service cannot receive direct PUSH fanout " + "messages without being known by the matchmaker.") + return + + # Configure consumer for direct pushes. + subscribe = (topic, fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic + + inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) + # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/cinder/openstack/common/rpc/matchmaker.py b/cinder/openstack/common/rpc/matchmaker.py index 2c0aac5bb..ffe4870aa 100644 --- a/cinder/openstack/common/rpc/matchmaker.py +++ b/cinder/openstack/common/rpc/matchmaker.py @@ -132,6 +132,14 @@ class FanoutBinding(Binding): return False +class PublisherBinding(Binding): + """Match on publishers keys, where key starts with 'publishers.' string.""" + def test(self, key): + if key.startswith('publishers~'): + return True + return False + + class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] +class PublisherRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(PublisherRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "publishers~", strip it for lookup. + nkey = key.split('publishers~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + return map(lambda x: (key + '.' + x, x), self.ring[nkey] + + ['localhost']) class LocalhostExchange(Exchange): @@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() + self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) diff --git a/cinder/openstack/common/rpc/service.py b/cinder/openstack/common/rpc/service.py new file mode 100644 index 000000000..a35fd6ad4 --- /dev/null +++ b/cinder/openstack/common/rpc/service.py @@ -0,0 +1,69 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common import log as logging +from cinder.openstack.common import rpc +from cinder.openstack.common import service + + +LOG = logging.getLogger(__name__) + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host.""" + def __init__(self, host, topic, manager=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = rpc.create_connection(new=True) + LOG.debug(_("Creating Consumer connection for Service %s") % + self.topic) + + rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager]) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) + + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop() diff --git a/cinder/openstack/common/timeutils.py b/cinder/openstack/common/timeutils.py index c4f6cf049..93b34fc5b 100644 --- a/cinder/openstack/common/timeutils.py +++ b/cinder/openstack/common/timeutils.py @@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): def normalize_time(timestamp): - """Normalize time in arbitrary timezone to UTC""" + """Normalize time in arbitrary timezone to UTC naive object""" offset = timestamp.utcoffset() - return timestamp.replace(tzinfo=None) - offset if offset else timestamp + if offset is None: + return timestamp + return timestamp.replace(tzinfo=None) - offset def is_older_than(before, seconds): @@ -121,6 +123,10 @@ def marshall_now(now=None): def unmarshall_time(tyme): """Unmarshall a datetime dict.""" - return datetime.datetime(day=tyme['day'], month=tyme['month'], - year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], - second=tyme['second'], microsecond=tyme['microsecond']) + return datetime.datetime(day=tyme['day'], + month=tyme['month'], + year=tyme['year'], + hour=tyme['hour'], + minute=tyme['minute'], + second=tyme['second'], + microsecond=tyme['microsecond']) diff --git a/cinder/policy.py b/cinder/policy.py index 646530385..1a7e9ccd9 100644 --- a/cinder/policy.py +++ b/cinder/policy.py @@ -17,10 +17,10 @@ """Policy Engine For Cinder""" -from cinder.common import policy from cinder import exception from cinder import flags from cinder.openstack.common import cfg +from cinder.openstack.common import policy from cinder import utils @@ -84,7 +84,5 @@ def enforce(context, action, target): match_list = ('rule:%s' % action,) credentials = context.to_dict() - try: - policy.enforce(match_list, target, credentials) - except policy.NotAuthorized: - raise exception.PolicyNotAuthorized(action=action) + policy.enforce(match_list, target, credentials, + exception.PolicyNotAuthorized, action=action) diff --git a/cinder/tests/test_policy.py b/cinder/tests/test_policy.py index 4cb66f088..3e339f755 100644 --- a/cinder/tests/test_policy.py +++ b/cinder/tests/test_policy.py @@ -21,11 +21,11 @@ import os.path import StringIO import urllib2 -from cinder.common import policy as common_policy from cinder import context from cinder import exception from cinder import flags -import cinder.common.policy +import cinder.openstack.common.policy +from cinder.openstack.common import policy as common_policy from cinder import policy from cinder import test from cinder import utils @@ -169,8 +169,9 @@ class DefaultPolicyTestCase(test.TestCase): self.context = context.RequestContext('fake', 'fake') def _set_brain(self, default_rule): - brain = cinder.common.policy.HttpBrain(self.rules, default_rule) - cinder.common.policy.set_brain(brain) + brain = cinder.openstack.common.policy.HttpBrain(self.rules, + default_rule) + cinder.openstack.common.policy.set_brain(brain) def tearDown(self): super(DefaultPolicyTestCase, self).tearDown() diff --git a/openstack-common.conf b/openstack-common.conf index 091cab342..e31fb60cb 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context +modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy # The base module to hold the copy of openstack.common base=cinder -- 2.45.2