attribute_name, attribute,
action, target)])
match_rule = policy.AndCheck([match_rule, attr_rule])
- # Check that the logger has a DEBUG log level
- if (cfg.CONF.debug and LOG.logger.level == logging.NOTSET or
- LOG.logger.level == logging.DEBUG):
- rules = _process_rules_list([], match_rule)
- LOG.debug("Enforcing rules: %s", rules)
return match_rule
return match_rule, target, credentials
+def log_rule_list(match_rule):
+ if LOG.isEnabledFor(logging.DEBUG):
+ rules = _process_rules_list([], match_rule)
+ LOG.debug("Enforcing rules: %s", rules)
+
+
def check(context, action, target, plugin=None, might_not_exist=False):
"""Verifies that the action is valid on the target in this context.
"""
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
- return _ENFORCER.enforce(*(_prepare_check(context, action, target)))
+ match_rule, target, credentials = _prepare_check(context, action, target)
+ result = _ENFORCER.enforce(match_rule, target, credentials)
+ # logging applied rules in case of failure
+ if not result:
+ log_rule_list(match_rule)
+ return result
def enforce(context, action, target, plugin=None):
"""
rule, target, credentials = _prepare_check(context, action, target)
try:
- result = _ENFORCER.enforce(rule, target, credentials,
- action=action, do_raise=True)
+ result = _ENFORCER.enforce(rule, target, credentials, action=action,
+ do_raise=True)
except policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception():
+ log_rule_list(rule)
LOG.debug("Failed policy check for '%s'", action)
return result
"""Test of Policy Engine For Neutron"""
+import contextlib
import StringIO
import urllib2
rules = policy._process_rules_list([], match_rule)
self.assertEqual(['create_something', 'create_something:somethings',
'create_something:attr:sub_attr_1'], rules)
+
+ def test_log_rule_list(self):
+ with contextlib.nested(
+ mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True),
+ mock.patch.object(policy.LOG, 'debug')
+ ) as (is_e, dbg):
+ policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
+ self.assertTrue(is_e.called)
+ self.assertTrue(dbg.called)