]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Enable policy control over external_gateway_info sub-attributes
authorSalvatore Orlando <salv.orlando@gmail.com>
Mon, 13 May 2013 11:12:46 +0000 (13:12 +0200)
committerSalvatore Orlando <salv.orlando@gmail.com>
Tue, 16 Jul 2013 15:52:27 +0000 (17:52 +0200)
Part 2 of blueprint l3-ext-gw-modes

This patch extends the logic for building policy rule matches in order to
include sub-attributes as well. This logic will be leveraged by the
ext-gw-mode api extension.

Change-Id: I7f46a395597b71bb1c5110aa4e792a04a5010d4c

etc/policy.json
neutron/policy.py
neutron/tests/unit/test_policy.py

index 62a90b94667d23c2c1a5592c577124492bf62362..b85384e993d3826425341cc6ddfada6473a37a5a 100644 (file)
@@ -55,6 +55,9 @@
     "update_port:mac_learning_enabled": "rule:admin_or_network_owner",
     "delete_port": "rule:admin_or_owner",
 
+    "create_router:external_gateway_info:enable_snat": "rule:admin_only",
+    "update_router:external_gateway_info:enable_snat": "rule:admin_only",
+
     "create_service_type": "rule:admin_only",
     "update_service_type": "rule:admin_only",
     "delete_service_type": "rule:admin_only",
index 28d8f60e99aa20f041fe98d0c9fb6cc3cb250850..f22894de16db952cc29f5447c20f9a0804121c88 100644 (file)
@@ -126,6 +126,31 @@ def _is_attribute_explicitly_set(attribute_name, resource, target):
             target[attribute_name] != resource[attribute_name]['default'])
 
 
+def _build_subattr_match_rule(attr_name, attr, action, target):
+    """Create the rule to match for sub-attribute policy checks."""
+    # TODO(salv-orlando): Instead of relying on validator info, introduce
+    # typing for API attributes
+    # Expect a dict as type descriptor
+    validate = attr['validate']
+    key = filter(lambda k: k.startswith('type:dict'), validate.keys())
+    if not key:
+        LOG.warn(_("Unable to find data type descriptor for attribute %s"),
+                 attr_name)
+        return
+    data = validate[key[0]]
+    if not isinstance(data, dict):
+        LOG.debug(_("Attribute type descriptor is not a dict. Unable to "
+                    "generate any sub-attr policy rule for %s."),
+                  attr_name)
+        return
+    sub_attr_rules = [policy.RuleCheck('rule', '%s:%s:%s' %
+                                       (action, attr_name,
+                                        sub_attr_name)) for
+                      sub_attr_name in data if sub_attr_name in
+                      target[attr_name]]
+    return policy.AndCheck(sub_attr_rules)
+
+
 def _build_match_rule(action, target):
     """Create the rule to match for a given action.
 
@@ -134,7 +159,9 @@ def _build_match_rule(action, target):
     2) add an entry for the specific action (e.g.: create_network)
     3) add an entry for attributes of a resource for which the action
        is being executed (e.g.: create_network:shared)
-
+    4) add an entry for sub-attributes of a resource for which the
+       action is being executed
+       (e.g.: create_router:external_gateway_info:network_id)
     """
 
     match_rule = policy.RuleCheck('rule', action)
@@ -152,8 +179,16 @@ def _build_match_rule(action, target):
                     if 'enforce_policy' in attribute:
                         attr_rule = policy.RuleCheck('rule', '%s:%s' %
                                                      (action, attribute_name))
+                        # Build match entries for sub-attributes, if present
+                        validate = attribute.get('validate')
+                        if (validate and any([k.startswith('type:dict') and v
+                                              for (k, v) in
+                                              validate.iteritems()])):
+                            attr_rule = policy.AndCheck(
+                                [attr_rule, _build_subattr_match_rule(
+                                    attribute_name, attribute,
+                                    action, target)])
                         match_rule = policy.AndCheck([match_rule, attr_rule])
