A regular user can reset an admin-only attribute to its default
value due to the fact that a corresponding policy rule is
enforced only in the case when an attribute is present in the
target AND has a non-default value.
Added a new attribute "attributes_to_update" which contains a list
of all to-be updated attributes to the body of the target that is
passed to policy.enforce.
Changed a check for whether an attribute is explicitly set.
Now, in the case of update, the function should not pay attention
to a default value of an attribute, but check whether it was
explicitly marked as being updated.
Added unit-tests.
Closes-Bug: #
1357379
Related-Bug: #
1338880
Change-Id: I6537bb1da5ef0d6899bc71e4e949f2c760c103c2
parent_id=parent_id)
orig_object_copy = copy.copy(orig_obj)
orig_obj.update(body[self._resource])
+ # Make a list of attributes to be updated to inform the policy engine
+ # which attributes are set explicitly so that it can distinguish them
+ # from the ones that are set to their default values.
+ orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
try:
policy.enforce(request.context,
action,
# Device names start with "tap"
TAP_DEVICE_PREFIX = 'tap'
+
+ATTRIBUTES_TO_UPDATE = 'attributes_to_update'
"""
Policy engine for neutron. Largely copied from nova.
"""
+
+import collections
import itertools
import re
from oslo.config import cfg
from neutron.api.v2 import attributes
+from neutron.common import constants as const
from neutron.common import exceptions
import neutron.common.utils as utils
from neutron.openstack.common import excutils
policy.set_rules(policies)
-def _is_attribute_explicitly_set(attribute_name, resource, target):
- """Verify that an attribute is present and has a non-default value."""
+def _is_attribute_explicitly_set(attribute_name, resource, target, action):
+ """Verify that an attribute is present and is explicitly set."""
+ if 'update' in action:
+ # In the case of update, the function should not pay attention to a
+ # default value of an attribute, but check whether it was explicitly
+ # marked as being updated instead.
+ return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
+ target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED)
return ('default' in resource[attribute_name] and
attribute_name in target and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
target[attribute_name] != resource[attribute_name]['default'])
+def _should_validate_sub_attributes(attribute, sub_attr):
+ """Verify that sub-attributes are iterable and should be validated."""
+ validate = attribute.get('validate')
+ return (validate and isinstance(sub_attr, collections.Iterable) and
+ any([k.startswith('type:dict') and
+ v for (k, v) in validate.iteritems()]))
+
+
def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce
for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name,
res_map[resource],
- target):
+ target, action):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name))
- # Build match entries for sub-attributes, if present
- validate = attribute.get('validate')
- if (validate and any([k.startswith('type:dict') and v
- for (k, v) in
- validate.iteritems()])):
+ # Build match entries for sub-attributes
+ if _should_validate_sub_attributes(
+ attribute, target[attribute_name]):
attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule(
attribute_name, attribute,
network_id='id1',
dummy=body)
+ def test_update_subresource_to_none(self):
+ instance = self.plugin.return_value
+
+ dummy_id = _uuid()
+ body = {'dummy': {}}
+ self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
+ body)
+ instance.update_network_dummy.assert_called_once_with(mock.ANY,
+ dummy_id,
+ network_id='id1',
+ dummy=body)
+
def test_delete_sub_resource(self):
instance = self.plugin.return_value
import neutron
from neutron.api.v2 import attributes
+from neutron.common import constants as const
from neutron.common import exceptions
from neutron import context
from neutron import manager
fake_manager_instance.plugin = plugin_klass()
def _test_action_on_attr(self, context, action, attr, value,
- exception=None):
+ exception=None, **kwargs):
action = "%s_network" % action
target = {'tenant_id': 'the_owner', attr: value}
+ if kwargs:
+ target.update(kwargs)
if exception:
self.assertRaises(exception, policy.enforce,
context, action, target)
self.assertEqual(result, True)
def _test_nonadmin_action_on_attr(self, action, attr, value,
- exception=None):
+ exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, attr,
- value, exception)
+ value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
- def _test_enforce_adminonly_attribute(self, action):
+ def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context()
target = {'shared': True}
+ if kwargs:
+ target.update(kwargs)
result = policy.enforce(admin_context, action, target)
self.assertEqual(result, True)
self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self):
- self._test_enforce_adminonly_attribute('update_network')
+ kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
+ self._test_enforce_adminonly_attribute('update_network', **kwargs)
+
+ def test_reset_adminonly_attr_to_default_fails(self):
+ kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
+ self._test_nonadmin_action_on_attr('update', 'shared', False,
+ exceptions.PolicyNotAuthorized,
+ **kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
del self.rules[policy.ADMIN_CTX_POLICY]