# Based on glance/api/policy.py
"""Policy Engine For Heat"""
-import json
-import os.path
-
from oslo.config import cfg
from heat.common import exception
import heat.openstack.common.log as logging
from heat.openstack.common import policy
-from heat.openstack.common.gettextutils import _
logger = logging.getLogger(__name__)
-policy_opts = [
- cfg.StrOpt('policy_file',
- default='policy.json',
- help=_("Policy file to use")),
- cfg.StrOpt('policy_default_rule',
- default='default',
- help=_("Default Rule of Policy File"))
-]
CONF = cfg.CONF
-CONF.register_opts(policy_opts)
-
DEFAULT_RULES = {
- 'default': policy.TrueCheck(),
+ 'default': policy.FalseCheck(),
}
class Enforcer(object):
"""Responsible for loading and enforcing rules."""
- def __init__(self, scope='heat', exc=exception.Forbidden):
+ def __init__(self, scope='heat', exc=exception.Forbidden,
+ default_rule=DEFAULT_RULES['default']):
self.scope = scope
self.exc = exc
- self.default_rule = CONF.policy_default_rule
- self.policy_path = self._find_policy_file()
- self.policy_file_mtime = None
- self.policy_file_contents = None
+ self.default_rule = default_rule
+ self.enforcer = policy.Enforcer(default_rule=default_rule)
- def set_rules(self, rules):
+ def set_rules(self, rules, overwrite=True):
"""Create a new Rules object based on the provided dict of rules."""
rules_obj = policy.Rules(rules, self.default_rule)
- policy.set_rules(rules_obj)
+ self.enforcer.set_rules(rules_obj, overwrite)
- def load_rules(self):
+ def load_rules(self, force_reload=False):
"""Set the rules found in the json file on disk."""
- if self.policy_path:
- rules = self._read_policy_file()
- rule_type = ""
- else:
- rules = DEFAULT_RULES
- rule_type = "default "
-
- text_rules = dict((k, str(v)) for k, v in rules.items())
-
- self.set_rules(rules)
-
- @staticmethod
- def _find_policy_file():
- """Locate the policy json data file."""
- policy_file = CONF.find_file(CONF.policy_file)
- if policy_file:
- return policy_file
- else:
- logger.warn(_('Unable to find policy file'))
- return None
-
- def _read_policy_file(self):
- """Read contents of the policy file
-
- This re-caches policy data if the file has been changed.
- """
- mtime = os.path.getmtime(self.policy_path)
- if not self.policy_file_contents or mtime != self.policy_file_mtime:
- logger.debug(_("Loading policy from %s") % self.policy_path)
- with open(self.policy_path) as fap:
- raw_contents = fap.read()
- rules_dict = json.loads(raw_contents)
- self.policy_file_contents = dict(
- (k, policy.parse_rule(v))
- for k, v in rules_dict.items())
- self.policy_file_mtime = mtime
- return self.policy_file_contents
-
- def _check(self, context, rule, target, *args, **kwargs):
+ self.enforcer.load_rules(force_reload)
+
+ def _check(self, context, rule, target, exc, *args, **kwargs):
"""Verifies that the action is valid on the target in this context.
:param context: Heat request context
:raises: self.exc (defaults to heat.common.exception.Forbidden)
:returns: A non-False value if access is allowed.
"""
- self.load_rules()
-
+ do_raise = False if not exc else True
credentials = {
'roles': context.roles,
'user': context.username,
'tenant': context.tenant,
}
-
- return policy.check(rule, target, credentials, *args, **kwargs)
+ return self.enforcer.enforce(rule, target, credentials,
+ do_raise, exc=exc, *args, **kwargs)
def enforce(self, context, action, target):
"""Verifies that the action is valid on the target in this context.
:returns: A non-False value if access is allowed.
"""
return self._check(context, action, target)
+
+ def clear(self):
+ self.enforcer.clear()
import abc
import re
import urllib
+import urllib2
+from oslo.config import cfg
import six
-import urllib2
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common import fileutils
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
+policy_opts = [
+ cfg.StrOpt('policy_file',
+ default='policy.json',
+ help=_('JSON file containing policy')),
+ cfg.StrOpt('policy_default_rule',
+ default='default',
+ help=_('Rule enforced when requested rule is not found')),
+]
-LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+CONF.register_opts(policy_opts)
+LOG = logging.getLogger(__name__)
-_rules = None
_checks = {}
+class PolicyNotAuthorized(Exception):
+
+ def __init__(self, rule):
+ msg = _("Policy doesn't allow %s to be performed.") % rule
+ super(PolicyNotAuthorized, self).__init__(msg)
+
+
class Rules(dict):
- """
- A store for rules. Handles the default_rule setting directly.
- """
+ """A store for rules. Handles the default_rule setting directly."""
@classmethod
def load_json(cls, data, default_rule=None):
- """
- Allow loading of JSON rule data.
- """
+ """Allow loading of JSON rule data."""
# Suck in the JSON data and parse the rules
rules = dict((k, parse_rule(v)) for k, v in
def __missing__(self, key):
"""Implements the default rule handling."""
+ if isinstance(self.default_rule, dict):
+ raise KeyError(key)
+
# If the default rule isn't actually defined, do something
# reasonably intelligent
if not self.default_rule or self.default_rule not in self:
raise KeyError(key)
- return self[self.default_rule]
+ if isinstance(self.default_rule, BaseCheck):
+ return self.default_rule
+ elif isinstance(self.default_rule, six.string_types):
+ return self[self.default_rule]
def __str__(self):
"""Dumps a string representation of the rules."""
return jsonutils.dumps(out_rules, indent=4)
-# Really have to figure out a way to deprecate this
-def set_rules(rules):
- """Set the rules in use for policy checks."""
+class Enforcer(object):
+ """Responsible for loading and enforcing rules.
- global _rules
+ :param policy_file: Custom policy file to use, if none is
+ specified, `CONF.policy_file` will be
+ used.
+ :param rules: Default dictionary / Rules to use. It will be
+ considered just in the first instantiation. If
+ `load_rules(True)`, `clear()` or `set_rules(True)`
+ is called this will be overwritten.
+ :param default_rule: Default rule to use, CONF.default_rule will
+ be used if none is specified.
+ """
- _rules = rules
+ def __init__(self, policy_file=None, rules=None, default_rule=None):
+ self.rules = Rules(rules, default_rule)
+ self.default_rule = default_rule or CONF.policy_default_rule
+ self.policy_path = None
+ self.policy_file = policy_file or CONF.policy_file
-# Ditto
-def reset():
- """Clear the rules used for policy checks."""
+ def set_rules(self, rules, overwrite=True):
+ """Create a new Rules object based on the provided dict of rules.
- global _rules
+ :param rules: New rules to use. It should be an instance of dict.
+ :param overwrite: Whether to overwrite current rules or update them
+ with the new rules.
+ """
- _rules = None
+ if not isinstance(rules, dict):
+ raise TypeError(_("Rules must be an instance of dict or Rules, "
+ "got %s instead") % type(rules))
+ if overwrite:
+ self.rules = Rules(rules, self.default_rule)
+ else:
+ self.rules.update(rules)
-def check(rule, target, creds, exc=None, *args, **kwargs):
- """
- Checks authorization of a rule against the target and credentials.
-
- :param rule: The rule to evaluate.
- :param target: As much information about the object being operated
- on as possible, as a dictionary.
- :param creds: As much information about the user performing the
- action as possible, as a dictionary.
- :param exc: Class of the exception to raise if the check fails.
- Any remaining arguments passed to check() (both
- positional and keyword arguments) will be passed to
- the exception class. If exc is not provided, returns
- False.
-
- :return: Returns False if the policy does not allow the action and
- exc is not provided; otherwise, returns a value that
- evaluates to True. Note: for rules using the "case"
- expression, this True value will be the specified string
- from the expression.
- """
+ def clear(self):
+ """Clears Enforcer rules, policy's cache and policy's path."""
+ self.set_rules({})
+ self.default_rule = None
+ self.policy_path = None
- # Allow the rule to be a Check tree
- if isinstance(rule, BaseCheck):
- result = rule(target, creds)
- elif not _rules:
- # No rules to reference means we're going to fail closed
- result = False
- else:
- try:
- # Evaluate the rule
- result = _rules[rule](target, creds)
- except KeyError:
- # If the rule doesn't exist, fail closed
+ def load_rules(self, force_reload=False):
+ """Loads policy_path's rules.
+
+ Policy file is cached and will be reloaded if modified.
+
+ :param force_reload: Whether to overwrite current rules.
+ """
+
+ if not self.policy_path:
+ self.policy_path = self._get_policy_path()
+
+ reloaded, data = fileutils.read_cached_file(self.policy_path,
+ force_reload=force_reload)
+ if reloaded or not self.rules:
+ rules = Rules.load_json(data, self.default_rule)
+ self.set_rules(rules)
+ LOG.debug(_("Rules successfully reloaded"))
+
+ def _get_policy_path(self):
+ """Locate the policy json data file.
+
+ :param policy_file: Custom policy file to locate.
+
+ :returns: The policy path
+
+ :raises: ConfigFilesNotFoundError if the file couldn't
+ be located.
+ """
+ policy_file = CONF.find_file(self.policy_file)
+
+ if policy_file:
+ return policy_file
+
+ raise cfg.ConfigFilesNotFoundError(path=CONF.policy_file)
+
+ def enforce(self, rule, target, creds, do_raise=False,
+ exc=None, *args, **kwargs):
+ """Checks authorization of a rule against the target and credentials.
+
+ :param rule: A string or BaseCheck instance specifying the rule
+ to evaluate.
+ :param target: As much information about the object being operated
+ on as possible, as a dictionary.
+ :param creds: As much information about the user performing the
+ action as possible, as a dictionary.
+ :param do_raise: Whether to raise an exception or not if check
+ fails.
+ :param exc: Class of the exception to raise if the check fails.
+ Any remaining arguments passed to check() (both
+ positional and keyword arguments) will be passed to
+ the exception class. If not specified, PolicyNotAuthorized
+ will be used.
+
+ :return: Returns False if the policy does not allow the action and
+ exc is not provided; otherwise, returns a value that
+ evaluates to True. Note: for rules using the "case"
+ expression, this True value will be the specified string
+ from the expression.
+ """
+
+ # NOTE(flaper87): Not logging target or creds to avoid
+ # potential security issues.
+ LOG.debug(_("Rule %s will be now enforced") % rule)
+
+ self.load_rules()
+
+ # Allow the rule to be a Check tree
+ if isinstance(rule, BaseCheck):
+ result = rule(target, creds, self)
+ elif not self.rules:
+ # No rules to reference means we're going to fail closed
result = False
+ else:
+ try:
+ # Evaluate the rule
+ result = self.rules[rule](target, creds, self)
+ except KeyError:
+ LOG.debug(_("Rule [%s] doesn't exist") % rule)
+ # If the rule doesn't exist, fail closed
+ result = False
- # If it is False, raise the exception if requested
- if exc and result is False:
- raise exc(*args, **kwargs)
+ # If it is False, raise the exception if requested
+ if do_raise and not result:
+ if exc:
+ raise exc(*args, **kwargs)
- return result
+ raise PolicyNotAuthorized(rule)
+
+ return result
class BaseCheck(object):
- """
- Abstract base class for Check classes.
- """
+ """Abstract base class for Check classes."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __str__(self):
- """
- Retrieve a string representation of the Check tree rooted at
- this node.
- """
+ """String representation of the Check tree rooted at this node."""
pass
@abc.abstractmethod
- def __call__(self, target, cred):
- """
- Perform the check. Returns False to reject the access or a
+ def __call__(self, target, cred, enforcer):
+ """Triggers if instance of the class is called.
+
+ Performs the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
class FalseCheck(BaseCheck):
- """
- A policy check that always returns False (disallow).
- """
+ """A policy check that always returns False (disallow)."""
def __str__(self):
"""Return a string representation of this check."""
return "!"
- def __call__(self, target, cred):
+ def __call__(self, target, cred, enforcer):
"""Check the policy."""
return False
class TrueCheck(BaseCheck):
- """
- A policy check that always returns True (allow).
- """
+ """A policy check that always returns True (allow)."""
def __str__(self):
"""Return a string representation of this check."""
return "@"
- def __call__(self, target, cred):
+ def __call__(self, target, cred, enforcer):
"""Check the policy."""
return True
class Check(BaseCheck):
- """
- A base class to allow for user-defined policy checks.
- """
+ """A base class to allow for user-defined policy checks."""
def __init__(self, kind, match):
- """
+ """Initiates Check instance.
+
:param kind: The kind of the check, i.e., the field before the
':'.
:param match: The match of the check, i.e., the field after
class NotCheck(BaseCheck):
- """
+ """Implements the "not" logical operator.
+
A policy check that inverts the result of another policy check.
- Implements the "not" operator.
"""
def __init__(self, rule):
- """
- Initialize the 'not' check.
+ """Initialize the 'not' check.
:param rule: The rule to negate. Must be a Check.
"""
return "not %s" % self.rule
- def __call__(self, target, cred):
- """
- Check the policy. Returns the logical inverse of the wrapped
- check.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Returns the logical inverse of the wrapped check.
"""
- return not self.rule(target, cred)
+ return not self.rule(target, cred, enforcer)
class AndCheck(BaseCheck):
- """
- A policy check that requires that a list of other checks all
- return True. Implements the "and" operator.
+ """Implements the "and" logical operator.
+
+ A policy check that requires that a list of other checks all return True.
"""
def __init__(self, rules):
- """
- Initialize the 'and' check.
+ """Initialize the 'and' check.
:param rules: A list of rules that will be tested.
"""
return "(%s)" % ' and '.join(str(r) for r in self.rules)
- def __call__(self, target, cred):
- """
- Check the policy. Requires that all rules accept in order to
- return True.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that all rules accept in order to return True.
"""
for rule in self.rules:
return True
def add_check(self, rule):
- """
+ """Adds rule to be tested.
+
Allows addition of another rule to the list of rules that will
be tested. Returns the AndCheck object for convenience.
"""
class OrCheck(BaseCheck):
- """
+ """Implements the "or" operator.
+
A policy check that requires that at least one of a list of other
- checks returns True. Implements the "or" operator.
+ checks returns True.
"""
def __init__(self, rules):
- """
- Initialize the 'or' check.
+ """Initialize the 'or' check.
:param rules: A list of rules that will be tested.
"""
return "(%s)" % ' or '.join(str(r) for r in self.rules)
- def __call__(self, target, cred):
- """
- Check the policy. Requires that at least one rule accept in
- order to return True.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that at least one rule accept in order to return True.
"""
for rule in self.rules:
return False
def add_check(self, rule):
- """
+ """Adds rule to be tested.
+
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
def _parse_check(rule):
- """
- Parse a single base check rule into an appropriate Check object.
- """
+ """Parse a single base check rule into an appropriate Check object."""
# Handle the special checks
if rule == '!':
try:
kind, match = rule.split(':', 1)
except Exception:
- LOG.exception(_("Failed to understand rule %(rule)s") % locals())
+ LOG.exception(_("Failed to understand rule %s") % rule)
# If the rule is invalid, we'll fail closed
return FalseCheck()
def _parse_list_rule(rule):
- """
- Provided for backwards compatibility. Translates the old
- list-of-lists syntax into a tree of Check objects.
+ """Translates the old list-of-lists syntax into a tree of Check objects.
+
+ Provided for backwards compatibility.
"""
# Empty rule defaults to True
def _parse_tokenize(rule):
- """
- Tokenizer for the policy language.
+ """Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
class ParseStateMeta(type):
- """
- Metaclass for the ParseState class. Facilitates identifying
- reduction methods.
+ """Metaclass for the ParseState class.
+
+ Facilitates identifying reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
- """
- Create the class. Injects the 'reducers' list, a list of
- tuples matching token sequences to the names of the
- corresponding reduction methods.
+ """Create the class.
+
+ Injects the 'reducers' list, a list of tuples matching token sequences
+ to the names of the corresponding reduction methods.
"""
reducers = []
def reducer(*tokens):
- """
- Decorator for reduction methods. Arguments are a sequence of
- tokens, in order, which should trigger running this reduction
- method.
+ """Decorator for reduction methods.
+
+ Arguments are a sequence of tokens, in order, which should trigger running
+ this reduction method.
"""
def decorator(func):
class ParseState(object):
- """
- Implement the core of parsing the policy language. Uses a greedy
- reduction algorithm to reduce a sequence of tokens into a single
- terminal, the value of which will be the root of the Check tree.
+ """Implement the core of parsing the policy language.
+
+ Uses a greedy reduction algorithm to reduce a sequence of tokens into
+ a single terminal, the value of which will be the root of the Check tree.
Note: error reporting is rather lacking. The best we can get with
this parser formulation is an overall "parse failed" error.
self.values = []
def reduce(self):
- """
- Perform a greedy reduction of the token stream. If a reducer
- method matches, it will be executed, then the reduce() method
- will be called recursively to search for any more possible
- reductions.
+ """Perform a greedy reduction of the token stream.
+
+ If a reducer method matches, it will be executed, then the
+ reduce() method will be called recursively to search for any more
+ possible reductions.
"""
for reduction, methname in self.reducers:
@property
def result(self):
- """
- Obtain the final result of the parse. Raises ValueError if
- the parse failed to reduce to a single result.
+ """Obtain the final result of the parse.
+
+ Raises ValueError if the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
- """
- Create an 'and_expr' from two checks joined by the 'and'
- operator.
+ """Create an 'and_expr'.
+
+ Join two checks by the 'and' operator.
"""
return [('and_expr', AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
- """
- Extend an 'and_expr' by adding one more check.
- """
+ """Extend an 'and_expr' by adding one more check."""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
- """
- Create an 'or_expr' from two checks joined by the 'or'
- operator.
+ """Create an 'or_expr'.
+
+ Join two checks by the 'or' operator.
"""
return [('or_expr', OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
- """
- Extend an 'or_expr' by adding one more check.
- """
+ """Extend an 'or_expr' by adding one more check."""
return [('or_expr', or_expr.add_check(check))]
def _parse_text_rule(rule):
- """
+ """Parses policy to the tree.
+
Translates a policy written in the policy language into a tree of
Check objects.
"""
return state.result
except ValueError:
# Couldn't parse the rule
- LOG.exception(_("Failed to understand rule %(rule)r") % locals())
+ LOG.exception(_("Failed to understand rule %r") % rule)
# Fail closed
return FalseCheck()
def parse_rule(rule):
- """
- Parses a policy rule into a tree of Check objects.
- """
+ """Parses a policy rule into a tree of Check objects."""
# If the rule is a string, it's in the policy language
if isinstance(rule, basestring):
def register(name, func=None):
- """
- Register a function or Check class as a policy check.
+ """Register a function or Check class as a policy check.
:param name: Gives the name of the check type, e.g., 'rule',
'role', etc. If name is None, a default check type
@register("rule")
class RuleCheck(Check):
- def __call__(self, target, creds):
- """
- Recursively checks credentials based on the defined rules.
- """
+ def __call__(self, target, creds, enforcer):
+ """Recursively checks credentials based on the defined rules."""
try:
- return _rules[self.match](target, creds)
+ return enforcer.rules[self.match](target, creds, enforcer)
except KeyError:
# We don't have any matching rule; fail closed
return False
@register("role")
class RoleCheck(Check):
- def __call__(self, target, creds):
+ def __call__(self, target, creds, enforcer):
"""Check that there is a matching role in the cred dict."""
return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http')
class HttpCheck(Check):
- def __call__(self, target, creds):
- """
- Check http: rules by calling to a remote server.
+ def __call__(self, target, creds, enforcer):
+ """Check http: rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly 'True'.
@register(None)
class GenericCheck(Check):
- def __call__(self, target, creds):
- """
- Check an individual match.
+ def __call__(self, target, creds, enforcer):
+ """Check an individual match.
Matches look like:
from heat.common import policy
from heat.common import exception
+from heat.openstack.common import policy as base_policy
from heat.tests.common import HeatTestCase
from heat.tests import utils
cfg.CONF.register_opts(opts)
def test_policy_cfn_default(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
enforcer = policy.Enforcer(scope='cloudformation')
ctx = utils.dummy_context(roles=[])
def test_policy_cfn_notallowed(self):
pf = policy_path + 'notallowed.json'
- self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
- policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
self.m.ReplayAll()
enforcer = policy.Enforcer(scope='cloudformation')
def test_policy_cfn_deny_stack_user(self):
pf = policy_path + 'deny_stack_user.json'
- self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
- policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
self.m.ReplayAll()
enforcer = policy.Enforcer(scope='cloudformation')
def test_policy_cfn_allow_non_stack_user(self):
pf = policy_path + 'deny_stack_user.json'
- self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
- policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
self.m.ReplayAll()
enforcer = policy.Enforcer(scope='cloudformation')
def test_policy_cw_deny_stack_user(self):
pf = policy_path + 'deny_stack_user.json'
- self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
- policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
self.m.ReplayAll()
enforcer = policy.Enforcer(scope='cloudwatch')
def test_policy_cw_allow_non_stack_user(self):
pf = policy_path + 'deny_stack_user.json'
- self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
- policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
self.m.ReplayAll()
enforcer = policy.Enforcer(scope='cloudwatch')
# Everything should be allowed
enforcer.enforce(ctx, action, {})
self.m.VerifyAll()
+
+ def test_clear(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer()
+ enforcer.load_rules(force_reload=True)
+ enforcer.clear()
+ self.assertEqual(enforcer.enforcer.rules, {})
+ self.m.VerifyAll()
+
+ def test_set_rules_overwrite_true(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer()
+ enforcer.load_rules(True)
+ enforcer.set_rules({'test_heat_rule': 1}, True)
+ self.assertEqual(enforcer.enforcer.rules, {'test_heat_rule': 1})
+
+ def test_set_rules_overwrite_false(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer()
+ enforcer.load_rules(True)
+ enforcer.set_rules({'test_heat_rule': 1}, False)
+ self.assertIn('test_heat_rule', enforcer.enforcer.rules)
+
+ def test_load_rules_force_reload_true(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer()
+ enforcer.set_rules({'test_heat_rule': 'test'})
+ enforcer.load_rules(True)
+ self.assertNotIn({'test_heat_rule': 'test'}, enforcer.enforcer.rules)
+
+ def test_load_rules_force_reload_false(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer()
+ enforcer.load_rules(True)
+ enforcer.set_rules({'test_heat_rule': 'test'})
+ enforcer.load_rules(False)
+ self.assertIn('test_heat_rule', enforcer.enforcer.rules)
+
+ def test_default_rule(self):
+ pf = policy_path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(base_policy.Enforcer, '_get_policy_path')
+ base_policy.Enforcer._get_policy_path().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ ctx = utils.dummy_context(roles=['not_a_stack_user'])
+ default_rule = base_policy.FalseCheck()
+ enforcer = policy.Enforcer(scope='cloudformation',
+ exc=None, default_rule=default_rule)
+ action = 'no_such_action'
+ self.assertEqual(enforcer.enforce(ctx, action, {}), False)
+ self.m.VerifyAll()