-
     return match_rule
 
 
index 76f1c79bab4570ea513222b597e9ec9f8486d0e7..d602cd93cbf1043fb8ea03439782f5042035b888 100644 (file)
@@ -23,6 +23,7 @@ import fixtures
 import mock
 
 import neutron
+from neutron.api.v2 import attributes
 from neutron.common import exceptions
 from neutron import context
 from neutron import manager
@@ -201,6 +202,19 @@ class DefaultPolicyTestCase(base.BaseTestCase):
                           self.context, "example:noexist", {})
 
 
+FAKE_RESOURCE_NAME = 'something'
+FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
+                 {'attr': {'allow_post': True,
+                           'allow_put': True,
+                           'is_visible': True,
+                           'default': None,
+                           'enforce_policy': True,
+                           'validate': {'type:dict':
+                                        {'sub_attr_1': {'type:string': None},
+                                         'sub_attr_2': {'type:string': None}}}
+                           }}}
+
+
 class NeutronPolicyTestCase(base.BaseTestCase):
 
     def setUp(self):
@@ -210,6 +224,8 @@ class NeutronPolicyTestCase(base.BaseTestCase):
         self.addCleanup(policy.reset)
         self.admin_only_legacy = "role:admin"
         self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
+        # Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
+        attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
         self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
             "context_is_admin": "role:admin",
             "admin_or_network_owner": "rule:context_is_admin or "
@@ -231,16 +247,24 @@ class NeutronPolicyTestCase(base.BaseTestCase):
                            "rule:shared or "
                            "rule:external",
             "create_port:mac": "rule:admin_or_network_owner",
+            "create_something": "rule:admin_or_owner",
+            "create_something:attr": "rule:admin_or_owner",
+            "create_something:attr:sub_attr_1": "rule:admin_or_owner",
+            "create_something:attr:sub_attr_2": "rule:admin_only"
         }.items())
 
         def fakepolicyinit():
             common_policy.set_rules(common_policy.Rules(self.rules))
 
+        def remove_fake_resource():
+            del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
+
         self.patcher = mock.patch.object(neutron.policy,
                                          'init',
                                          new=fakepolicyinit)
         self.patcher.start()
         self.addCleanup(self.patcher.stop)
+        self.addCleanup(remove_fake_resource)
         self.context = context.Context('fake', 'fake', roles=['user'])
         plugin_klass = importutils.import_class(
             "neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
@@ -319,6 +343,47 @@ class NeutronPolicyTestCase(base.BaseTestCase):
         self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
                           self.context, action, target)
 
+    def _test_build_subattribute_match_rule(self, validate_value):
+        bk = FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate']
+        FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = (
+            validate_value)
+        action = "create_something"
+        target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
+        self.assertFalse(policy._build_subattr_match_rule(
+            'attr',
+            FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr'],
+            action,
+            target))
+        FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk
+
+    def test_build_subattribute_match_rule_empty_dict_validator(self):
+        self._test_build_subattribute_match_rule({})
+
+    def test_build_subattribute_match_rule_wrong_validation_info(self):
+        self._test_build_subattribute_match_rule(
+            {'type:dict': 'wrong_stuff'})
+
+    def test_enforce_subattribute(self):
+        action = "create_something"
+        target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
+        result = policy.enforce(self.context, action, target, None)
+        self.assertEqual(result, True)
+
+    def test_enforce_admin_only_subattribute(self):
+        action = "create_something"
+        target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
+                                                'sub_attr_2': 'y'}}
+        result = policy.enforce(context.get_admin_context(),
+                                action, target, None)
+        self.assertEqual(result, True)
+
+    def test_enforce_admin_only_subattribute_nonadminctx_returns_403(self):
+        action = "create_something"
+        target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
+                                                'sub_attr_2': 'y'}}
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, target, None)
+
     def test_enforce_regularuser_on_read(self):
         action = "get_network"
         target = {'shared': True, 'tenant_id': 'somebody_else'}