class Brain(object):
"""Implements policy checking."""
+
+ _checks = {}
+
+ @classmethod
+ def _register(cls, name, func):
+ cls._checks[name] = func
+
@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
return cls(rules=rules_dict, default_rule=default_rule)
def __init__(self, rules=None, default_rule=None):
+ if self.__class__ != Brain:
+ LOG.warning(_("Inheritance-based rules are deprecated; use "
+ "the default brain instead of %s.") %
+ self.__class__.__name__)
+
self.rules = rules or {}
self.default_rule = default_rule
LOG.exception(_("Failed to understand rule %(match)r") % locals())
# If the rule is invalid, fail closed
return False
+
+ func = None
try:
- f = getattr(self, '_check_%s' % match_kind)
+ old_func = getattr(self, '_check_%s' % match_kind)
except AttributeError:
- if not self._check_generic(match, target_dict, cred_dict):
- return False
+ func = self._checks.get(match_kind, self._checks.get(None, None))
else:
- if not f(match_value, target_dict, cred_dict):
- return False
- return True
+ LOG.warning(_("Inheritance-based rules are deprecated; update "
+ "_check_%s") % match_kind)
+ func = (lambda brain, kind, value, target, cred:
+ old_func(value, target, cred))
+
+ if not func:
+ LOG.error(_("No handler for matches of kind %s") % match_kind)
+ # Fail closed
+ return False
+
+ return func(self, match_kind, match_value, target_dict, cred_dict)
def check(self, match_list, target_dict, cred_dict):
"""Checks authorization of some rules against credentials.
return True
return False
- def _check_rule(self, match, target_dict, cred_dict):
- """Recursively checks credentials based on the brains rules."""
- try:
- new_match_list = self.rules[match]
- except KeyError:
- if self.default_rule and match != self.default_rule:
- new_match_list = ('rule:%s' % self.default_rule,)
- else:
- return False
- return self.check(new_match_list, target_dict, cred_dict)
+class HttpBrain(Brain):
+ """A brain that can check external urls for policy.
- def _check_role(self, match, target_dict, cred_dict):
- """Check that there is a matching role in the cred dict."""
- return match.lower() in [x.lower() for x in cred_dict['roles']]
+ Posts json blobs for target and credentials.
- def _check_generic(self, match, target_dict, cred_dict):
- """Check an individual match.
+ Note that this brain is deprecated; the http check is registered
+ by default.
+ """
- Matches look like:
+ pass
- tenant:%(tenant_id)s
- role:compute:admin
- """
+def register(name, func=None):
+ """
+ Register a function as a policy check.
+
+ :param name: Gives the name of the check type, e.g., 'rule',
+ 'role', etc. If name is None, a default function
+ will be registered.
+ :param func: If given, provides the function to register. If not
+ given, returns a function taking one argument to
+ specify the function to register, allowing use as a
+ decorator.
+ """
- # TODO(termie): do dict inspection via dot syntax
- match = match % target_dict
- key, value = match.split(':', 1)
- if key in cred_dict:
- return value == cred_dict[key]
- return False
+ # Perform the actual decoration by registering the function.
+ # Returns the function for compliance with the decorator
+ # interface.
+ def decorator(func):
+ # Register the function
+ Brain._register(name, func)
+ return func
+
+ # If the function is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+ """Recursively checks credentials based on the brains rules."""
+ try:
+ new_match_list = brain.rules[match]
+ except KeyError:
+ if brain.default_rule and match != brain.default_rule:
+ new_match_list = ('rule:%s' % brain.default_rule,)
+ else:
+ return False
+ return brain.check(new_match_list, target_dict, cred_dict)
-class HttpBrain(Brain):
- """A brain that can check external urls for policy.
- Posts json blobs for target and credentials.
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+ """Check that there is a matching role in the cred dict."""
+ return match.lower() in [x.lower() for x in cred_dict['roles']]
+
+
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+ """Check http: rules by calling to a remote server.
+
+ This example implementation simply verifies that the response is
+ exactly 'True'. A custom brain using response codes could easily
+ be implemented.
"""
+ url = 'http:' + (match % target_dict)
+ data = {'target': jsonutils.dumps(target_dict),
+ 'credentials': jsonutils.dumps(cred_dict)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
- def _check_http(self, match, target_dict, cred_dict):
- """Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response is
- exactly 'True'. A custom brain using response codes could easily
- be implemented.
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+ """Check an individual match.
- """
- url = match % target_dict
- data = {'target': jsonutils.dumps(target_dict),
- 'credentials': jsonutils.dumps(cred_dict)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
- return f.read() == "True"
+ Matches look like:
+
+ tenant:%(tenant_id)s
+ role:compute:admin
+
+ """
+
+ # TODO(termie): do dict inspection via dot syntax
+ match = match % target_dict
+ if match_kind in cred_dict:
+ return match == cred_dict[match_kind]
+ return False