target[attribute_name] != resource[attribute_name]['default'])
+def _build_subattr_match_rule(attr_name, attr, action, target):
+ """Create the rule to match for sub-attribute policy checks."""
+ # TODO(salv-orlando): Instead of relying on validator info, introduce
+ # typing for API attributes
+ # Expect a dict as type descriptor
+ validate = attr['validate']
+ key = filter(lambda k: k.startswith('type:dict'), validate.keys())
+ if not key:
+ LOG.warn(_("Unable to find data type descriptor for attribute %s"),
+ attr_name)
+ return
+ data = validate[key[0]]
+ if not isinstance(data, dict):
+ LOG.debug(_("Attribute type descriptor is not a dict. Unable to "
+ "generate any sub-attr policy rule for %s."),
+ attr_name)
+ return
+ sub_attr_rules = [policy.RuleCheck('rule', '%s:%s:%s' %
+ (action, attr_name,
+ sub_attr_name)) for
+ sub_attr_name in data if sub_attr_name in
+ target[attr_name]]
+ return policy.AndCheck(sub_attr_rules)
+
+
def _build_match_rule(action, target):
"""Create the rule to match for a given action.
2) add an entry for the specific action (e.g.: create_network)
3) add an entry for attributes of a resource for which the action
is being executed (e.g.: create_network:shared)
-
+ 4) add an entry for sub-attributes of a resource for which the
+ action is being executed
+ (e.g.: create_router:external_gateway_info:network_id)
"""
match_rule = policy.RuleCheck('rule', action)
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name))
+ # Build match entries for sub-attributes, if present
+ validate = attribute.get('validate')
+ if (validate and any([k.startswith('type:dict') and v
+ for (k, v) in
+ validate.iteritems()])):
+ attr_rule = policy.AndCheck(
+ [attr_rule, _build_subattr_match_rule(
+ attribute_name, attribute,
+ action, target)])
match_rule = policy.AndCheck([match_rule, attr_rule])
-
return match_rule
import mock
import neutron
+from neutron.api.v2 import attributes
from neutron.common import exceptions
from neutron import context
from neutron import manager
self.context, "example:noexist", {})
+FAKE_RESOURCE_NAME = 'something'
+FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
+ {'attr': {'allow_post': True,
+ 'allow_put': True,
+ 'is_visible': True,
+ 'default': None,
+ 'enforce_policy': True,
+ 'validate': {'type:dict':
+ {'sub_attr_1': {'type:string': None},
+ 'sub_attr_2': {'type:string': None}}}
+ }}}
+
+
class NeutronPolicyTestCase(base.BaseTestCase):
def setUp(self):
self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
+ # Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"context_is_admin": "role:admin",
"admin_or_network_owner": "rule:context_is_admin or "
"rule:shared or "
"rule:external",
"create_port:mac": "rule:admin_or_network_owner",
+ "create_something": "rule:admin_or_owner",
+ "create_something:attr": "rule:admin_or_owner",
+ "create_something:attr:sub_attr_1": "rule:admin_or_owner",
+ "create_something:attr:sub_attr_2": "rule:admin_only"
}.items())
def fakepolicyinit():
common_policy.set_rules(common_policy.Rules(self.rules))
+ def remove_fake_resource():
+ del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
+
self.patcher = mock.patch.object(neutron.policy,
'init',
new=fakepolicyinit)
self.patcher.start()
self.addCleanup(self.patcher.stop)
+ self.addCleanup(remove_fake_resource)
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
+ def _test_build_subattribute_match_rule(self, validate_value):
+ bk = FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate']
+ FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = (
+ validate_value)
+ action = "create_something"
+ target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
+ self.assertFalse(policy._build_subattr_match_rule(
+ 'attr',
+ FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr'],
+ action,
+ target))
+ FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk
+
+ def test_build_subattribute_match_rule_empty_dict_validator(self):
+ self._test_build_subattribute_match_rule({})
+
+ def test_build_subattribute_match_rule_wrong_validation_info(self):
+ self._test_build_subattribute_match_rule(
+ {'type:dict': 'wrong_stuff'})
+
+ def test_enforce_subattribute(self):
+ action = "create_something"
+ target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
+ result = policy.enforce(self.context, action, target, None)
+ self.assertEqual(result, True)
+
+ def test_enforce_admin_only_subattribute(self):
+ action = "create_something"
+ target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
+ 'sub_attr_2': 'y'}}
+ result = policy.enforce(context.get_admin_context(),
+ action, target, None)
+ self.assertEqual(result, True)
+
+ def test_enforce_admin_only_subattribute_nonadminctx_returns_403(self):
+ action = "create_something"
+ target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
+ 'sub_attr_2': 'y'}}
+ self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.context, action, target, None)
+
def test_enforce_regularuser_on_read(self):
action = "get_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}