from neutron.common import rpc as n_rpc
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
+from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron import quota
exceptions.ServiceUnavailable: webob.exc.HTTPServiceUnavailable,
exceptions.NotAuthorized: webob.exc.HTTPForbidden,
netaddr.AddrFormatError: webob.exc.HTTPBadRequest,
+ common_policy.PolicyNotAuthorized: webob.exc.HTTPForbidden
}
# Fetch the resource and verify if the user can access it
try:
resource = self._item(request, id, True)
- except exceptions.PolicyNotAuthorized:
+ except common_policy.PolicyNotAuthorized:
msg = _('The resource could not be found.')
raise webob.exc.HTTPNotFound(msg)
body = kwargs.pop('body', None)
field_list=field_list,
parent_id=parent_id),
fields_to_strip=added_fields)}
- except exceptions.PolicyNotAuthorized:
+ except common_policy.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
msg = _('The resource could not be found.')
policy.enforce(request.context,
action,
obj)
- except exceptions.PolicyNotAuthorized:
+ except common_policy.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
msg = _('The resource could not be found.')
policy.enforce(request.context,
action,
orig_obj)
- except exceptions.PolicyNotAuthorized:
+ except common_policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception() as ctxt:
# If a tenant is modifying it's own object, it's safe to return
# a 403. Otherwise, pretend that it doesn't exist to avoid
from neutron.common import exceptions
from neutron.openstack.common import gettextutils
from neutron.openstack.common import log as logging
+from neutron.openstack.common import policy as common_policy
from neutron import wsgi
result = method(request=request, **args)
except (exceptions.NeutronException,
- netaddr.AddrFormatError) as e:
+ netaddr.AddrFormatError,
+ common_policy.PolicyNotAuthorized) as e:
for fault in faults:
if isinstance(e, fault):
mapped_exc = faults[fault]
help=_("The API paste config file to use")),
cfg.StrOpt('api_extensions_path', default="",
help=_("The path for API extensions")),
- cfg.StrOpt('policy_file', default="policy.json",
- help=_("The policy file to use")),
cfg.StrOpt('auth_strategy', default='keystone',
help=_("The type of authentication to use")),
cfg.StrOpt('core_plugin',
message = _("User does not have admin privileges: %(reason)s")
-class PolicyNotAuthorized(NotAuthorized):
- message = _("Policy doesn't allow %(action)s to be performed.")
-
-
class NetworkNotFound(NotFound):
message = _("Network %(net_id)s could not be found")
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
project_id:%(project_id)s and not role:dunce
+It is possible to perform policy checks on the following user
+attributes (obtained through the token): user_id, domain_id or
+project_id::
+
+ domain_id:<some_value>
+
+Attributes sent along with API calls can be used by the policy engine
+(on the right side of the expression), by using the following syntax::
+
+ <some_value>:user.id
+
+Contextual attributes of objects identified by their IDs are loaded
+from the database. They are also available to the policy engine and
+can be checked through the `target` keyword::
+
+ <some_value>:target.role.name
+
+All these attributes (related to users, API calls, and context) can be
+checked against each other or against constants, be it literals (True,
+<a_number>) or strings.
+
Finally, two special policy checks should be mentioned; the policy
check "@" will always accept an access, and the policy check "!" will
always reject an access. (Note that if a rule is either the empty
"""
import abc
+import ast
+import os
import re
-import urllib
+from oslo.config import cfg
+from oslo.serialization import jsonutils
import six
-import urllib2
+import six.moves.urllib.parse as urlparse
+import six.moves.urllib.request as urlrequest
-from neutron.openstack.common.gettextutils import _
-from neutron.openstack.common import jsonutils
+from neutron.openstack.common import fileutils
+from neutron.openstack.common._i18n import _, _LE, _LW
from neutron.openstack.common import log as logging
-LOG = logging.getLogger(__name__)
+policy_opts = [
+ cfg.StrOpt('policy_file',
+ default='policy.json',
+ help=_('The JSON file that defines policies.')),
+ cfg.StrOpt('policy_default_rule',
+ default='default',
+ help=_('Default rule. Enforced when a requested rule is not '
+ 'found.')),
+ cfg.MultiStrOpt('policy_dirs',
+ default=['policy.d'],
+ help=_('Directories where policy configuration files are '
+ 'stored.')),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(policy_opts)
+LOG = logging.getLogger(__name__)
-_rules = None
_checks = {}
+class PolicyNotAuthorized(Exception):
+
+ def __init__(self, rule):
+ msg = _("Policy doesn't allow %s to be performed.") % rule
+ super(PolicyNotAuthorized, self).__init__(msg)
+
+
class Rules(dict):
- """
- A store for rules. Handles the default_rule setting directly.
- """
+ """A store for rules. Handles the default_rule setting directly."""
@classmethod
def load_json(cls, data, default_rule=None):
- """
- Allow loading of JSON rule data.
- """
+ """Allow loading of JSON rule data."""
# Suck in the JSON data and parse the rules
rules = dict((k, parse_rule(v)) for k, v in
def __missing__(self, key):
"""Implements the default rule handling."""
+ if isinstance(self.default_rule, dict):
+ raise KeyError(key)
+
# If the default rule isn't actually defined, do something
# reasonably intelligent
- if not self.default_rule or self.default_rule not in self:
+ if not self.default_rule:
+ raise KeyError(key)
+
+ if isinstance(self.default_rule, BaseCheck):
+ return self.default_rule
+
+ # We need to check this or we can get infinite recursion
+ if self.default_rule not in self:
raise KeyError(key)
- return self[self.default_rule]
+ elif isinstance(self.default_rule, six.string_types):
+ return self[self.default_rule]
def __str__(self):
"""Dumps a string representation of the rules."""
return jsonutils.dumps(out_rules, indent=4)
-# Really have to figure out a way to deprecate this
-def set_rules(rules):
- """Set the rules in use for policy checks."""
-
- global _rules
-
- _rules = rules
+class Enforcer(object):
+ """Responsible for loading and enforcing rules.
+ :param policy_file: Custom policy file to use, if none is
+ specified, `CONF.policy_file` will be
+ used.
+ :param rules: Default dictionary / Rules to use. It will be
+ considered just in the first instantiation. If
+ `load_rules(True)`, `clear()` or `set_rules(True)`
+ is called this will be overwritten.
+ :param default_rule: Default rule to use, CONF.default_rule will
+ be used if none is specified.
+ :param use_conf: Whether to load rules from cache or config file.
+ """
-# Ditto
-def reset():
- """Clear the rules used for policy checks."""
-
- global _rules
+ def __init__(self, policy_file=None, rules=None,
+ default_rule=None, use_conf=True):
+ self.default_rule = default_rule or CONF.policy_default_rule
+ self.rules = Rules(rules, self.default_rule)
- _rules = None
+ self.policy_path = None
+ self.policy_file = policy_file or CONF.policy_file
+ self.use_conf = use_conf
+ def set_rules(self, rules, overwrite=True, use_conf=False):
+ """Create a new Rules object based on the provided dict of rules.
-def check(rule, target, creds, exc=None, *args, **kwargs):
- """
- Checks authorization of a rule against the target and credentials.
-
- :param rule: The rule to evaluate.
- :param target: As much information about the object being operated
- on as possible, as a dictionary.
- :param creds: As much information about the user performing the
- action as possible, as a dictionary.
- :param exc: Class of the exception to raise if the check fails.
- Any remaining arguments passed to check() (both
- positional and keyword arguments) will be passed to
- the exception class. If exc is not provided, returns
- False.
-
- :return: Returns False if the policy does not allow the action and
- exc is not provided; otherwise, returns a value that
- evaluates to True. Note: for rules using the "case"
- expression, this True value will be the specified string
- from the expression.
- """
+ :param rules: New rules to use. It should be an instance of dict.
+ :param overwrite: Whether to overwrite current rules or update them
+ with the new rules.
+ :param use_conf: Whether to reload rules from cache or config file.
+ """
- # Allow the rule to be a Check tree
- if isinstance(rule, BaseCheck):
- result = rule(target, creds)
- elif not _rules:
- # No rules to reference means we're going to fail closed
- result = False
- else:
- try:
- # Evaluate the rule
- result = _rules[rule](target, creds)
- except KeyError:
- # If the rule doesn't exist, fail closed
+ if not isinstance(rules, dict):
+ raise TypeError(_("Rules must be an instance of dict or Rules, "
+ "got %s instead") % type(rules))
+ self.use_conf = use_conf
+ if overwrite:
+ self.rules = Rules(rules, self.default_rule)
+ else:
+ self.rules.update(rules)
+
+ def clear(self):
+ """Clears Enforcer rules, policy's cache and policy's path."""
+ self.set_rules({})
+ fileutils.delete_cached_file(self.policy_path)
+ self.default_rule = None
+ self.policy_path = None
+
+ def load_rules(self, force_reload=False):
+ """Loads policy_path's rules.
+
+ Policy file is cached and will be reloaded if modified.
+
+ :param force_reload: Whether to overwrite current rules.
+ """
+
+ if force_reload:
+ self.use_conf = force_reload
+
+ if self.use_conf:
+ if not self.policy_path:
+ self.policy_path = self._get_policy_path(self.policy_file)
+
+ self._load_policy_file(self.policy_path, force_reload)
+ for path in CONF.policy_dirs:
+ try:
+ path = self._get_policy_path(path)
+ except cfg.ConfigFilesNotFoundError:
+ LOG.warn(_LW("Can not find policy directory: %s"), path)
+ continue
+ self._walk_through_policy_directory(path,
+ self._load_policy_file,
+ force_reload, False)
+
+ def _walk_through_policy_directory(self, path, func, *args):
+ # We do not iterate over sub-directories.
+ policy_files = next(os.walk(path))[2]
+ policy_files.sort()
+ for policy_file in [p for p in policy_files if not p.startswith('.')]:
+ func(os.path.join(path, policy_file), *args)
+
+ def _load_policy_file(self, path, force_reload, overwrite=True):
+ reloaded, data = fileutils.read_cached_file(
+ path, force_reload=force_reload)
+ if reloaded or not self.rules:
+ rules = Rules.load_json(data, self.default_rule)
+ self.set_rules(rules, overwrite)
+ LOG.debug("Rules successfully reloaded")
+
+ def _get_policy_path(self, path):
+ """Locate the policy json data file/path.
+
+ :param path: It's value can be a full path or related path. When
+ full path specified, this function just returns the full
+ path. When related path specified, this function will
+ search configuration directories to find one that exists.
+
+ :returns: The policy path
+
+ :raises: ConfigFilesNotFoundError if the file/path couldn't
+ be located.
+ """
+ policy_path = CONF.find_file(path)
+
+ if policy_path:
+ return policy_path
+
+ raise cfg.ConfigFilesNotFoundError((path,))
+
+ def enforce(self, rule, target, creds, do_raise=False,
+ exc=None, *args, **kwargs):
+ """Checks authorization of a rule against the target and credentials.
+
+ :param rule: A string or BaseCheck instance specifying the rule
+ to evaluate.
+ :param target: As much information about the object being operated
+ on as possible, as a dictionary.
+ :param creds: As much information about the user performing the
+ action as possible, as a dictionary.
+ :param do_raise: Whether to raise an exception or not if check
+ fails.
+ :param exc: Class of the exception to raise if the check fails.
+ Any remaining arguments passed to enforce() (both
+ positional and keyword arguments) will be passed to
+ the exception class. If not specified, PolicyNotAuthorized
+ will be used.
+
+ :return: Returns False if the policy does not allow the action and
+ exc is not provided; otherwise, returns a value that
+ evaluates to True. Note: for rules using the "case"
+ expression, this True value will be the specified string
+ from the expression.
+ """
+
+ self.load_rules()
+
+ # Allow the rule to be a Check tree
+ if isinstance(rule, BaseCheck):
+ result = rule(target, creds, self)
+ elif not self.rules:
+ # No rules to reference means we're going to fail closed
result = False
+ else:
+ try:
+ # Evaluate the rule
+ result = self.rules[rule](target, creds, self)
+ except KeyError:
+ LOG.debug("Rule [%s] doesn't exist" % rule)
+ # If the rule doesn't exist, fail closed
+ result = False
- # If it is False, raise the exception if requested
- if exc and result is False:
- raise exc(*args, **kwargs)
+ # If it is False, raise the exception if requested
+ if do_raise and not result:
+ if exc:
+ raise exc(*args, **kwargs)
- return result
+ raise PolicyNotAuthorized(rule)
+ return result
-class BaseCheck(object):
- """
- Abstract base class for Check classes.
- """
- __metaclass__ = abc.ABCMeta
+@six.add_metaclass(abc.ABCMeta)
+class BaseCheck(object):
+ """Abstract base class for Check classes."""
@abc.abstractmethod
def __str__(self):
- """
- Retrieve a string representation of the Check tree rooted at
- this node.
- """
+ """String representation of the Check tree rooted at this node."""
pass
@abc.abstractmethod
- def __call__(self, target, cred):
- """
- Perform the check. Returns False to reject the access or a
+ def __call__(self, target, cred, enforcer):
+ """Triggers if instance of the class is called.
+
+ Performs the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
class FalseCheck(BaseCheck):
- """
- A policy check that always returns False (disallow).
- """
+ """A policy check that always returns False (disallow)."""
def __str__(self):
"""Return a string representation of this check."""
return "!"
- def __call__(self, target, cred):
+ def __call__(self, target, cred, enforcer):
"""Check the policy."""
return False
class TrueCheck(BaseCheck):
- """
- A policy check that always returns True (allow).
- """
+ """A policy check that always returns True (allow)."""
def __str__(self):
"""Return a string representation of this check."""
return "@"
- def __call__(self, target, cred):
+ def __call__(self, target, cred, enforcer):
"""Check the policy."""
return True
class Check(BaseCheck):
- """
- A base class to allow for user-defined policy checks.
- """
+ """A base class to allow for user-defined policy checks."""
def __init__(self, kind, match):
- """
+ """Initiates Check instance.
+
:param kind: The kind of the check, i.e., the field before the
':'.
:param match: The match of the check, i.e., the field after
class NotCheck(BaseCheck):
- """
+ """Implements the "not" logical operator.
+
A policy check that inverts the result of another policy check.
- Implements the "not" operator.
"""
def __init__(self, rule):
- """
- Initialize the 'not' check.
+ """Initialize the 'not' check.
:param rule: The rule to negate. Must be a Check.
"""
return "not %s" % self.rule
- def __call__(self, target, cred):
- """
- Check the policy. Returns the logical inverse of the wrapped
- check.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Returns the logical inverse of the wrapped check.
"""
- return not self.rule(target, cred)
+ return not self.rule(target, cred, enforcer)
class AndCheck(BaseCheck):
- """
- A policy check that requires that a list of other checks all
- return True. Implements the "and" operator.
+ """Implements the "and" logical operator.
+
+ A policy check that requires that a list of other checks all return True.
"""
def __init__(self, rules):
- """
- Initialize the 'and' check.
+ """Initialize the 'and' check.
:param rules: A list of rules that will be tested.
"""
return "(%s)" % ' and '.join(str(r) for r in self.rules)
- def __call__(self, target, cred):
- """
- Check the policy. Requires that all rules accept in order to
- return True.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that all rules accept in order to return True.
"""
for rule in self.rules:
- if not rule(target, cred):
+ if not rule(target, cred, enforcer):
return False
return True
def add_check(self, rule):
- """
+ """Adds rule to be tested.
+
Allows addition of another rule to the list of rules that will
be tested. Returns the AndCheck object for convenience.
"""
class OrCheck(BaseCheck):
- """
+ """Implements the "or" operator.
+
A policy check that requires that at least one of a list of other
- checks returns True. Implements the "or" operator.
+ checks returns True.
"""
def __init__(self, rules):
- """
- Initialize the 'or' check.
+ """Initialize the 'or' check.
:param rules: A list of rules that will be tested.
"""
return "(%s)" % ' or '.join(str(r) for r in self.rules)
- def __call__(self, target, cred):
- """
- Check the policy. Requires that at least one rule accept in
- order to return True.
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that at least one rule accept in order to return True.
"""
for rule in self.rules:
- if rule(target, cred):
+ if rule(target, cred, enforcer):
return True
-
return False
def add_check(self, rule):
- """
+ """Adds rule to be tested.
+
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
def _parse_check(rule):
- """
- Parse a single base check rule into an appropriate Check object.
- """
+ """Parse a single base check rule into an appropriate Check object."""
# Handle the special checks
if rule == '!':
try:
kind, match = rule.split(':', 1)
except Exception:
- LOG.exception(_("Failed to understand rule %(rule)s") % locals())
+ LOG.exception(_LE("Failed to understand rule %s") % rule)
# If the rule is invalid, we'll fail closed
return FalseCheck()
elif None in _checks:
return _checks[None](kind, match)
else:
- LOG.error(_("No handler for matches of kind %s") % kind)
+ LOG.error(_LE("No handler for matches of kind %s") % kind)
return FalseCheck()
def _parse_list_rule(rule):
- """
- Provided for backwards compatibility. Translates the old
- list-of-lists syntax into a tree of Check objects.
+ """Translates the old list-of-lists syntax into a tree of Check objects.
+
+ Provided for backwards compatibility.
"""
# Empty rule defaults to True
continue
# Handle bare strings
- if isinstance(inner_rule, basestring):
+ if isinstance(inner_rule, six.string_types):
inner_rule = [inner_rule]
# Parse the inner rules into Check objects
def _parse_tokenize(rule):
- """
- Tokenizer for the policy language.
+ """Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
class ParseStateMeta(type):
- """
- Metaclass for the ParseState class. Facilitates identifying
- reduction methods.
+ """Metaclass for the ParseState class.
+
+ Facilitates identifying reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
- """
- Create the class. Injects the 'reducers' list, a list of
- tuples matching token sequences to the names of the
- corresponding reduction methods.
+ """Create the class.
+
+ Injects the 'reducers' list, a list of tuples matching token sequences
+ to the names of the corresponding reduction methods.
"""
reducers = []
def reducer(*tokens):
- """
- Decorator for reduction methods. Arguments are a sequence of
- tokens, in order, which should trigger running this reduction
- method.
+ """Decorator for reduction methods.
+
+ Arguments are a sequence of tokens, in order, which should trigger running
+ this reduction method.
"""
def decorator(func):
return decorator
+@six.add_metaclass(ParseStateMeta)
class ParseState(object):
- """
- Implement the core of parsing the policy language. Uses a greedy
- reduction algorithm to reduce a sequence of tokens into a single
- terminal, the value of which will be the root of the Check tree.
+ """Implement the core of parsing the policy language.
+
+ Uses a greedy reduction algorithm to reduce a sequence of tokens into
+ a single terminal, the value of which will be the root of the Check tree.
Note: error reporting is rather lacking. The best we can get with
this parser formulation is an overall "parse failed" error.
shouldn't be that big a problem.
"""
- __metaclass__ = ParseStateMeta
-
def __init__(self):
"""Initialize the ParseState."""
self.values = []
def reduce(self):
- """
- Perform a greedy reduction of the token stream. If a reducer
- method matches, it will be executed, then the reduce() method
- will be called recursively to search for any more possible
- reductions.
+ """Perform a greedy reduction of the token stream.
+
+ If a reducer method matches, it will be executed, then the
+ reduce() method will be called recursively to search for any more
+ possible reductions.
"""
for reduction, methname in self.reducers:
@property
def result(self):
- """
- Obtain the final result of the parse. Raises ValueError if
- the parse failed to reduce to a single result.
+ """Obtain the final result of the parse.
+
+ Raises ValueError if the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
- """
- Create an 'and_expr' from two checks joined by the 'and'
- operator.
+ """Create an 'and_expr'.
+
+ Join two checks by the 'and' operator.
"""
return [('and_expr', AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
- """
- Extend an 'and_expr' by adding one more check.
- """
+ """Extend an 'and_expr' by adding one more check."""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
- """
- Create an 'or_expr' from two checks joined by the 'or'
- operator.
+ """Create an 'or_expr'.
+
+ Join two checks by the 'or' operator.
"""
return [('or_expr', OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
- """
- Extend an 'or_expr' by adding one more check.
- """
+ """Extend an 'or_expr' by adding one more check."""
return [('or_expr', or_expr.add_check(check))]
def _parse_text_rule(rule):
- """
+ """Parses policy to the tree.
+
Translates a policy written in the policy language into a tree of
Check objects.
"""
return state.result
except ValueError:
# Couldn't parse the rule
- LOG.exception(_("Failed to understand rule %(rule)r") % locals())
+ LOG.exception(_LE("Failed to understand rule %s") % rule)
# Fail closed
return FalseCheck()
def parse_rule(rule):
- """
- Parses a policy rule into a tree of Check objects.
- """
+ """Parses a policy rule into a tree of Check objects."""
# If the rule is a string, it's in the policy language
- if isinstance(rule, basestring):
+ if isinstance(rule, six.string_types):
return _parse_text_rule(rule)
return _parse_list_rule(rule)
def register(name, func=None):
- """
- Register a function or Check class as a policy check.
+ """Register a function or Check class as a policy check.
:param name: Gives the name of the check type, e.g., 'rule',
'role', etc. If name is None, a default check type
@register("rule")
class RuleCheck(Check):
- def __call__(self, target, creds):
- """
- Recursively checks credentials based on the defined rules.
- """
+ def __call__(self, target, creds, enforcer):
+ """Recursively checks credentials based on the defined rules."""
try:
- return _rules[self.match](target, creds)
+ return enforcer.rules[self.match](target, creds, enforcer)
except KeyError:
# We don't have any matching rule; fail closed
return False
@register("role")
class RoleCheck(Check):
- def __call__(self, target, creds):
+ def __call__(self, target, creds, enforcer):
"""Check that there is a matching role in the cred dict."""
return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http')
class HttpCheck(Check):
- def __call__(self, target, creds):
- """
- Check http: rules by calling to a remote server.
+ def __call__(self, target, creds, enforcer):
+ """Check http: rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly 'True'.
url = ('http:' + self.match) % target
data = {'target': jsonutils.dumps(target),
'credentials': jsonutils.dumps(creds)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
+ post_data = urlparse.urlencode(data)
+ f = urlrequest.urlopen(url, post_data)
return f.read() == "True"
@register(None)
class GenericCheck(Check):
- def __call__(self, target, creds):
- """
- Check an individual match.
+ def __call__(self, target, creds, enforcer):
+ """Check an individual match.
Matches look like:
tenant:%(tenant_id)s
role:compute:admin
+ True:%(user.enabled)s
+ 'Member':%(role.name)s
"""
- # TODO(termie): do dict inspection via dot syntax
- match = self.match % target
- if self.kind in creds:
- return match == six.text_type(creds[self.kind])
- return False
+ try:
+ match = self.match % target
+ except KeyError:
+ # While doing GenericCheck if key not
+ # present in Target return false
+ return False
+
+ try:
+ # Try to interpret self.kind as a literal
+ leftval = ast.literal_eval(self.kind)
+ except ValueError:
+ try:
+ kind_parts = self.kind.split('.')
+ leftval = creds
+ for kind_part in kind_parts:
+ leftval = leftval[kind_part]
+ except KeyError:
+ return False
+ return match == six.text_type(leftval)
LOG = log.getLogger(__name__)
-_POLICY_PATH = None
-_POLICY_CACHE = {}
+
+_ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
# Maps deprecated 'extension' policies to new-style policies
'set': ['create', 'update']
}
-cfg.CONF.import_opt('policy_file', 'neutron.common.config')
-
def reset():
- global _POLICY_PATH
- global _POLICY_CACHE
- _POLICY_PATH = None
- _POLICY_CACHE = {}
- policy.reset()
+ global _ENFORCER
+ if _ENFORCER:
+ _ENFORCER.clear()
+ _ENFORCER = None
def init():
- global _POLICY_PATH
- global _POLICY_CACHE
- if not _POLICY_PATH:
- _POLICY_PATH = utils.find_config_file({}, cfg.CONF.policy_file)
- if not _POLICY_PATH:
- raise exceptions.PolicyFileNotFound(path=cfg.CONF.policy_file)
- # pass _set_brain to read_cached_file so that the policy brain
- # is reset only if the file has changed
- utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
- reload_func=_set_rules)
+ """Init an instance of the Enforcer class."""
+
+ global _ENFORCER
+ if not _ENFORCER:
+ _ENFORCER = policy.Enforcer()
+ # NOTE: Method _get_policy_path in common.policy can not always locate
+ # neutron policy file (when init() is called in tests),
+ # so set it explicitly.
+ _ENFORCER.policy_path = utils.find_config_file({},
+ cfg.CONF.policy_file)
+ _ENFORCER.load_rules(True)
+
+
+def refresh():
+ """Reset policy and init a new instance of Enforcer."""
+ reset()
+ init()
def get_resource_and_action(action):
return ("%ss" % data[-1], data[0] != 'get')
-def _set_rules(data):
- default_rule = 'default'
- LOG.debug(_("Loading policies from file: %s"), _POLICY_PATH)
+def set_rules(policies, overwrite=True):
+ """Set rules based on the provided dict of rules.
+
+ :param policies: New policies to use. It should be an instance of dict.
+ :param overwrite: Whether to overwrite current rules or update them
+ with the new rules.
+ """
+
+ LOG.debug("Loading policies from file: %s", _ENFORCER.policy_path)
# Ensure backward compatibility with folsom/grizzly convention
# for extension rules
- policies = policy.Rules.load_json(data, default_rule)
for pol in policies.keys():
if any([pol.startswith(depr_pol) for depr_pol in
DEPRECATED_POLICY_MAP.keys()]):
LOG.error(_LE("Backward compatibility unavailable for "
"deprecated policy %s. The policy will "
"not be enforced"), pol)
- policy.set_rules(policies)
+ init()
+ _ENFORCER.set_rules(policies, overwrite)
def _is_attribute_explicitly_set(attribute_name, resource, target, action):
reason=err_reason)
super(OwnerCheck, self).__init__(kind, match)
- def __call__(self, target, creds):
+ def __call__(self, target, creds, enforcer):
if self.target_field not in target:
# policy needs a plugin check
# target field is in the form resource:field
self.field = field
self.value = conv_func(value)
- def __call__(self, target_dict, cred_dict):
+ def __call__(self, target_dict, cred_dict, enforcer):
target_value = target_dict.get(self.field)
# target_value might be a boolean, explicitly compare with None
if target_value is None:
:return: Returns True if access is permitted else False.
"""
- if might_not_exist and not (policy._rules and action in policy._rules):
+ if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
- return policy.check(*(_prepare_check(context, action, target)))
+ return _ENFORCER.enforce(*(_prepare_check(context, action, target)))
def enforce(context, action, target, plugin=None):
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
- :raises neutron.exceptions.PolicyNotAuthorized: if verification fails.
+ :raises neutron.openstack.common.policy.PolicyNotAuthorized:
+ if verification fails.
"""
-
rule, target, credentials = _prepare_check(context, action, target)
- result = policy.check(rule, target, credentials, action=action)
- if not result:
- LOG.debug(_("Failed policy check for '%s'"), action)
- raise exceptions.PolicyNotAuthorized(action=action)
+ try:
+ result = _ENFORCER.enforce(rule, target, credentials,
+ action=action, do_raise=True)
+ except policy.PolicyNotAuthorized:
+ with excutils.save_and_reraise_exception():
+ LOG.debug("Failed policy check for '%s'", action)
return result
target = credentials
# Backward compatibility: if ADMIN_CTX_POLICY is not
# found, default to validating role:admin
- admin_policy = (ADMIN_CTX_POLICY if ADMIN_CTX_POLICY in policy._rules
+ admin_policy = (ADMIN_CTX_POLICY if ADMIN_CTX_POLICY in _ENFORCER.rules
else 'role:admin')
- return policy.check(admin_policy, target, credentials)
+ return _ENFORCER.enforce(admin_policy, target, credentials)
def check_is_advsvc(context):
target = credentials
# Backward compatibility: if ADVSVC_CTX_POLICY is not
# found, default to validating role:advsvc
- advsvc_policy = (ADVSVC_CTX_POLICY in policy._rules
+ advsvc_policy = (ADVSVC_CTX_POLICY in _ENFORCER.rules
and ADVSVC_CTX_POLICY or 'role:advsvc')
- return policy.check(advsvc_policy, target, credentials)
+ return _ENFORCER.enforce(advsvc_policy, target, credentials)
def _extract_roles(rule, roles):
if isinstance(rule, policy.RoleCheck):
roles.append(rule.match.lower())
elif isinstance(rule, policy.RuleCheck):
- _extract_roles(policy._rules[rule.match], roles)
+ _extract_roles(_ENFORCER.rules[rule.match], roles)
elif hasattr(rule, 'rules'):
for rule in rule.rules:
_extract_roles(rule, roles)
# plugin modules. For backward compatibility it returns the literal
# admin if ADMIN_CTX_POLICY is not defined
init()
- if not policy._rules or ADMIN_CTX_POLICY not in policy._rules:
+ if not _ENFORCER.rules or ADMIN_CTX_POLICY not in _ENFORCER.rules:
return ['admin']
try:
- admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY]
+ admin_ctx_rule = _ENFORCER.rules[ADMIN_CTX_POLICY]
except (KeyError, TypeError):
return
roles = []
extraroute_test.ExtraRouteDBIntTestCase):
def setUp(self):
- self.session = context.get_admin_context().session
+ super(TestL3Sync, self).setUp()
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
- super(TestL3Sync, self).setUp()
+ self.session = context.get_admin_context().session
def _make_floatingip_for_tenant_port(self, net_id, port_id, tenant_id):
data = {'floatingip': {'floating_network_id': net_id,
class TestNetPartSync(test_netpartition.NetPartitionTestCase):
def setUp(self):
- self.session = context.get_admin_context().session
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestNetPartSync, self).setUp()
+ self.session = context.get_admin_context().session
def test_net_partition_sync(self):
# If the net-partition exists in neutron and not in VSD,
class TestL2Sync(test_nuage_plugin.NuagePluginV2TestCase):
def setUp(self):
- self.session = context.get_admin_context().session
+ super(TestL2Sync, self).setUp()
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
- super(TestL2Sync, self).setUp()
+ self.session = context.get_admin_context().session
def test_subnet_sync(self):
# If the subnet exists in neutron and not in VSD,
class TestSecurityGroupSync(test_sg.TestSecurityGroups):
def setUp(self):
- self.session = context.get_admin_context().session
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestSecurityGroupSync, self).setUp()
+ self.session = context.get_admin_context().session
def test_sg_get(self):
with self.security_group() as sg:
tenant_id = _uuid()
# Inject rule in policy engine
policy.init()
- common_policy._rules['get_network:name'] = common_policy.parse_rule(
- "rule:admin_only")
+ self.addCleanup(policy.reset)
+ rules = {'get_network:name': common_policy.parse_rule(
+ "rule:admin_only")}
+ policy.set_rules(rules, overwrite=False)
res = self._test_get(tenant_id, tenant_id, 200)
res = self.deserialize(res)
- try:
- self.assertNotIn('name', res['network'])
- finally:
- del common_policy._rules['get_network:name']
+ self.assertNotIn('name', res['network'])
def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
"""Test of Policy Engine For Neutron"""
+import StringIO
import urllib2
import fixtures
import mock
+from oslo.config import cfg
+from oslo.serialization import jsonutils
import six
+import six.moves.urllib.request as urlrequest
import neutron
from neutron.api.v2 import attributes
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
-from neutron.openstack.common import jsonutils
from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron.tests import base
class PolicyFileTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
- policy.reset()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake', is_admin=False)
self.target = {'tenant_id': 'fake'}
self.tempdir = self.useFixture(fixtures.TempDir())
def test_modified_policy_reloads(self):
- def fake_find_config_file(_1, _2):
- return self.tempdir.join('policy')
-
- with mock.patch.object(neutron.common.utils,
- 'find_config_file',
- new=fake_find_config_file):
- tmpfilename = fake_find_config_file(None, None)
- action = "example:test"
- with open(tmpfilename, "w") as policyfile:
- policyfile.write("""{"example:test": ""}""")
- policy.init()
- policy.enforce(self.context, action, self.target)
- with open(tmpfilename, "w") as policyfile:
- policyfile.write("""{"example:test": "!"}""")
- # NOTE(vish): reset stored policy cache so we don't have to
- # sleep(1)
- policy._POLICY_CACHE = {}
- policy.init()
- self.target = {'tenant_id': 'fake_tenant'}
- self.assertRaises(exceptions.PolicyNotAuthorized,
- policy.enforce,
- self.context,
- action,
- self.target)
+ tmpfilename = self.tempdir.join('policy')
+ action = "example:test"
+ with open(tmpfilename, "w") as policyfile:
+ policyfile.write("""{"example:test": ""}""")
+ cfg.CONF.set_override('policy_file', tmpfilename)
+ policy.refresh()
+ policy.enforce(self.context, action, self.target)
+ with open(tmpfilename, "w") as policyfile:
+ policyfile.write("""{"example:test": "!"}""")
+ policy.refresh()
+ self.target = {'tenant_id': 'fake_tenant'}
+ self.assertRaises(common_policy.PolicyNotAuthorized,
+ policy.enforce,
+ self.context,
+ action,
+ self.target)
class PolicyTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
- policy.reset()
self.addCleanup(policy.reset)
# NOTE(vish): preload rules to circumvent reloading from file
- policy.init()
rules = {
"true": '@',
"example:allowed": '@',
"example:lowercase_admin": "role:admin or role:sysadmin",
"example:uppercase_admin": "role:ADMIN or role:sysadmin",
}
+ policy.refresh()
# NOTE(vish): then overload underlying rules
- common_policy.set_rules(common_policy.Rules(
- dict((k, common_policy.parse_rule(v))
- for k, v in rules.items())))
+ policy.set_rules(dict((k, common_policy.parse_rule(v))
+ for k, v in rules.items()))
self.context = context.Context('fake', 'fake', roles=['member'])
self.target = {}
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_enforce_bad_action_throws(self):
action = "example:denied"
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_check_bad_action_noraise(self):
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, True)
- def test_enforce_http_true(self):
-
- def fakeurlopen(url, post_data):
- return six.StringIO("True")
-
- with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
- action = "example:get_http"
- target = {}
- result = policy.enforce(self.context, action, target)
- self.assertEqual(result, True)
+ @mock.patch.object(urlrequest, 'urlopen',
+ return_value=StringIO.StringIO("True"))
+ def test_enforce_http_true(self, mock_urlrequest):
+ action = "example:get_http"
+ target = {}
+ result = policy.enforce(self.context, action, target)
+ self.assertEqual(result, True)
def test_enforce_http_false(self):
with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
action = "example:get_http"
target = {}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
- self.context, action, target)
+ self.assertRaises(common_policy.PolicyNotAuthorized,
+ policy.enforce, self.context,
+ action, target)
def test_templatized_enforcement(self):
target_mine = {'tenant_id': 'fake'}
target_not_mine = {'tenant_id': 'another'}
action = "example:my_file"
policy.enforce(self.context, action, target_mine)
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target_not_mine)
def test_early_AND_enforcement(self):
action = "example:early_and_fail"
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_early_OR_enforcement(self):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
- policy.reset()
- policy.init()
- self.addCleanup(policy.reset)
-
+ self.tempdir = self.useFixture(fixtures.TempDir())
+ tmpfilename = self.tempdir.join('policy.json')
self.rules = {
"default": '',
"example:exist": '!',
}
-
- self._set_rules('default')
+ with open(tmpfilename, "w") as policyfile:
+ jsonutils.dump(self.rules, policyfile)
+ cfg.CONF.set_override('policy_file', tmpfilename)
+ policy.refresh()
+ self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake')
- def _set_rules(self, default_rule):
- rules = common_policy.Rules(
- dict((k, common_policy.parse_rule(v))
- for k, v in self.rules.items()), default_rule)
- common_policy.set_rules(rules)
-
def test_policy_called(self):
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
def test_not_found_policy_calls_default(self):
policy.enforce(self.context, "example:noexist", {})
- def test_default_not_found(self):
- self._set_rules("default_noexist")
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
- self.context, "example:noexist", {})
-
FAKE_RESOURCE_NAME = 'something'
FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
def setUp(self):
super(NeutronPolicyTestCase, self).setUp()
- policy.reset()
- policy.init()
+ policy.refresh()
self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
"update_network:shared": "rule:admin_only",
"get_network": "rule:admin_or_owner or rule:shared or "
"rule:external or rule:context_is_advsvc",
+ "create_subnet": "rule:admin_or_network_owner",
"create_port:mac": "rule:admin_or_network_owner or "
"rule:context_is_advsvc",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"rule:shared"
}.items())
- def fakepolicyinit():
- common_policy.set_rules(common_policy.Rules(self.rules))
+ def fakepolicyinit(**kwargs):
+ enf = policy._ENFORCER
+ enf.set_rules(common_policy.Rules(self.rules))
def remove_fake_resource():
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
- exceptions.PolicyNotAuthorized)
+ common_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_private_fails(self):
self._test_nonadmin_action_on_attr('get', 'shared', False,
- exceptions.PolicyNotAuthorized)
+ common_policy.PolicyNotAuthorized)
def test_nonadmin_write_on_shared_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', True,
- exceptions.PolicyNotAuthorized)
+ common_policy.PolicyNotAuthorized)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
def test_advsvc_create_network_fails(self):
self._test_advsvc_action_on_attr('create', 'network', 'shared', False,
- exceptions.PolicyNotAuthorized)
+ common_policy.PolicyNotAuthorized)
def test_advsvc_create_port_works(self):
self._test_advsvc_action_on_attr('create', 'port:mac', 'shared', False)
def test_advsvc_create_subnet_fails(self):
self._test_advsvc_action_on_attr('create', 'subnet', 'shared', False,
- exceptions.PolicyNotAuthorized)
+ common_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
- exceptions.PolicyNotAuthorized,
+ common_policy.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self):
self.admin_or_owner_legacy)
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
def _test_build_subattribute_match_rule(self, validate_value):
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"some_other_rule": "role:admin",
}.items())
- common_policy.set_rules(common_policy.Rules(rules))
+ policy.set_rules(common_policy.Rules(rules))
# 'admin' role is expected for bw compatibility
self.assertEqual(['admin'], policy.get_admin_roles())
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:admin",
}.items())
- common_policy.set_rules(common_policy.Rules(rules))
+ policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_rule_check(self):
policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
"some_other_rule": "role:admin",
}.items())
- common_policy.set_rules(common_policy.Rules(rules))
+ policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_or_check(self):
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
- policy._set_rules(jsonutils.dumps(input_rules))
+ policy.set_rules(input_rules.copy())
# verify deprecated policy has been removed
for pol in input_rules.keys():
- self.assertNotIn(pol, common_policy._rules)
+ self.assertNotIn(pol, policy._ENFORCER.rules)
# verify deprecated policy was correctly translated. Iterate
# over items for compatibility with unittest2 in python 2.6
for rule in expected_rules:
- self.assertIn(rule, common_policy._rules)
- self.assertEqual(str(common_policy._rules[rule]),
+ self.assertIn(rule, policy._ENFORCER.rules)
+ self.assertEqual(str(policy._ENFORCER.rules[rule]),
expected_rules[rule])
def test_set_rules_with_deprecated_view_policy(self):