+++ /dev/null
-# -*- coding: utf-8 -*-
-# Copyright (c) 2012 OpenStack Foundation.
-# All Rights Reserved.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-Common Policy Engine Implementation
-Policies can be expressed in one of two forms: A list of lists, or a
-string written in the new policy language.
-In the list-of-lists representation, each check inside the innermost
-list is combined as with an "and" conjunction--for that check to pass,
-all the specified checks must pass. These innermost lists are then
-combined as with an "or" conjunction. As an example, take the following
-rule, expressed in the list-of-lists representation::
- [["role:admin"], ["project_id:%(project_id)s", "role:projectadmin"]]
-This is the original way of expressing policies, but there now exists a
-new way: the policy language.
-In the policy language, each check is specified the same way as in the
-list-of-lists representation: a simple "a:b" pair that is matched to
-the correct class to perform that check::
- +===========================================================================+
- +===========================================================================+
- |User's Role | role:admin |
- +---------------------------------------------------------------------------+
- |Rules already defined on policy | rule:admin_required |
- +---------------------------------------------------------------------------+
- |Against URL's¹ | http://my-url.org/check |
- +---------------------------------------------------------------------------+
- |User attributes² | project_id:%(target.project.id)s |
- +---------------------------------------------------------------------------+
- |Strings | <variable>:'xpto2035abc' |
- | | 'myproject':<variable> |
- +---------------------------------------------------------------------------+
- | | project_id:xpto2035abc |
- |Literals | domain_id:20 |
- | | True:%(user.enabled)s |
- +===========================================================================+
-¹URL checking must return 'True' to be valid
-²User attributes (obtained through the token): user_id, domain_id or project_id
-Conjunction operators are available, allowing for more expressiveness
-in crafting policies. So, in the policy language, the previous check in
-list-of-lists becomes::
- role:admin or (project_id:%(project_id)s and role:projectadmin)
-The policy language also has the "not" operator, allowing a richer
-policy rule::
- project_id:%(project_id)s and not role:dunce
-Attributes sent along with API calls can be used by the policy engine
-(on the right side of the expression), by using the following syntax::
- <some_value>:%(user.id)s
-Contextual attributes of objects identified by their IDs are loaded
-from the database. They are also available to the policy engine and
-can be checked through the `target` keyword::
- <some_value>:%(target.role.name)s
-Finally, two special policy checks should be mentioned; the policy
-check "@" will always accept an access, and the policy check "!" will
-always reject an access. (Note that if a rule is either the empty
-list ("[]") or the empty string, this is equivalent to the "@" policy
-check.) Of these, the "!" policy check is probably the most useful,
-as it allows particular rules to be explicitly disabled.
-import abc
-import ast
-import copy
-import logging
-import os
-import re
-from oslo_config import cfg
-from oslo_serialization import jsonutils
-import six
-import six.moves.urllib.parse as urlparse
-import six.moves.urllib.request as urlrequest
-from neutron.openstack.common import fileutils
-from neutron.openstack.common._i18n import _, _LE
-policy_opts = [
- cfg.StrOpt('policy_file',
- default='policy.json',
- help=_('The JSON file that defines policies.')),
- cfg.StrOpt('policy_default_rule',
- default='default',
- help=_('Default rule. Enforced when a requested rule is not '
- 'found.')),
- cfg.MultiStrOpt('policy_dirs',
- default=['policy.d'],
- help=_('Directories where policy configuration files are '
- 'stored. They can be relative to any directory '
- 'in the search path defined by the config_dir '
- 'option, or absolute paths. The file defined by '
- 'policy_file must exist for these directories to '
- 'be searched. Missing or empty directories are '
- 'ignored.')),
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-_checks = {}
-def list_opts():
- """Entry point for oslo-config-generator."""
- return [(None, copy.deepcopy(policy_opts))]
-class PolicyNotAuthorized(Exception):
- def __init__(self, rule):
- msg = _("Policy doesn't allow %s to be performed.") % rule
- super(PolicyNotAuthorized, self).__init__(msg)
-class Rules(dict):
- """A store for rules. Handles the default_rule setting directly."""
- @classmethod
- def load_json(cls, data, default_rule=None):
- """Allow loading of JSON rule data."""
- # Suck in the JSON data and parse the rules
- rules = dict((k, parse_rule(v)) for k, v in
- jsonutils.loads(data).items())
- return cls(rules, default_rule)
- def __init__(self, rules=None, default_rule=None):
- """Initialize the Rules store."""
- super(Rules, self).__init__(rules or {})
- self.default_rule = default_rule
- def __missing__(self, key):
- """Implements the default rule handling."""
- if isinstance(self.default_rule, dict):
- raise KeyError(key)
- # If the default rule isn't actually defined, do something
- # reasonably intelligent
- if not self.default_rule:
- raise KeyError(key)
- if isinstance(self.default_rule, BaseCheck):
- return self.default_rule
- # We need to check this or we can get infinite recursion
- if self.default_rule not in self:
- raise KeyError(key)
- elif isinstance(self.default_rule, six.string_types):
- return self[self.default_rule]
- def __str__(self):
- """Dumps a string representation of the rules."""
- # Start by building the canonical strings for the rules
- out_rules = {}
- for key, value in self.items():
- # Use empty string for singleton TrueCheck instances
- if isinstance(value, TrueCheck):
- out_rules[key] = ''
- else:
- out_rules[key] = str(value)
- # Dump a pretty-printed JSON representation
- return jsonutils.dumps(out_rules, indent=4)
-class Enforcer(object):
- """Responsible for loading and enforcing rules.
- :param policy_file: Custom policy file to use, if none is
- specified, `CONF.policy_file` will be
- used.
- :param rules: Default dictionary / Rules to use. It will be
- considered just in the first instantiation. If
- `load_rules(True)`, `clear()` or `set_rules(True)`
- is called this will be overwritten.
- :param default_rule: Default rule to use, CONF.default_rule will
- be used if none is specified.
- :param use_conf: Whether to load rules from cache or config file.
- :param overwrite: Whether to overwrite existing rules when reload rules
- from config file.
- """
- def __init__(self, policy_file=None, rules=None,
- default_rule=None, use_conf=True, overwrite=True):
- self.default_rule = default_rule or CONF.policy_default_rule
- self.rules = Rules(rules, self.default_rule)
- self.policy_path = None
- self.policy_file = policy_file or CONF.policy_file
- self.use_conf = use_conf
- self.overwrite = overwrite
- def set_rules(self, rules, overwrite=True, use_conf=False):
- """Create a new Rules object based on the provided dict of rules.
- :param rules: New rules to use. It should be an instance of dict.
- :param overwrite: Whether to overwrite current rules or update them
- with the new rules.
- :param use_conf: Whether to reload rules from cache or config file.
- """
- if not isinstance(rules, dict):
- raise TypeError(_("Rules must be an instance of dict or Rules, "
- "got %s instead") % type(rules))
- self.use_conf = use_conf
- if overwrite:
- self.rules = Rules(rules, self.default_rule)
- else:
- self.rules.update(rules)
- def clear(self):
- """Clears Enforcer rules, policy's cache and policy's path."""
- self.set_rules({})
- fileutils.delete_cached_file(self.policy_path)
- self.default_rule = None
- self.policy_path = None
- def load_rules(self, force_reload=False):
- """Loads policy_path's rules.
- Policy file is cached and will be reloaded if modified.
- :param force_reload: Whether to reload rules from config file.
- """
- if force_reload:
- self.use_conf = force_reload
- if self.use_conf:
- if not self.policy_path:
- self.policy_path = self._get_policy_path(self.policy_file)
- self._load_policy_file(self.policy_path, force_reload,
- overwrite=self.overwrite)
- for path in CONF.policy_dirs:
- try:
- path = self._get_policy_path(path)
- except cfg.ConfigFilesNotFoundError:
- continue
- self._walk_through_policy_directory(path,
- self._load_policy_file,
- force_reload, False)
- @staticmethod
- def _walk_through_policy_directory(path, func, *args):
- # We do not iterate over sub-directories.
- policy_files = next(os.walk(path))[2]
- policy_files.sort()
- for policy_file in [p for p in policy_files if not p.startswith('.')]:
- func(os.path.join(path, policy_file), *args)
- def _load_policy_file(self, path, force_reload, overwrite=True):
- reloaded, data = fileutils.read_cached_file(
- path, force_reload=force_reload)
- if reloaded or not self.rules or not overwrite:
- rules = Rules.load_json(data, self.default_rule)
- self.set_rules(rules, overwrite=overwrite, use_conf=True)
- LOG.debug("Reloaded policy file: %(path)s",
- {'path': path})
- def _get_policy_path(self, path):
- """Locate the policy json data file/path.
- :param path: It's value can be a full path or related path. When
- full path specified, this function just returns the full
- path. When related path specified, this function will
- search configuration directories to find one that exists.
- :returns: The policy path
- :raises: ConfigFilesNotFoundError if the file/path couldn't
- be located.
- """
- policy_path = CONF.find_file(path)
- if policy_path:
- return policy_path
- raise cfg.ConfigFilesNotFoundError((path,))
- def enforce(self, rule, target, creds, do_raise=False,
- exc=None, *args, **kwargs):
- """Checks authorization of a rule against the target and credentials.
- :param rule: A string or BaseCheck instance specifying the rule
- to evaluate.
- :param target: As much information about the object being operated
- on as possible, as a dictionary.
- :param creds: As much information about the user performing the
- action as possible, as a dictionary.
- :param do_raise: Whether to raise an exception or not if check
- fails.
- :param exc: Class of the exception to raise if the check fails.
- Any remaining arguments passed to enforce() (both
- positional and keyword arguments) will be passed to
- the exception class. If not specified, PolicyNotAuthorized
- will be used.
- :return: Returns False if the policy does not allow the action and
- exc is not provided; otherwise, returns a value that
- evaluates to True. Note: for rules using the "case"
- expression, this True value will be the specified string
- from the expression.
- """
- self.load_rules()
- # Allow the rule to be a Check tree
- if isinstance(rule, BaseCheck):
- result = rule(target, creds, self)
- elif not self.rules:
- # No rules to reference means we're going to fail closed
- result = False
- else:
- try:
- # Evaluate the rule
- result = self.rules[rule](target, creds, self)
- except KeyError:
- LOG.debug("Rule [%s] doesn't exist" % rule)
- # If the rule doesn't exist, fail closed
- result = False
- # If it is False, raise the exception if requested
- if do_raise and not result:
- if exc:
- raise exc(*args, **kwargs)
- raise PolicyNotAuthorized(rule)
- return result
-class BaseCheck(object):
- """Abstract base class for Check classes."""
- @abc.abstractmethod
- def __str__(self):
- """String representation of the Check tree rooted at this node."""
- pass
- @abc.abstractmethod
- def __call__(self, target, cred, enforcer):
- """Triggers if instance of the class is called.
- Performs the check. Returns False to reject the access or a
- true value (not necessary True) to accept the access.
- """
- pass
-class FalseCheck(BaseCheck):
- """A policy check that always returns False (disallow)."""
- def __str__(self):
- """Return a string representation of this check."""
- return "!"
- def __call__(self, target, cred, enforcer):
- """Check the policy."""
- return False
-class TrueCheck(BaseCheck):
- """A policy check that always returns True (allow)."""
- def __str__(self):
- """Return a string representation of this check."""
- return "@"
- def __call__(self, target, cred, enforcer):
- """Check the policy."""
- return True
-class Check(BaseCheck):
- """A base class to allow for user-defined policy checks."""
- def __init__(self, kind, match):
- """Initiates Check instance.
- :param kind: The kind of the check, i.e., the field before the
- ':'.
- :param match: The match of the check, i.e., the field after
- the ':'.
- """
- self.kind = kind
- self.match = match
- def __str__(self):
- """Return a string representation of this check."""
- return "%s:%s" % (self.kind, self.match)
-class NotCheck(BaseCheck):
- """Implements the "not" logical operator.
- A policy check that inverts the result of another policy check.
- """
- def __init__(self, rule):
- """Initialize the 'not' check.
- :param rule: The rule to negate. Must be a Check.
- """
- self.rule = rule
- def __str__(self):
- """Return a string representation of this check."""
- return "not %s" % self.rule
- def __call__(self, target, cred, enforcer):
- """Check the policy.
- Returns the logical inverse of the wrapped check.
- """
- return not self.rule(target, cred, enforcer)
-class AndCheck(BaseCheck):
- """Implements the "and" logical operator.
- A policy check that requires that a list of other checks all return True.
- """
- def __init__(self, rules):
- """Initialize the 'and' check.
- :param rules: A list of rules that will be tested.
- """
- self.rules = rules
- def __str__(self):
- """Return a string representation of this check."""
- return "(%s)" % ' and '.join(str(r) for r in self.rules)
- def __call__(self, target, cred, enforcer):
- """Check the policy.
- Requires that all rules accept in order to return True.
- """
- for rule in self.rules:
- if not rule(target, cred, enforcer):
- return False
- return True
- def add_check(self, rule):
- """Adds rule to be tested.
- Allows addition of another rule to the list of rules that will
- be tested. Returns the AndCheck object for convenience.
- """
- self.rules.append(rule)
- return self
-class OrCheck(BaseCheck):
- """Implements the "or" operator.
- A policy check that requires that at least one of a list of other
- checks returns True.
- """
- def __init__(self, rules):
- """Initialize the 'or' check.
- :param rules: A list of rules that will be tested.
- """
- self.rules = rules
- def __str__(self):
- """Return a string representation of this check."""
- return "(%s)" % ' or '.join(str(r) for r in self.rules)
- def __call__(self, target, cred, enforcer):
- """Check the policy.
- Requires that at least one rule accept in order to return True.
- """
- for rule in self.rules:
- if rule(target, cred, enforcer):
- return True
- return False
- def add_check(self, rule):
- """Adds rule to be tested.
- Allows addition of another rule to the list of rules that will
- be tested. Returns the OrCheck object for convenience.
- """
- self.rules.append(rule)
- return self
-def _parse_check(rule):
- """Parse a single base check rule into an appropriate Check object."""
- # Handle the special checks
- if rule == '!':
- return FalseCheck()
- elif rule == '@':
- return TrueCheck()
- try:
- kind, match = rule.split(':', 1)
- except Exception:
- LOG.exception(_LE("Failed to understand rule %s") % rule)
- # If the rule is invalid, we'll fail closed
- return FalseCheck()
- # Find what implements the check
- if kind in _checks:
- return _checks[kind](kind, match)
- elif None in _checks:
- return _checks[None](kind, match)
- else:
- LOG.error(_LE("No handler for matches of kind %s") % kind)
- return FalseCheck()
-def _parse_list_rule(rule):
- """Translates the old list-of-lists syntax into a tree of Check objects.
- Provided for backwards compatibility.
- """
- # Empty rule defaults to True
- if not rule:
- return TrueCheck()
- # Outer list is joined by "or"; inner list by "and"
- or_list = []
- for inner_rule in rule:
- # Elide empty inner lists
- if not inner_rule:
- continue
- # Handle bare strings
- if isinstance(inner_rule, six.string_types):
- inner_rule = [inner_rule]
- # Parse the inner rules into Check objects
- and_list = [_parse_check(r) for r in inner_rule]
- # Append the appropriate check to the or_list
- if len(and_list) == 1:
- or_list.append(and_list[0])
- else:
- or_list.append(AndCheck(and_list))
- # If we have only one check, omit the "or"
- if not or_list:
- return FalseCheck()
- elif len(or_list) == 1:
- return or_list[0]
- return OrCheck(or_list)
-# Used for tokenizing the policy language
-_tokenize_re = re.compile(r'\s+')
-def _parse_tokenize(rule):
- """Tokenizer for the policy language.
- Most of the single-character tokens are specified in the
- _tokenize_re; however, parentheses need to be handled specially,
- because they can appear inside a check string. Thankfully, those
- parentheses that appear inside a check string can never occur at
- the very beginning or end ("%(variable)s" is the correct syntax).
- """
- for tok in _tokenize_re.split(rule):
- # Skip empty tokens
- if not tok or tok.isspace():
- continue
- # Handle leading parens on the token
- clean = tok.lstrip('(')
- for i in range(len(tok) - len(clean)):
- yield '(', '('
- # If it was only parentheses, continue
- if not clean:
- continue
- else:
- tok = clean
- # Handle trailing parens on the token
- clean = tok.rstrip(')')
- trail = len(tok) - len(clean)
- # Yield the cleaned token
- lowered = clean.lower()
- if lowered in ('and', 'or', 'not'):
- # Special tokens
- yield lowered, clean
- elif clean:
- # Not a special token, but not composed solely of ')'
- if len(tok) >= 2 and ((tok[0], tok[-1]) in
- [('"', '"'), ("'", "'")]):
- # It's a quoted string
- yield 'string', tok[1:-1]
- else:
- yield 'check', _parse_check(clean)
- # Yield the trailing parens
- for i in range(trail):
- yield ')', ')'
-class ParseStateMeta(type):
- """Metaclass for the ParseState class.
- Facilitates identifying reduction methods.
- """
- def __new__(mcs, name, bases, cls_dict):
- """Create the class.
- Injects the 'reducers' list, a list of tuples matching token sequences
- to the names of the corresponding reduction methods.
- """
- reducers = []
- for key, value in cls_dict.items():
- if not hasattr(value, 'reducers'):
- continue
- for reduction in value.reducers:
- reducers.append((reduction, key))
- cls_dict['reducers'] = reducers
- return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
-def reducer(*tokens):
- """Decorator for reduction methods.
- Arguments are a sequence of tokens, in order, which should trigger running
- this reduction method.
- """
- def decorator(func):
- # Make sure we have a list of reducer sequences
- if not hasattr(func, 'reducers'):
- func.reducers = []
- # Add the tokens to the list of reducer sequences
- func.reducers.append(list(tokens))
- return func
- return decorator
-class ParseState(object):
- """Implement the core of parsing the policy language.
- Uses a greedy reduction algorithm to reduce a sequence of tokens into
- a single terminal, the value of which will be the root of the Check tree.
- Note: error reporting is rather lacking. The best we can get with
- this parser formulation is an overall "parse failed" error.
- Fortunately, the policy language is simple enough that this
- shouldn't be that big a problem.
- """
- def __init__(self):
- """Initialize the ParseState."""
- self.tokens = []
- self.values = []
- def reduce(self):
- """Perform a greedy reduction of the token stream.
- If a reducer method matches, it will be executed, then the
- reduce() method will be called recursively to search for any more
- possible reductions.
- """
- for reduction, methname in self.reducers:
- if (len(self.tokens) >= len(reduction) and
- self.tokens[-len(reduction):] == reduction):
- # Get the reduction method
- meth = getattr(self, methname)
- # Reduce the token stream
- results = meth(*self.values[-len(reduction):])
- # Update the tokens and values
- self.tokens[-len(reduction):] = [r[0] for r in results]
- self.values[-len(reduction):] = [r[1] for r in results]
- # Check for any more reductions
- return self.reduce()
- def shift(self, tok, value):
- """Adds one more token to the state. Calls reduce()."""
- self.tokens.append(tok)
- self.values.append(value)
- # Do a greedy reduce...
- self.reduce()
- @property
- def result(self):
- """Obtain the final result of the parse.
- Raises ValueError if the parse failed to reduce to a single result.
- """
- if len(self.values) != 1:
- raise ValueError("Could not parse rule")
- return self.values[0]
- @reducer('(', 'check', ')')
- @reducer('(', 'and_expr', ')')
- @reducer('(', 'or_expr', ')')
- def _wrap_check(self, _p1, check, _p2):
- """Turn parenthesized expressions into a 'check' token."""
- return [('check', check)]
- @reducer('check', 'and', 'check')
- def _make_and_expr(self, check1, _and, check2):
- """Create an 'and_expr'.
- Join two checks by the 'and' operator.
- """
- return [('and_expr', AndCheck([check1, check2]))]
- @reducer('and_expr', 'and', 'check')
- def _extend_and_expr(self, and_expr, _and, check):
- """Extend an 'and_expr' by adding one more check."""
- return [('and_expr', and_expr.add_check(check))]
- @reducer('check', 'or', 'check')
- def _make_or_expr(self, check1, _or, check2):
- """Create an 'or_expr'.
- Join two checks by the 'or' operator.
- """
- return [('or_expr', OrCheck([check1, check2]))]
- @reducer('or_expr', 'or', 'check')
- def _extend_or_expr(self, or_expr, _or, check):
- """Extend an 'or_expr' by adding one more check."""
- return [('or_expr', or_expr.add_check(check))]
- @reducer('not', 'check')
- def _make_not_expr(self, _not, check):
- """Invert the result of another check."""
- return [('check', NotCheck(check))]
-def _parse_text_rule(rule):
- """Parses policy to the tree.
- Translates a policy written in the policy language into a tree of
- Check objects.
- """
- # Empty rule means always accept
- if not rule:
- return TrueCheck()
- # Parse the token stream
- state = ParseState()
- for tok, value in _parse_tokenize(rule):
- state.shift(tok, value)
- try:
- return state.result
- except ValueError:
- # Couldn't parse the rule
- LOG.exception(_LE("Failed to understand rule %s") % rule)
- # Fail closed
- return FalseCheck()
-def parse_rule(rule):
- """Parses a policy rule into a tree of Check objects."""
- # If the rule is a string, it's in the policy language
- if isinstance(rule, six.string_types):
- return _parse_text_rule(rule)
- return _parse_list_rule(rule)
-def register(name, func=None):
- """Register a function or Check class as a policy check.
- :param name: Gives the name of the check type, e.g., 'rule',
- 'role', etc. If name is None, a default check type
- will be registered.
- :param func: If given, provides the function or class to register.
- If not given, returns a function taking one argument
- to specify the function or class to register,
- allowing use as a decorator.
- """
- # Perform the actual decoration by registering the function or
- # class. Returns the function or class for compliance with the
- # decorator interface.
- def decorator(func):
- _checks[name] = func
- return func
- # If the function or class is given, do the registration
- if func:
- return decorator(func)
- return decorator
-class RuleCheck(Check):
- def __call__(self, target, creds, enforcer):
- """Recursively checks credentials based on the defined rules."""
- try:
- return enforcer.rules[self.match](target, creds, enforcer)
- except KeyError:
- # We don't have any matching rule; fail closed
- return False
-class RoleCheck(Check):
- def __call__(self, target, creds, enforcer):
- """Check that there is a matching role in the cred dict."""
- return self.match.lower() in [x.lower() for x in creds['roles']]
-class HttpCheck(Check):
- def __call__(self, target, creds, enforcer):
- """Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response
- is exactly 'True'.
- """
- url = ('http:' + self.match) % target
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
- data = {'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- post_data = urlparse.urlencode(data)
- f = urlrequest.urlopen(url, post_data)
- return f.read() == "True"
-class GenericCheck(Check):
- def __call__(self, target, creds, enforcer):
- """Check an individual match.
- Matches look like:
- tenant:%(tenant_id)s
- role:compute:admin
- True:%(user.enabled)s
- 'Member':%(role.name)s
- """
- try:
- match = self.match % target
- except KeyError:
- # While doing GenericCheck if key not
- # present in Target return false
- return False
- try:
- # Try to interpret self.kind as a literal
- leftval = ast.literal_eval(self.kind)
- except ValueError:
- try:
- kind_parts = self.kind.split('.')
- leftval = creds
- for kind_part in kind_parts:
- leftval = leftval[kind_part]
- except KeyError:
- return False
- return match == six.text_type(leftval)
"""Test of Policy Engine For Neutron"""
import mock
-from oslo_config import cfg
+from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import importutils
import six
from neutron.common import exceptions
from neutron import context
from neutron import manager
-from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron.tests import base
action = "example:test"
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": ""}""")
- cfg.CONF.set_override('policy_file', tmpfilename)
- policy.refresh()
+ policy.refresh(policy_file=tmpfilename)
policy.enforce(self.context, action, self.target)
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": "!"}""")
- policy.refresh()
+ policy.refresh(policy_file=tmpfilename)
self.target = {'tenant_id': 'fake_tenant'}
- self.assertRaises(common_policy.PolicyNotAuthorized,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized,
# NOTE(vish): then overload underlying rules
- policy.set_rules(dict((k, common_policy.parse_rule(v))
- for k, v in rules.items()))
+ policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.context = context.Context('fake', 'fake', roles=['member'])
self.target = {}
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_enforce_bad_action_throws(self):
action = "example:denied"
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_check_bad_action_noraise(self):
with mock.patch.object(urlrequest, 'urlopen', new=fakeurlopen):
action = "example:get_http"
target = {}
- self.assertRaises(common_policy.PolicyNotAuthorized,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized,
policy.enforce, self.context,
action, target)
target_not_mine = {'tenant_id': 'another'}
action = "example:my_file"
policy.enforce(self.context, action, target_mine)
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target_not_mine)
def test_early_AND_enforcement(self):
action = "example:early_and_fail"
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_early_OR_enforcement(self):
with open(tmpfilename, "w") as policyfile:
jsonutils.dump(self.rules, policyfile)
- cfg.CONF.set_override('policy_file', tmpfilename)
- policy.refresh()
+ policy.refresh(policy_file=tmpfilename)
self.context = context.Context('fake', 'fake')
def test_policy_called(self):
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
def test_not_found_policy_calls_default(self):
def fakepolicyinit(self, **kwargs):
enf = policy._ENFORCER
- enf.set_rules(common_policy.Rules(self.rules))
+ enf.set_rules(oslo_policy.Rules(self.rules))
def setUp(self):
super(NeutronPolicyTestCase, self).setUp()
# Add Fake resources to RESOURCE_ATTRIBUTE_MAP
- self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ self._set_rules()
+ def remove_fake_resource():
+ self.patcher = mock.patch.object(neutron.policy,
+ 'init',
+ new=self.fakepolicyinit)
+ self.patcher.start()
+ self.addCleanup(remove_fake_resource)
+ self.context = context.Context('fake', 'fake', roles=['user'])
+ plugin_klass = importutils.import_class(
+ "neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
+ self.manager_patcher = mock.patch('neutron.manager.NeutronManager')
+ fake_manager = self.manager_patcher.start()
+ fake_manager_instance = fake_manager.return_value
+ fake_manager_instance.plugin = plugin_klass()
+ def _set_rules(self, **kwargs):
+ rules_dict = {
"context_is_admin": "role:admin",
"context_is_advsvc": "role:advsvc",
"admin_or_network_owner": "rule:context_is_admin or "
"insert_rule": "rule:admin_or_owner",
"remove_rule": "rule:admin_or_owner",
- }.items())
- def remove_fake_resource():
- self.patcher = mock.patch.object(neutron.policy,
- 'init',
- new=self.fakepolicyinit)
- self.patcher.start()
- self.addCleanup(remove_fake_resource)
- self.context = context.Context('fake', 'fake', roles=['user'])
- plugin_klass = importutils.import_class(
- "neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
- self.manager_patcher = mock.patch('neutron.manager.NeutronManager')
- fake_manager = self.manager_patcher.start()
- fake_manager_instance = fake_manager.return_value
- fake_manager_instance.plugin = plugin_klass()
+ }
+ rules_dict.update(**kwargs)
+ self.rules = oslo_policy.Rules.from_dict(rules_dict)
def test_firewall_policy_insert_rule_with_admin_context(self):
action = "insert_rule"
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
- common_policy.PolicyNotAuthorized)
+ oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_private_fails(self):
self._test_nonadmin_action_on_attr('get', 'shared', False,
- common_policy.PolicyNotAuthorized)
+ oslo_policy.PolicyNotAuthorized)
def test_nonadmin_write_on_shared_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', True,
- common_policy.PolicyNotAuthorized)
+ oslo_policy.PolicyNotAuthorized)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
def test_advsvc_create_network_fails(self):
self._test_advsvc_action_on_attr('create', 'network', 'shared', False,
- common_policy.PolicyNotAuthorized)
+ oslo_policy.PolicyNotAuthorized)
def test_advsvc_create_port_works(self):
self._test_advsvc_action_on_attr('create', 'port:mac', 'shared', False)
def test_advsvc_create_subnet_fails(self):
self._test_advsvc_action_on_attr('create', 'subnet', 'shared', False,
- common_policy.PolicyNotAuthorized)
+ oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
- common_policy.PolicyNotAuthorized,
+ oslo_policy.PolicyNotAuthorized,
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
def _test_build_subattribute_match_rule(self, validate_value):
action = "create_" + FAKE_RESOURCE_NAME
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
- self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake'}
- del self.rules['admin_or_network_owner']
- self.rules['admin_or_network_owner'] = common_policy.parse_rule(
- "role:admin or tenant_id:%(network_tenant_id)s")
+ self._set_rules(
+ admin_or_network_owner="role:admin or "
+ "tenant_id:%(network_tenant_id)s")
action = "create_port:mac"
with mock.patch.object(manager.NeutronManager.get_instance().plugin,
'get_network', new=fakegetnetwork):
# Try and add a bad rule
- common_policy.parse_rule,
- 'tenant_id:(wrong_stuff)')
+ oslo_policy.Rules.from_dict,
+ {'test_policy': 'tenant_id:(wrong_stuff)'})
def _test_enforce_tenant_id_raises(self, bad_rule):
- self.rules['admin_or_owner'] = common_policy.parse_rule(bad_rule)
+ self._set_rules(admin_or_owner=bad_rule)
# Trigger a policy with rule admin_or_owner
action = "create_network"
target = {'tenant_id': 'fake'}
def test_process_rules(self):
action = "create_" + FAKE_RESOURCE_NAME
# Construct RuleChecks for an action, attribute and subattribute
- match_rule = common_policy.RuleCheck('rule', action)
- attr_rule = common_policy.RuleCheck('rule', '%s:%ss' %
- (action,
- sub_attr_rules = [common_policy.RuleCheck('rule', '%s:%s:%s' %
- (action, 'attr',
- 'sub_attr_1'))]
+ match_rule = oslo_policy.RuleCheck('rule', action)
+ attr_rule = oslo_policy.RuleCheck(
+ 'rule', '%s:%ss' % (action, FAKE_RESOURCE_NAME))
+ sub_attr_rules = [oslo_policy.RuleCheck(
+ 'rule', '%s:%s:%s' % (action, 'attr', 'sub_attr_1'))]
# Build an AndCheck from the given RuleChecks
# Make the checks nested to better check the recursion
- sub_attr_rules = common_policy.AndCheck(sub_attr_rules)
- attr_rule = common_policy.AndCheck(
+ sub_attr_rules = oslo_policy.AndCheck(sub_attr_rules)
+ attr_rule = oslo_policy.AndCheck(
[attr_rule, sub_attr_rules])
- match_rule = common_policy.AndCheck([match_rule, attr_rule])
+ match_rule = oslo_policy.AndCheck([match_rule, attr_rule])
# Assert that the rules are correctly extracted from the match_rule
rules = policy._process_rules_list([], match_rule)
@mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True)
@mock.patch.object(policy.LOG, 'debug')
def test_log_rule_list(self, mock_debug, mock_is_e):
- policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
+ policy.log_rule_list(oslo_policy.RuleCheck('rule', 'create_'))