--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+ """
+ Interpret a string as a host:port pair.
+ An IPv6 address MUST be escaped if accompanied by a port,
+ because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+ means both [2001:db8:85a3::8a2e:370:7334] and
+ [2001:db8:85a3::8a2e:370]:7334.
+
+ >>> parse_host_port('server01:80')
+ ('server01', 80)
+ >>> parse_host_port('server01')
+ ('server01', None)
+ >>> parse_host_port('server01', default_port=1234)
+ ('server01', 1234)
+ >>> parse_host_port('[::1]:80')
+ ('::1', 80)
+ >>> parse_host_port('[::1]')
+ ('::1', None)
+ >>> parse_host_port('[::1]', default_port=1234)
+ ('::1', 1234)
+ >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+ ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+ """
+ if address[0] == '[':
+ # Escaped ipv6
+ _host, _port = address[1:].split(']')
+ host = _host
+ if ':' in _port:
+ port = _port.split(':')[1]
+ else:
+ port = default_port
+ else:
+ if address.count(':') == 1:
+ host, port = address.split(':')
+ else:
+ # 0 means ipv4, >1 means ipv6.
+ # We prohibit unescaped ipv6 addresses with port.
+ host = address
+ port = default_port
+
+ return (host, None if port is None else int(port))
"""Common Policy Engine Implementation"""
+import logging
import urllib
import urllib2
+from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import jsonutils
-class NotAuthorized(Exception):
- pass
+LOG = logging.getLogger(__name__)
_BRAIN = None
_BRAIN = None
-def enforce(match_list, target_dict, credentials_dict):
+def enforce(match_list, target_dict, credentials_dict, exc=None,
+ *args, **kwargs):
"""Enforces authorization of some rules against credentials.
:param match_list: nested tuples of data to match against
Credentials dicts contain as much information as we can about the user
performing the action.
- :raises NotAuthorized: if the check fails
+ :param exc: exception to raise
+
+ Class of the exception to raise if the check fails. Any remaining
+ arguments passed to enforce() (both positional and keyword arguments)
+ will be passed to the exception class. If exc is not provided, returns
+ False.
+ :return: True if the policy allows the action
+ :return: False if the policy does not allow the action and exc is not set
"""
global _BRAIN
if not _BRAIN:
_BRAIN = Brain()
if not _BRAIN.check(match_list, target_dict, credentials_dict):
- raise NotAuthorized()
+ if exc:
+ raise exc(*args, **kwargs)
+ return False
+ return True
class Brain(object):
"""Implements policy checking."""
+
+ _checks = {}
+
+ @classmethod
+ def _register(cls, name, func):
+ cls._checks[name] = func
+
@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
return cls(rules=rules_dict, default_rule=default_rule)
def __init__(self, rules=None, default_rule=None):
+ if self.__class__ != Brain:
+ LOG.warning(_("Inheritance-based rules are deprecated; use "
+ "the default brain instead of %s.") %
+ self.__class__.__name__)
+
self.rules = rules or {}
self.default_rule = default_rule
self.rules[key] = match
def _check(self, match, target_dict, cred_dict):
- match_kind, match_value = match.split(':', 1)
try:
- f = getattr(self, '_check_%s' % match_kind)
+ match_kind, match_value = match.split(':', 1)
+ except Exception:
+ LOG.exception(_("Failed to understand rule %(match)r") % locals())
+ # If the rule is invalid, fail closed
+ return False
+
+ func = None
+ try:
+ old_func = getattr(self, '_check_%s' % match_kind)
except AttributeError:
- if not self._check_generic(match, target_dict, cred_dict):
- return False
+ func = self._checks.get(match_kind, self._checks.get(None, None))
else:
- if not f(match_value, target_dict, cred_dict):
- return False
- return True
+ LOG.warning(_("Inheritance-based rules are deprecated; update "
+ "_check_%s") % match_kind)
+ func = lambda brain, kind, value, target, cred: old_func(value,
+ target,
+ cred)
+
+ if not func:
+ LOG.error(_("No handler for matches of kind %s") % match_kind)
+ # Fail closed
+ return False
+
+ return func(self, match_kind, match_value, target_dict, cred_dict)
def check(self, match_list, target_dict, cred_dict):
"""Checks authorization of some rules against credentials.
return True
return False
- def _check_rule(self, match, target_dict, cred_dict):
- """Recursively checks credentials based on the brains rules."""
- try:
- new_match_list = self.rules[match]
- except KeyError:
- if self.default_rule and match != self.default_rule:
- new_match_list = ('rule:%s' % self.default_rule,)
- else:
- return False
- return self.check(new_match_list, target_dict, cred_dict)
+class HttpBrain(Brain):
+ """A brain that can check external urls for policy.
+
+ Posts json blobs for target and credentials.
+
+ Note that this brain is deprecated; the http check is registered
+ by default.
+ """
- def _check_role(self, match, target_dict, cred_dict):
- """Check that there is a matching role in the cred dict."""
- return match.lower() in [x.lower() for x in cred_dict['roles']]
+ pass
- def _check_generic(self, match, target_dict, cred_dict):
- """Check an individual match.
- Matches look like:
+def register(name, func=None):
+ """
+ Register a function as a policy check.
+
+ :param name: Gives the name of the check type, e.g., 'rule',
+ 'role', etc. If name is None, a default function
+ will be registered.
+ :param func: If given, provides the function to register. If not
+ given, returns a function taking one argument to
+ specify the function to register, allowing use as a
+ decorator.
+ """
- tenant:%(tenant_id)s
- role:compute:admin
+ # Perform the actual decoration by registering the function.
+ # Returns the function for compliance with the decorator
+ # interface.
+ def decorator(func):
+ # Register the function
+ Brain._register(name, func)
+ return func
+
+ # If the function is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+ """Recursively checks credentials based on the brains rules."""
+ try:
+ new_match_list = brain.rules[match]
+ except KeyError:
+ if brain.default_rule and match != brain.default_rule:
+ new_match_list = ('rule:%s' % brain.default_rule,)
+ else:
+ return False
- """
+ return brain.check(new_match_list, target_dict, cred_dict)
- # TODO(termie): do dict inspection via dot syntax
- match = match % target_dict
- key, value = match.split(':', 1)
- if key in cred_dict:
- return value == cred_dict[key]
- return False
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+ """Check that there is a matching role in the cred dict."""
+ return match.lower() in [x.lower() for x in cred_dict['roles']]
-class HttpBrain(Brain):
- """A brain that can check external urls for policy.
- Posts json blobs for target and credentials.
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+ """Check http: rules by calling to a remote server.
+
+ This example implementation simply verifies that the response is
+ exactly 'True'. A custom brain using response codes could easily
+ be implemented.
"""
+ url = 'http:' + (match % target_dict)
+ data = {'target': jsonutils.dumps(target_dict),
+ 'credentials': jsonutils.dumps(cred_dict)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
- def _check_http(self, match, target_dict, cred_dict):
- """Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response is
- exactly 'True'. A custom brain using response codes could easily
- be implemented.
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+ """Check an individual match.
- """
- url = match % target_dict
- data = {'target': jsonutils.dumps(target_dict),
- 'credentials': jsonutils.dumps(cred_dict)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
- return f.read() == "True"
+ Matches look like:
+
+ tenant:%(tenant_id)s
+ role:compute:admin
+
+ """
+
+ # TODO(termie): do dict inspection via dot syntax
+ match = match % target_dict
+ if match_kind in cred_dict:
+ return match == unicode(cred_dict[match_kind])
+ return False
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
- help='the RabbitMQ host'),
+ help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port',
default=5672,
- help='the RabbitMQ port'),
+ help='The RabbitMQ broker port where a single node is used'),
+ cfg.ListOpt('rabbit_hosts',
+ default=['$rabbit_host:$rabbit_port'],
+ help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
+ cfg.BoolOpt('rabbit_ha_queues',
+ default=False,
+ help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+ 'You need to wipe RabbitMQ database when '
+ 'changing this option.'),
]
LOG = rpc_common.LOG
+def _get_queue_arguments(conf):
+ """Construct the arguments for declaring a queue.
+
+ If the rabbit_ha_queues option is set, we declare a mirrored queue
+ as described here:
+
+ http://www.rabbitmq.com/ha.html
+
+ Setting x-ha-policy to all means that the queue will be mirrored
+ to all nodes in the cluster.
+ """
+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
class ConsumerBase(object):
"""Consumer base class."""
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
- **kwargs):
+ exchange_name=None, **kwargs):
"""Init a 'topic' queue.
:param channel: the amqp channel to use
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=rpc_amqp.get_control_exchange(conf),
- type='topic', durable=options['durable'],
- auto_delete=options['auto_delete'])
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+ exchange = kombu.entity.Exchange(name=exchange_name,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel,
callback,
tag,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(channel,
- rpc_amqp.get_control_exchange(conf), topic,
- type='topic', **options)
+ exchange_name,
+ topic,
+ type='topic',
+ **options)
class FanoutPublisher(Publisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
- routing_key=self.routing_key)
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments)
queue.declare()
if server_params is None:
server_params = {}
-
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
- params = {}
- for sp_key, value in server_params.iteritems():
- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
- params[p_key] = value
+ ssl_params = self._fetch_ssl_params()
+ params_list = []
+ for adr in self.conf.rabbit_hosts:
+ hostname, port = network_utils.parse_host_port(
+ adr, default_port=self.conf.rabbit_port)
- params.setdefault('hostname', self.conf.rabbit_host)
- params.setdefault('port', self.conf.rabbit_port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+ params = {}
- self.params = params
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
- if self.conf.fake_rabbit:
- self.params['transport'] = 'memory'
- self.memory_transport = True
- else:
- self.memory_transport = False
+ params.setdefault('hostname', hostname)
+ params.setdefault('port', port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+ if self.conf.fake_rabbit:
+ params['transport'] = 'memory'
+ if self.conf.rabbit_use_ssl:
+ params['ssl'] = ssl_params
- if self.conf.rabbit_use_ssl:
- self.params['ssl'] = self._fetch_ssl_params()
+ params_list.append(params)
+
+ self.params_list = params_list
+
+ self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.reconnect()
# Return the extended behavior
return ssl_params
- def _connect(self):
+ def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % params)
try:
self.connection.close()
except self.connection_errors:
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(**self.params)
+ self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
- self.params)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+ params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
attempt = 0
while True:
+ params = self.params_list[attempt % len(self.params_list)]
attempt += 1
try:
- self._connect()
+ self._connect(params)
return
- except (self.connection_errors, IOError), e:
+ except (IOError, self.connection_errors) as e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
- log_info.update(self.params)
+ log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ LOG.error(_('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
- LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+ 'unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e:
- pass
+ if error_callback:
+ error_callback(e)
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
- if error_callback:
- error_callback(e)
+ if error_callback:
+ error_callback(e)
self.reconnect()
def get_channel(self):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
:param name: optional queue name, defaults to topic
"""
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
- {}, name or topic, {})
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
+ "%s/%s" % (exchange_name, topic))
class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
+ exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
- {"durable": True})
+ "%s/%s" % (exchange_name, topic),
+ {"durable": True})
class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
+ cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+ help='ZeroMQ fanout publisher port'),
+
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
+ self.outq.send([str(topic), str(msg_id), str('cast'),
_serialize(data)])
def close(self):
else:
return [result]
+ def consume(self, sock):
+ raise NotImplementedError()
+
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
+ self.topic_proxy['fanout~'] = \
+ ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['fanout~'])
+
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
+
+ # This doesn't change what is in the message,
+ # it only specifies that these messages go to
+ # the generic fanout topic.
+ topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+class CallbackReactor(ZmqBaseReactor):
+ """
+ A consumer class passing messages to a callback
+ """
+
+ def __init__(self, conf, callback):
+ self._cb = callback
+ super(CallbackReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ data = sock.recv()
+ self._cb(data[3])
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
+ def _consume_fanout(self, reactor, topic, proxy, bind=False):
+ for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+ inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+ reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
+ def declare_topic_consumer(self, topic, callback=None,
+ queue_name=None):
+ """declare_topic_consumer is a private method, but
+ it is being used by Quantum (Folsom).
+ This has been added compatibility.
+ """
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
+ return
+
+ reactor = CallbackReactor(CONF, callback)
+ self._consume_fanout(reactor, topic, None, bind=False)
+
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
- # Subscription scenarios
+ # Consume direct-push fanout messages (relay to local consumers)
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
+ # If we're not in here, we can't receive direct fanout messages
+ if CONF.rpc_zmq_host in matchmaker.queues(topic):
+ # Consume from all remote publishers.
+ self._consume_fanout(self.reactor, topic, proxy)
+ else:
+ LOG.warn("This service cannot receive direct PUSH fanout "
+ "messages without being known by the matchmaker.")
+ return
+
+ # Configure consumer for direct pushes.
+ subscribe = (topic, fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
+
+ inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
+ # Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
return False
+class PublisherBinding(Binding):
+ """Match on publishers keys, where key starts with 'publishers.' string."""
+ def test(self, key):
+ if key.startswith('publishers~'):
+ return True
+ return False
+
+
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key + '.' + host, host)]
+class PublisherRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(PublisherRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "publishers~", strip it for lookup.
+ nkey = key.split('publishers~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+ ['localhost'])
class LocalhostExchange(Exchange):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
+ self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import rpc
+from cinder.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+ """Service object for binaries running on hosts.
+
+ A service enables rpc by listening to queues based on topic and host."""
+ def __init__(self, host, topic, manager=None):
+ super(Service, self).__init__()
+ self.host = host
+ self.topic = topic
+ if manager is None:
+ self.manager = self
+ else:
+ self.manager = manager
+
+ def start(self):
+ super(Service, self).start()
+
+ self.conn = rpc.create_connection(new=True)
+ LOG.debug(_("Creating Consumer connection for Service %s") %
+ self.topic)
+
+ rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
+
+ # Share this same connection for these Consumers
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
+
+ node_topic = '%s.%s' % (self.topic, self.host)
+ self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
+
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
+
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
+
+ def stop(self):
+ # Try to shut the connection down, but if we get any sort of
+ # errors, go ahead and ignore them.. as we're shutting down anyway
+ try:
+ self.conn.close()
+ except Exception:
+ pass
+ super(Service, self).stop()
def normalize_time(timestamp):
- """Normalize time in arbitrary timezone to UTC"""
+ """Normalize time in arbitrary timezone to UTC naive object"""
offset = timestamp.utcoffset()
- return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+ if offset is None:
+ return timestamp
+ return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
- return datetime.datetime(day=tyme['day'], month=tyme['month'],
- year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
- second=tyme['second'], microsecond=tyme['microsecond'])
+ return datetime.datetime(day=tyme['day'],
+ month=tyme['month'],
+ year=tyme['year'],
+ hour=tyme['hour'],
+ minute=tyme['minute'],
+ second=tyme['second'],
+ microsecond=tyme['microsecond'])
"""Policy Engine For Cinder"""
-from cinder.common import policy
from cinder import exception
from cinder import flags
from cinder.openstack.common import cfg
+from cinder.openstack.common import policy
from cinder import utils
match_list = ('rule:%s' % action,)
credentials = context.to_dict()
- try:
- policy.enforce(match_list, target, credentials)
- except policy.NotAuthorized:
- raise exception.PolicyNotAuthorized(action=action)
+ policy.enforce(match_list, target, credentials,
+ exception.PolicyNotAuthorized, action=action)
import StringIO
import urllib2
-from cinder.common import policy as common_policy
from cinder import context
from cinder import exception
from cinder import flags
-import cinder.common.policy
+import cinder.openstack.common.policy
+from cinder.openstack.common import policy as common_policy
from cinder import policy
from cinder import test
from cinder import utils
self.context = context.RequestContext('fake', 'fake')
def _set_brain(self, default_rule):
- brain = cinder.common.policy.HttpBrain(self.rules, default_rule)
- cinder.common.policy.set_brain(brain)
+ brain = cinder.openstack.common.policy.HttpBrain(self.rules,
+ default_rule)
+ cinder.openstack.common.policy.set_brain(brain)
def tearDown(self):
super(DefaultPolicyTestCase, self).tearDown()
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context
+modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy
# The base module to hold the copy of openstack.common
base=cinder