]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Forbid regular users to reset admin-only attrs to default values
authorElena Ezhova <eezhova@mirantis.com>
Tue, 19 Aug 2014 11:54:36 +0000 (15:54 +0400)
committerElena Ezhova <eezhova@mirantis.com>
Tue, 23 Sep 2014 11:18:51 +0000 (15:18 +0400)
A regular user can reset an admin-only attribute to its default
value due to the fact that a corresponding policy rule is
enforced only in the case when an attribute is present in the
target AND has a non-default value.

Added a new attribute "attributes_to_update" which contains a list
of all to-be updated attributes to the body of the target that is
passed to policy.enforce.

Changed a check for whether an attribute is explicitly set.
Now, in the case of update, the function should not pay attention
to a default value of an attribute, but check whether it was
explicitly marked as being updated.

Added unit-tests.

Closes-Bug: #1357379
Related-Bug: #1338880
Change-Id: I6537bb1da5ef0d6899bc71e4e949f2c760c103c2

neutron/api/v2/base.py
neutron/common/constants.py
neutron/policy.py
neutron/tests/unit/test_api_v2.py
neutron/tests/unit/test_policy.py

index a8bf20555e69aba8611ddf2c33ff49d30f7c1fd7..53c11b9d218b51bdc339099a3b1f472113378d81 100644 (file)
@@ -513,6 +513,10 @@ class Controller(object):
                               parent_id=parent_id)
         orig_object_copy = copy.copy(orig_obj)
         orig_obj.update(body[self._resource])
+        # Make a list of attributes to be updated to inform the policy engine
+        # which attributes are set explicitly so that it can distinguish them
+        # from the ones that are set to their default values.
+        orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
         try:
             policy.enforce(request.context,
                            action,
index 79353937653592720f2be13d0b1c90abfa5722a3..158e34245ee87f7602c293341da903909ba17e56 100644 (file)
@@ -150,3 +150,5 @@ DEVICE_NAME_MAX_LEN = 15
 
 # Device names start with "tap"
 TAP_DEVICE_PREFIX = 'tap'
+
+ATTRIBUTES_TO_UPDATE = 'attributes_to_update'
index 562ee03630399aac357a0f51161a2ee6477fbde0..b7f7873e19122228f5fbd9819ec87f0c504db584 100644 (file)
 """
 Policy engine for neutron.  Largely copied from nova.
 """
+
+import collections
 import itertools
 import re
 
 from oslo.config import cfg
 
 from neutron.api.v2 import attributes
+from neutron.common import constants as const
 from neutron.common import exceptions
 import neutron.common.utils as utils
 from neutron.openstack.common import excutils
@@ -118,14 +121,28 @@ def _set_rules(data):
     policy.set_rules(policies)
 
 
-def _is_attribute_explicitly_set(attribute_name, resource, target):
-    """Verify that an attribute is present and has a non-default value."""
+def _is_attribute_explicitly_set(attribute_name, resource, target, action):
+    """Verify that an attribute is present and is explicitly set."""
+    if 'update' in action:
+        # In the case of update, the function should not pay attention to a
+        # default value of an attribute, but check whether it was explicitly
+        # marked as being updated instead.
+        return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
+                target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED)
     return ('default' in resource[attribute_name] and
             attribute_name in target and
             target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
             target[attribute_name] != resource[attribute_name]['default'])
 
 
+def _should_validate_sub_attributes(attribute, sub_attr):
+    """Verify that sub-attributes are iterable and should be validated."""
+    validate = attribute.get('validate')
+    return (validate and isinstance(sub_attr, collections.Iterable) and
+            any([k.startswith('type:dict') and
+                 v for (k, v) in validate.iteritems()]))
+
+
 def _build_subattr_match_rule(attr_name, attr, action, target):
     """Create the rule to match for sub-attribute policy checks."""
     # TODO(salv-orlando): Instead of relying on validator info, introduce
