context,
'%s:%s' % (self._plugin_handlers[self.SHOW], attr_name),
data,
- might_not_exist=True):
+ might_not_exist=True,
+ pluralized=self._collection):
# this attribute is visible, check next one
continue
# if the code reaches this point then either the policy check
arg_list.append(body)
# It is ok to raise a 403 because accessibility to the
# object was checked earlier in this method
- policy.enforce(request.context, name, resource)
+ policy.enforce(request.context,
+ name,
+ resource,
+ pluralized=self._collection)
return getattr(self._plugin, name)(*arg_list, **kwargs)
return _handle_action
else:
if policy.check(request.context,
self._plugin_handlers[self.SHOW],
obj,
- plugin=self._plugin)]
+ plugin=self._plugin,
+ pluralized=self._collection)]
# Use the first element in the list for discriminating which attributes
# should be filtered out because of authZ policies
# fields_to_add contains a list of attributes added for request policy
# FIXME(salvatore-orlando): obj_getter might return references to
# other resources. Must check authZ on them too.
if do_authz:
- policy.enforce(request.context, action, obj)
+ policy.enforce(request.context,
+ action,
+ obj,
+ pluralized=self._collection)
return obj
def _send_dhcp_notification(self, context, data, methodname):
item[self._resource])
policy.enforce(request.context,
action,
- item[self._resource])
+ item[self._resource],
+ pluralized=self._collection)
try:
tenant_id = item[self._resource]['tenant_id']
count = quota.QUOTAS.count(request.context, self._resource,
try:
policy.enforce(request.context,
action,
- obj)
+ obj,
+ pluralized=self._collection)
except common_policy.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
try:
policy.enforce(request.context,
action,
- orig_obj)
+ orig_obj,
+ pluralized=self._collection)
except common_policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception() as ctxt:
# If a tenant is modifying it's own object, it's safe to return
init()
-def get_resource_and_action(action):
+def get_resource_and_action(action, pluralized=None):
"""Extract resource and action (write, read) from api operation."""
data = action.split(':', 1)[0].split('_', 1)
- return ("%ss" % data[-1], data[0] != 'get')
+ resource = pluralized or ("%ss" % data[-1])
+ return (resource, data[0] != 'get')
def set_rules(policies, overwrite=True):
return rules
-def _build_match_rule(action, target):
+def _build_match_rule(action, target, pluralized):
"""Create the rule to match for a given action.
The policy rule to be matched is built in the following way:
(e.g.: create_router:external_gateway_info:network_id)
"""
match_rule = policy.RuleCheck('rule', action)
- resource, is_write = get_resource_and_action(action)
+ resource, is_write = get_resource_and_action(action, pluralized)
# Attribute-based checks shall not be enforced on GETs
if is_write:
# assigning to variable with short name for improving readability
return target_value == self.value
-def _prepare_check(context, action, target):
+def _prepare_check(context, action, target, pluralized):
"""Prepare rule, target, and credentials for the policy engine."""
# Compare with None to distinguish case in which target is {}
if target is None:
target = {}
- match_rule = _build_match_rule(action, target)
+ match_rule = _build_match_rule(action, target, pluralized)
credentials = context.to_dict()
return match_rule, target, credentials
LOG.debug("Enforcing rules: %s", rules)
-def check(context, action, target, plugin=None, might_not_exist=False):
+def check(context, action, target, plugin=None, might_not_exist=False,
+ pluralized=None):
"""Verifies that the action is valid on the target in this context.
:param context: neutron context
:param might_not_exist: If True the policy check is skipped (and the
function returns True) if the specified policy does not exist.
Defaults to false.
+ :param pluralized: pluralized case of resource
+ e.g. firewall_policy -> pluralized = "firewall_policies"
:return: Returns True if access is permitted else False.
"""
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
- match_rule, target, credentials = _prepare_check(context, action, target)
- result = _ENFORCER.enforce(match_rule, target, credentials)
+ match_rule, target, credentials = _prepare_check(context,
+ action,
+ target,
+ pluralized)
+ result = _ENFORCER.enforce(match_rule,
+ target,
+ credentials,
+ pluralized=pluralized)
# logging applied rules in case of failure
if not result:
log_rule_list(match_rule)
return result
-def enforce(context, action, target, plugin=None):
+def enforce(context, action, target, plugin=None, pluralized=None):
"""Verifies that the action is valid on the target in this context.
:param context: neutron context
location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
+ :param pluralized: pluralized case of resource
+ e.g. firewall_policy -> pluralized = "firewall_policies"
:raises neutron.openstack.common.policy.PolicyNotAuthorized:
if verification fails.
"""
- rule, target, credentials = _prepare_check(context, action, target)
+ rule, target, credentials = _prepare_check(context,
+ action,
+ target,
+ pluralized)
try:
result = _ENFORCER.enforce(rule, target, credentials, action=action,
do_raise=True)
policy.enforce(self.context, "example:noexist", {})
-FAKE_RESOURCE_NAME = 'something'
-FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
- {'attr': {'allow_post': True,
- 'allow_put': True,
- 'is_visible': True,
- 'default': None,
- 'enforce_policy': True,
- 'validate': {'type:dict':
- {'sub_attr_1': {'type:string': None},
- 'sub_attr_2': {'type:string': None}}}
- }}}
+FAKE_RESOURCE_NAME = 'fake_resource'
+FAKE_SPECIAL_RESOURCE_NAME = 'fake_policy'
+FAKE_RESOURCES = {"%ss" % FAKE_RESOURCE_NAME:
+ {'attr': {'allow_post': True,
+ 'allow_put': True,
+ 'is_visible': True,
+ 'default': None,
+ 'enforce_policy': True,
+ 'validate': {'type:dict':
+ {'sub_attr_1': {'type:string': None},
+ 'sub_attr_2': {'type:string': None}}}
+ }},
+ # special plural name
+ "%s" % FAKE_SPECIAL_RESOURCE_NAME.replace('y', 'ies'):
+ {'attr': {'allow_post': True,
+ 'allow_put': True,
+ 'is_visible': True,
+ 'default': None,
+ 'enforce_policy': True,
+ 'validate': {'type:dict':
+ {'sub_attr_1': {'type:string': None},
+ 'sub_attr_2': {'type:string': None}}}
+ }}}
class NeutronPolicyTestCase(base.BaseTestCase):
policy.refresh()
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
- # Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
- attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
+ # Add Fake resources to RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCES)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"context_is_admin": "role:admin",
"context_is_advsvc": "role:advsvc",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
- "create_something": "rule:admin_or_owner",
- "create_something:attr": "rule:admin_or_owner",
- "create_something:attr:sub_attr_1": "rule:admin_or_owner",
- "create_something:attr:sub_attr_2": "rule:admin_only",
+ "create_fake_resource": "rule:admin_or_owner",
+ "create_fake_resource:attr": "rule:admin_or_owner",
+ "create_fake_resource:attr:sub_attr_1": "rule:admin_or_owner",
+ "create_fake_resource:attr:sub_attr_2": "rule:admin_only",
+ "create_fake_policy:": "rule:admin_or_owner",
"get_firewall_policy": "rule:admin_or_owner or "
"rule:shared",
"get_firewall_rule": "rule:admin_or_owner or "
self.context, action, target)
def _test_build_subattribute_match_rule(self, validate_value):
- bk = FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate']
- FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = (
+ bk = FAKE_RESOURCES['%ss' % FAKE_RESOURCE_NAME]['attr']['validate']
+ FAKE_RESOURCES['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = (
validate_value)
- action = "create_something"
+ action = "create_" + FAKE_RESOURCE_NAME
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
self.assertFalse(policy._build_subattr_match_rule(
'attr',
- FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr'],
+ FAKE_RESOURCES['%ss' % FAKE_RESOURCE_NAME]['attr'],
action,
target))
- FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk
+ FAKE_RESOURCES['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk
def test_build_subattribute_match_rule_empty_dict_validator(self):
self._test_build_subattribute_match_rule({})
self._test_build_subattribute_match_rule(
{'type:dict': 'wrong_stuff'})
+ def test_build_match_rule_special_pluralized(self):
+ action = "create_" + FAKE_SPECIAL_RESOURCE_NAME
+ pluralized = "create_fake_policies"
+ target = {}
+ result = policy._build_match_rule(action, target, pluralized)
+ self.assertEqual("rule:" + action, str(result))
+
+ def test_build_match_rule_normal_pluralized_when_create(self):
+ action = "create_" + FAKE_RESOURCE_NAME
+ target = {}
+ result = policy._build_match_rule(action, target, None)
+ self.assertEqual("rule:" + action, str(result))
+
def test_enforce_subattribute(self):
- action = "create_something"
+ action = "create_" + FAKE_RESOURCE_NAME
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
result = policy.enforce(self.context, action, target, None)
self.assertEqual(result, True)
def test_enforce_admin_only_subattribute(self):
- action = "create_something"
+ action = "create_" + FAKE_RESOURCE_NAME
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
result = policy.enforce(context.get_admin_context(),
self.assertEqual(result, True)
def test_enforce_admin_only_subattribute_nonadminctx_returns_403(self):
- action = "create_something"
+ action = "create_" + FAKE_RESOURCE_NAME
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
expected_policies))
def test_process_rules(self):
- action = "create_something"
+ action = "create_" + FAKE_RESOURCE_NAME
# Construct RuleChecks for an action, attribute and subattribute
match_rule = common_policy.RuleCheck('rule', action)
- attr_rule = common_policy.RuleCheck('rule', '%s:%s' %
- (action, 'somethings'))
+ attr_rule = common_policy.RuleCheck('rule', '%s:%ss' %
+ (action,
+ FAKE_RESOURCE_NAME))
sub_attr_rules = [common_policy.RuleCheck('rule', '%s:%s:%s' %
(action, 'attr',
'sub_attr_1'))]
match_rule = common_policy.AndCheck([match_rule, attr_rule])
# Assert that the rules are correctly extracted from the match_rule
rules = policy._process_rules_list([], match_rule)
- self.assertEqual(['create_something', 'create_something:somethings',
- 'create_something:attr:sub_attr_1'], rules)
+ self.assertEqual(['create_fake_resource',
+ 'create_fake_resource:fake_resources',
+ 'create_fake_resource:attr:sub_attr_1'], rules)
def test_log_rule_list(self):
with contextlib.nested(