]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Pecan: Fixes and tests for the policy enforcement hook
authorSalvatore Orlando <salv.orlando@gmail.com>
Tue, 13 Oct 2015 22:08:47 +0000 (15:08 -0700)
committerSalvatore Orlando <salv.orlando@gmail.com>
Wed, 13 Jan 2016 16:05:07 +0000 (16:05 +0000)
As PolicyNotAuthorizedException is raised in a hook, the
ExceptionTranslationHook is not invoked for it; therefore a 500
response is returned whereas a 403 was expected. This patch
explicitly handles the exception in the hook in order to ensure
the appropriate response code is returned.

Moreover, the structure of the 'before' hook prevented checks
on DELETE requests from being performed. As a result the check
was not performed at all (checks on the 'after' hook only pertain
GET requests). This patch changes the logic of the 'before' hook
by ensuring the item to authorize acces to is loaded both on PUT
and DELETE requests.

This patch also adds functional tests specific for the policy
enforcement hook.

Change-Id: I8c76cb05568df47648cff71a107cfe701b286bb7
Closes-Bug: #1520180
Closes-Bug: #1505831

neutron/pecan_wsgi/hooks/policy_enforcement.py
neutron/tests/functional/pecan_wsgi/test_functional.py

index cfc70d5aa3493db103e907c0b1f2e52aad7deaf9..e505eea482d8ccff8fb1dcb7ce8e8952d06c62f7 100644 (file)
@@ -46,30 +46,44 @@ class PolicyHook(hooks.PecanHook):
     def before(self, state):
         # This hook should be run only for PUT,POST and DELETE methods and for
         # requests targeting a neutron resource
-        # FIXME(salv-orlando): DELETE support. It does not work with the
-        # current logic. See LP Bug #1520180
-        items = state.request.context.get('resources')
-        if state.request.method not in ('POST', 'PUT') or not items:
+        resources = state.request.context.get('resources', [])
+        if state.request.method not in ('POST', 'PUT', 'DELETE'):
             return
+        # As this routine will likely alter the resources, do a shallow copy
+        resources_copy = resources[:]
         neutron_context = state.request.context.get('neutron_context')
         resource = state.request.context.get('resource')
+        # If there is no resource for this request, don't bother running authZ
+        # policies
+        if not resource:
+            return
         collection = state.request.context.get('collection')
-        is_update = (state.request.method == 'PUT')
+        needs_prefetch = (state.request.method == 'PUT' or
+                          state.request.method == 'DELETE')
         policy.init()
         action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource)
 
         # NOTE(salv-orlando): As bulk updates are not supported, in case of PUT
         # requests there will be only a single item to process, and its
