Policy engine for neutron. Largely copied from nova.
"""
import itertools
+import logging
import re
from oslo.config import cfg
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _LE, _LI, _LW
from neutron.openstack.common import importutils
-from neutron.openstack.common import log as logging
+from neutron.openstack.common import log
from neutron.openstack.common import policy
-LOG = logging.getLogger(__name__)
+LOG = log.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
ADMIN_CTX_POLICY = 'context_is_admin'
return policy.AndCheck(sub_attr_rules)
+def _process_rules_list(rules, match_rule):
+ """Recursively walk a policy rule to extract a list of match entries."""
+ if isinstance(match_rule, policy.RuleCheck):
+ rules.append(match_rule.match)
+ elif isinstance(match_rule, policy.AndCheck):
+ for rule in match_rule.rules:
+ _process_rules_list(rules, rule)
+ return rules
+
+
def _build_match_rule(action, target):
"""Create the rule to match for a given action.
attribute_name, attribute,
action, target)])
match_rule = policy.AndCheck([match_rule, attr_rule])
+ # Check that the logger has a DEBUG log level
+ if (cfg.CONF.debug and LOG.logger.level == logging.NOTSET or
+ LOG.logger.level == logging.DEBUG):
+ rules = _process_rules_list([], match_rule)
+ LOG.debug("Enforcing rules: %s", rules)
return match_rule
{'extension:provider_network:set': 'rule:admin_only'},
dict((policy, 'rule:admin_only') for policy in
expected_policies))
+
+ def test_process_rules(self):
+ action = "create_something"
+ # Construct RuleChecks for an action, attribute and subattribute
+ match_rule = common_policy.RuleCheck('rule', action)
+ attr_rule = common_policy.RuleCheck('rule', '%s:%s' %
+ (action, 'somethings'))
+ sub_attr_rules = [common_policy.RuleCheck('rule', '%s:%s:%s' %
+ (action, 'attr',
+ 'sub_attr_1'))]
+ # Build an AndCheck from the given RuleChecks
+ # Make the checks nested to better check the recursion
+ sub_attr_rules = common_policy.AndCheck(sub_attr_rules)
+ attr_rule = common_policy.AndCheck(
+ [attr_rule, sub_attr_rules])
+
+ match_rule = common_policy.AndCheck([match_rule, attr_rule])
+ # Assert that the rules are correctly extracted from the match_rule
+ rules = policy._process_rules_list([], match_rule)
+ self.assertEqual(['create_something', 'create_something:somethings',
+ 'create_something:attr:sub_attr_1'], rules)