@@ -173,16 +190,14 @@ def _build_match_rule(action, target):
             for attribute_name in res_map[resource]:
                 if _is_attribute_explicitly_set(attribute_name,
                                                 res_map[resource],
-                                                target):
+                                                target, action):
                     attribute = res_map[resource][attribute_name]
                     if 'enforce_policy' in attribute:
                         attr_rule = policy.RuleCheck('rule', '%s:%s' %
                                                      (action, attribute_name))
-                        # Build match entries for sub-attributes, if present
-                        validate = attribute.get('validate')
-                        if (validate and any([k.startswith('type:dict') and v
-                                              for (k, v) in
-                                              validate.iteritems()])):
+                        # Build match entries for sub-attributes
+                        if _should_validate_sub_attributes(
+                                attribute, target[attribute_name]):
                             attr_rule = policy.AndCheck(
                                 [attr_rule, _build_subattr_match_rule(
                                     attribute_name, attribute,
index e2ee2aacb486077a4debc48ffef11e21710714f8..ec8ca75ed2d7748323d642a8e2baa710da3bb5fd 100644 (file)
@@ -1207,6 +1207,18 @@ class SubresourceTest(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
                                                               network_id='id1',
                                                               dummy=body)
 
+    def test_update_subresource_to_none(self):
+        instance = self.plugin.return_value
+
+        dummy_id = _uuid()
+        body = {'dummy': {}}
+        self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
+                          body)
+        instance.update_network_dummy.assert_called_once_with(mock.ANY,
+                                                              dummy_id,
+                                                              network_id='id1',
+                                                              dummy=body)
+
     def test_delete_sub_resource(self):
         instance = self.plugin.return_value
 
index bc0074c909c736db6bba823e4836a0cf7231eb7c..ad3e81381803598110d921ada8f6b3e2e6893911 100644 (file)
@@ -23,6 +23,7 @@ import six
 
 import neutron
 from neutron.api.v2 import attributes
+from neutron.common import constants as const
 from neutron.common import exceptions
 from neutron import context
 from neutron import manager
@@ -282,9 +283,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
         fake_manager_instance.plugin = plugin_klass()
 
     def _test_action_on_attr(self, context, action, attr, value,
-                             exception=None):
+                             exception=None, **kwargs):
         action = "%s_network" % action
         target = {'tenant_id': 'the_owner', attr: value}
+        if kwargs:
+            target.update(kwargs)
         if exception:
             self.assertRaises(exception, policy.enforce,
                               context, action, target)
@@ -293,10 +296,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
             self.assertEqual(result, True)
 
     def _test_nonadmin_action_on_attr(self, action, attr, value,
-                                      exception=None):
+                                      exception=None, **kwargs):
         user_context = context.Context('', "user", roles=['user'])
         self._test_action_on_attr(user_context, action, attr,
-                                  value, exception)
+                                  value, exception, **kwargs)
 
     def test_nonadmin_write_on_private_fails(self):
         self._test_nonadmin_action_on_attr('create', 'shared', False,
@@ -313,9 +316,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
     def test_nonadmin_read_on_shared_succeeds(self):
         self._test_nonadmin_action_on_attr('get', 'shared', True)
 
-    def _test_enforce_adminonly_attribute(self, action):
+    def _test_enforce_adminonly_attribute(self, action, **kwargs):
         admin_context = context.get_admin_context()
         target = {'shared': True}
+        if kwargs:
+            target.update(kwargs)
         result = policy.enforce(admin_context, action, target)
         self.assertEqual(result, True)
 
@@ -323,7 +328,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
         self._test_enforce_adminonly_attribute('create_network')
 
     def test_enforce_adminonly_attribute_update(self):
-        self._test_enforce_adminonly_attribute('update_network')
+        kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
+        self._test_enforce_adminonly_attribute('update_network', **kwargs)
+
+    def test_reset_adminonly_attr_to_default_fails(self):
+        kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
+        self._test_nonadmin_action_on_attr('update', 'shared', False,
+                                           exceptions.PolicyNotAuthorized,
+                                           **kwargs)
 
     def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
         del self.rules[policy.ADMIN_CTX_POLICY]