-        # identifier would have been already retrieved by the lookup process
-        for item in items:
-            if is_update:
-                resource_id = state.request.context.get('resource_id')
-                obj = copy.copy(self._fetch_resource(neutron_context,
-                                                     resource,
-                                                     resource_id))
-                obj.update(item)
-                obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
-                item = obj
+        # identifier would have been already retrieved by the lookup process;
+        # in the case of DELETE requests there won't be any item to process in
+        # the request body
+        if needs_prefetch:
+            try:
+                item = resources_copy.pop()
+            except IndexError:
+                # Ops... this was a delete after all!
+                item = {}
+            resource_id = state.request.context.get('resource_id')
+            obj = copy.copy(self._fetch_resource(neutron_context,
+                                                 resource,
+                                                 resource_id))
+            obj.update(item)
+            obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
+            # Put back the item in the list so that policies could be enforced
+            resources_copy.append(obj)
+
+        for item in resources_copy:
             try:
                 policy.enforce(
                     neutron_context, action, item,
@@ -79,8 +93,8 @@ class PolicyHook(hooks.PecanHook):
                     # If a tenant is modifying it's own object, it's safe to
                     # return a 403. Otherwise, pretend that it doesn't exist
                     # to avoid giving away information.
-                    if (is_update and
-                            neutron_context.tenant_id != obj['tenant_id']):
+                    if (needs_prefetch and
+                        neutron_context.tenant_id != item['tenant_id']):
                         ctxt.reraise = False
                 msg = _('The resource could not be found.')
                 raise webob.exc.HTTPNotFound(msg)
@@ -110,13 +124,20 @@ class PolicyHook(hooks.PecanHook):
         # in the plural case, we just check so violating items are hidden
         policy_method = policy.enforce if is_single else policy.check
         plugin = manager.NeutronManager.get_plugin_for_resource(resource)
-        resp = [self._get_filtered_item(state.request, resource,
-                                        collection, item)
-                for item in to_process
-                if (state.request.method != 'GET' or
-                    policy_method(neutron_context, action, item,
-                                  plugin=plugin,
-                                  pluralized=collection))]
+        try:
+            resp = [self._get_filtered_item(state.request, resource,
+                                            collection, item)
+                    for item in to_process
+                    if (state.request.method != 'GET' or
+                        policy_method(neutron_context, action, item,
+                                      plugin=plugin,
+                                      pluralized=collection))]
+        except oslo_policy.PolicyNotAuthorized as e:
+            # This exception must be explicitly caught as the exception
+            # translation hook won't be called if an error occurs in the
+            # 'after' handler.
+            raise webob.exc.HTTPForbidden(e.message)
+
         if is_single:
             resp = resp[0]
         data[key] = resp
index f84901a0dace4e02c7a61e5e77a096a2ac60df8d..2c05b20fb0b8fe3ef8b3a2411bede171b9e281dd 100644 (file)
@@ -17,6 +17,7 @@ import os
 
 import mock
 from oslo_config import cfg
+from oslo_policy import policy as oslo_policy
 from oslo_serialization import jsonutils
 from oslo_utils import uuidutils
 from pecan import request
@@ -30,6 +31,7 @@ from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron import manager
 from neutron.pecan_wsgi.controllers import root as controllers
+from neutron import policy
 from neutron.tests.unit import testlib_api
 
 
@@ -293,9 +295,139 @@ class TestEnforcementHooks(PecanFunctionalTest):
         # TODO(kevinbenton): this test should do something
         pass
 
-    def test_policy_enforcement(self):
-        # TODO(kevinbenton): this test should do something
-        pass
+
+class TestPolicyEnforcementHook(PecanFunctionalTest):
+
+    FAKE_RESOURCE = {
+        'mehs': {
+            'id': {'allow_post': False, 'allow_put': False,
+                   'is_visible': True, 'primary_key': True},
+            'attr': {'allow_post': True, 'allow_put': True,
+                     'is_visible': True, 'default': ''},
+            'restricted_attr': {'allow_post': True, 'allow_put': True,
+                                'is_visible': True, 'default': ''},
+            'tenant_id': {'allow_post': True, 'allow_put': False,
+                          'required_by_policy': True,
+                          'validate': {'type:string':
+                                       attributes.TENANT_ID_MAX_LEN},
+                          'is_visible': True}
+        }
+    }
+
+    def setUp(self):
+        # Create a controller for a fake resource. This will make the tests
+        # independent from the evolution of the API (so if one changes the API
+        # or the default policies there won't be any risk of breaking these
+        # tests, or at least I hope so)
+        super(TestPolicyEnforcementHook, self).setUp()
+        self.mock_plugin = mock.Mock()
+        attributes.RESOURCE_ATTRIBUTE_MAP.update(self.FAKE_RESOURCE)
+        attributes.PLURALS['mehs'] = 'meh'
+        manager.NeutronManager.set_plugin_for_resource('meh', self.mock_plugin)
+        fake_controller = controllers.CollectionsController('mehs', 'meh')
+        manager.NeutronManager.set_controller_for_resource(
+            'mehs', fake_controller)
+        # Inject policies for the fake resource
+        policy.init()
+        policy._ENFORCER.set_rules(
+            oslo_policy.Rules.from_dict(
+                {'create_meh': '',
+                 'update_meh': 'rule:admin_only',
+                 'delete_meh': 'rule:admin_only',
+                 'get_meh': 'rule:admin_only or field:mehs:id=xxx',
+                 'get_meh:restricted_attr': 'rule:admin_only'}),
+            overwrite=False)
+
+    def test_before_on_create_authorized(self):
+        # Mock a return value for an hypothetical create operation
+        self.mock_plugin.create_meh.return_value = {
+            'id': 'xxx',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}
+        response = self.app.post_json('/v2.0/mehs.json',
+                                      params={'meh': {'attr': 'meh'}},
+                                      headers={'X-Project-Id': 'tenid'})
+        # We expect this operation to succeed
+        self.assertEqual(201, response.status_int)
+        self.assertEqual(0, self.mock_plugin.get_meh.call_count)
+        self.assertEqual(1, self.mock_plugin.create_meh.call_count)
+
+    def test_before_on_put_not_authorized(self):
+        # The policy hook here should load the resource, and therefore we must
+        # mock a get response
+        self.mock_plugin.get_meh.return_value = {
+            'id': 'xxx',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}
+        # The policy engine should trigger an exception in 'before', and the
+        # plugin method should not be called at all
+        response = self.app.put_json('/v2.0/mehs/xxx.json',
+                                     params={'meh': {'attr': 'meh'}},
+                                     headers={'X-Project-Id': 'tenid'},
+                                     expect_errors=True)
+        self.assertEqual(403, response.status_int)
+        self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+        self.assertEqual(0, self.mock_plugin.update_meh.call_count)
+
+    def test_before_on_delete_not_authorized(self):
+        # The policy hook here should load the resource, and therefore we must
+        # mock a get response
+        self.mock_plugin.delete_meh.return_value = None
+        self.mock_plugin.get_meh.return_value = {
+            'id': 'xxx',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}
+        # The policy engine should trigger an exception in 'before', and the
+        # plugin method should not be called
+        response = self.app.delete_json('/v2.0/mehs/xxx.json',
+                                        headers={'X-Project-Id': 'tenid'},
+                                        expect_errors=True)
+        self.assertEqual(403, response.status_int)
+        self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+        self.assertEqual(0, self.mock_plugin.delete_meh.call_count)
+
+    def test_after_on_get_not_authorized(self):
+        # The GET test policy will deny access to anything whose id is not
+        # 'xxx', so the following request should be forbidden
+        self.mock_plugin.get_meh.return_value = {
+            'id': 'yyy',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}
+        # The policy engine should trigger an exception in 'after', and the
+        # plugin method should be called
+        response = self.app.get('/v2.0/mehs/yyy.json',
+                                headers={'X-Project-Id': 'tenid'},
+                                expect_errors=True)
+        self.assertEqual(403, response.status_int)
+        self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+
+    def test_after_on_get_excludes_admin_attribute(self):
+        self.mock_plugin.get_meh.return_value = {
+            'id': 'xxx',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}
+        response = self.app.get('/v2.0/mehs/xxx.json',
+                                headers={'X-Project-Id': 'tenid'})
+        self.assertEqual(200, response.status_int)
+        json_response = jsonutils.loads(response.body)
+        self.assertNotIn('restricted_attr', json_response['meh'])
+
+    def test_after_on_list_excludes_admin_attribute(self):
+        self.mock_plugin.get_mehs.return_value = [{
+            'id': 'xxx',
+            'attr': 'meh',
+            'restricted_attr': '',
+            'tenant_id': 'tenid'}]
+        response = self.app.get('/v2.0/mehs',
+                                headers={'X-Project-Id': 'tenid'})
+        self.assertEqual(200, response.status_int)
+        json_response = jsonutils.loads(response.body)
+        self.assertNotIn('restricted_attr', json_response['mehs'][0])
 
 
 class TestRootController(PecanFunctionalTest):