From: Salvatore Orlando Date: Tue, 13 Oct 2015 22:08:47 +0000 (-0700) Subject: Pecan: Fixes and tests for the policy enforcement hook X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=293c3e01efce74d110ff34703a9e68ce2cd782e6;p=openstack-build%2Fneutron-build.git Pecan: Fixes and tests for the policy enforcement hook As PolicyNotAuthorizedException is raised in a hook, the ExceptionTranslationHook is not invoked for it; therefore a 500 response is returned whereas a 403 was expected. This patch explicitly handles the exception in the hook in order to ensure the appropriate response code is returned. Moreover, the structure of the 'before' hook prevented checks on DELETE requests from being performed. As a result the check was not performed at all (checks on the 'after' hook only pertain GET requests). This patch changes the logic of the 'before' hook by ensuring the item to authorize acces to is loaded both on PUT and DELETE requests. This patch also adds functional tests specific for the policy enforcement hook. Change-Id: I8c76cb05568df47648cff71a107cfe701b286bb7 Closes-Bug: #1520180 Closes-Bug: #1505831 --- diff --git a/neutron/pecan_wsgi/hooks/policy_enforcement.py b/neutron/pecan_wsgi/hooks/policy_enforcement.py index cfc70d5aa..e505eea48 100644 --- a/neutron/pecan_wsgi/hooks/policy_enforcement.py +++ b/neutron/pecan_wsgi/hooks/policy_enforcement.py @@ -46,30 +46,44 @@ class PolicyHook(hooks.PecanHook): def before(self, state): # This hook should be run only for PUT,POST and DELETE methods and for # requests targeting a neutron resource - # FIXME(salv-orlando): DELETE support. It does not work with the - # current logic. See LP Bug #1520180 - items = state.request.context.get('resources') - if state.request.method not in ('POST', 'PUT') or not items: + resources = state.request.context.get('resources', []) + if state.request.method not in ('POST', 'PUT', 'DELETE'): return + # As this routine will likely alter the resources, do a shallow copy + resources_copy = resources[:] neutron_context = state.request.context.get('neutron_context') resource = state.request.context.get('resource') + # If there is no resource for this request, don't bother running authZ + # policies + if not resource: + return collection = state.request.context.get('collection') - is_update = (state.request.method == 'PUT') + needs_prefetch = (state.request.method == 'PUT' or + state.request.method == 'DELETE') policy.init() action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource) # NOTE(salv-orlando): As bulk updates are not supported, in case of PUT # requests there will be only a single item to process, and its - # identifier would have been already retrieved by the lookup process - for item in items: - if is_update: - resource_id = state.request.context.get('resource_id') - obj = copy.copy(self._fetch_resource(neutron_context, - resource, - resource_id)) - obj.update(item) - obj[const.ATTRIBUTES_TO_UPDATE] = item.keys() - item = obj + # identifier would have been already retrieved by the lookup process; + # in the case of DELETE requests there won't be any item to process in + # the request body + if needs_prefetch: + try: + item = resources_copy.pop() + except IndexError: + # Ops... this was a delete after all! + item = {} + resource_id = state.request.context.get('resource_id') + obj = copy.copy(self._fetch_resource(neutron_context, + resource, + resource_id)) + obj.update(item) + obj[const.ATTRIBUTES_TO_UPDATE] = item.keys() + # Put back the item in the list so that policies could be enforced + resources_copy.append(obj) + + for item in resources_copy: try: policy.enforce( neutron_context, action, item, @@ -79,8 +93,8 @@ class PolicyHook(hooks.PecanHook): # If a tenant is modifying it's own object, it's safe to # return a 403. Otherwise, pretend that it doesn't exist # to avoid giving away information. - if (is_update and - neutron_context.tenant_id != obj['tenant_id']): + if (needs_prefetch and + neutron_context.tenant_id != item['tenant_id']): ctxt.reraise = False msg = _('The resource could not be found.') raise webob.exc.HTTPNotFound(msg) @@ -110,13 +124,20 @@ class PolicyHook(hooks.PecanHook): # in the plural case, we just check so violating items are hidden policy_method = policy.enforce if is_single else policy.check plugin = manager.NeutronManager.get_plugin_for_resource(resource) - resp = [self._get_filtered_item(state.request, resource, - collection, item) - for item in to_process - if (state.request.method != 'GET' or - policy_method(neutron_context, action, item, - plugin=plugin, - pluralized=collection))] + try: + resp = [self._get_filtered_item(state.request, resource, + collection, item) + for item in to_process + if (state.request.method != 'GET' or + policy_method(neutron_context, action, item, + plugin=plugin, + pluralized=collection))] + except oslo_policy.PolicyNotAuthorized as e: + # This exception must be explicitly caught as the exception + # translation hook won't be called if an error occurs in the + # 'after' handler. + raise webob.exc.HTTPForbidden(e.message) + if is_single: resp = resp[0] data[key] = resp diff --git a/neutron/tests/functional/pecan_wsgi/test_functional.py b/neutron/tests/functional/pecan_wsgi/test_functional.py index f84901a0d..2c05b20fb 100644 --- a/neutron/tests/functional/pecan_wsgi/test_functional.py +++ b/neutron/tests/functional/pecan_wsgi/test_functional.py @@ -17,6 +17,7 @@ import os import mock from oslo_config import cfg +from oslo_policy import policy as oslo_policy from oslo_serialization import jsonutils from oslo_utils import uuidutils from pecan import request @@ -30,6 +31,7 @@ from neutron.common import exceptions as n_exc from neutron import context from neutron import manager from neutron.pecan_wsgi.controllers import root as controllers +from neutron import policy from neutron.tests.unit import testlib_api @@ -293,9 +295,139 @@ class TestEnforcementHooks(PecanFunctionalTest): # TODO(kevinbenton): this test should do something pass - def test_policy_enforcement(self): - # TODO(kevinbenton): this test should do something - pass + +class TestPolicyEnforcementHook(PecanFunctionalTest): + + FAKE_RESOURCE = { + 'mehs': { + 'id': {'allow_post': False, 'allow_put': False, + 'is_visible': True, 'primary_key': True}, + 'attr': {'allow_post': True, 'allow_put': True, + 'is_visible': True, 'default': ''}, + 'restricted_attr': {'allow_post': True, 'allow_put': True, + 'is_visible': True, 'default': ''}, + 'tenant_id': {'allow_post': True, 'allow_put': False, + 'required_by_policy': True, + 'validate': {'type:string': + attributes.TENANT_ID_MAX_LEN}, + 'is_visible': True} + } + } + + def setUp(self): + # Create a controller for a fake resource. This will make the tests + # independent from the evolution of the API (so if one changes the API + # or the default policies there won't be any risk of breaking these + # tests, or at least I hope so) + super(TestPolicyEnforcementHook, self).setUp() + self.mock_plugin = mock.Mock() + attributes.RESOURCE_ATTRIBUTE_MAP.update(self.FAKE_RESOURCE) + attributes.PLURALS['mehs'] = 'meh' + manager.NeutronManager.set_plugin_for_resource('meh', self.mock_plugin) + fake_controller = controllers.CollectionsController('mehs', 'meh') + manager.NeutronManager.set_controller_for_resource( + 'mehs', fake_controller) + # Inject policies for the fake resource + policy.init() + policy._ENFORCER.set_rules( + oslo_policy.Rules.from_dict( + {'create_meh': '', + 'update_meh': 'rule:admin_only', + 'delete_meh': 'rule:admin_only', + 'get_meh': 'rule:admin_only or field:mehs:id=xxx', + 'get_meh:restricted_attr': 'rule:admin_only'}), + overwrite=False) + + def test_before_on_create_authorized(self): + # Mock a return value for an hypothetical create operation + self.mock_plugin.create_meh.return_value = { + 'id': 'xxx', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'} + response = self.app.post_json('/v2.0/mehs.json', + params={'meh': {'attr': 'meh'}}, + headers={'X-Project-Id': 'tenid'}) + # We expect this operation to succeed + self.assertEqual(201, response.status_int) + self.assertEqual(0, self.mock_plugin.get_meh.call_count) + self.assertEqual(1, self.mock_plugin.create_meh.call_count) + + def test_before_on_put_not_authorized(self): + # The policy hook here should load the resource, and therefore we must + # mock a get response + self.mock_plugin.get_meh.return_value = { + 'id': 'xxx', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'} + # The policy engine should trigger an exception in 'before', and the + # plugin method should not be called at all + response = self.app.put_json('/v2.0/mehs/xxx.json', + params={'meh': {'attr': 'meh'}}, + headers={'X-Project-Id': 'tenid'}, + expect_errors=True) + self.assertEqual(403, response.status_int) + self.assertEqual(1, self.mock_plugin.get_meh.call_count) + self.assertEqual(0, self.mock_plugin.update_meh.call_count) + + def test_before_on_delete_not_authorized(self): + # The policy hook here should load the resource, and therefore we must + # mock a get response + self.mock_plugin.delete_meh.return_value = None + self.mock_plugin.get_meh.return_value = { + 'id': 'xxx', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'} + # The policy engine should trigger an exception in 'before', and the + # plugin method should not be called + response = self.app.delete_json('/v2.0/mehs/xxx.json', + headers={'X-Project-Id': 'tenid'}, + expect_errors=True) + self.assertEqual(403, response.status_int) + self.assertEqual(1, self.mock_plugin.get_meh.call_count) + self.assertEqual(0, self.mock_plugin.delete_meh.call_count) + + def test_after_on_get_not_authorized(self): + # The GET test policy will deny access to anything whose id is not + # 'xxx', so the following request should be forbidden + self.mock_plugin.get_meh.return_value = { + 'id': 'yyy', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'} + # The policy engine should trigger an exception in 'after', and the + # plugin method should be called + response = self.app.get('/v2.0/mehs/yyy.json', + headers={'X-Project-Id': 'tenid'}, + expect_errors=True) + self.assertEqual(403, response.status_int) + self.assertEqual(1, self.mock_plugin.get_meh.call_count) + + def test_after_on_get_excludes_admin_attribute(self): + self.mock_plugin.get_meh.return_value = { + 'id': 'xxx', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'} + response = self.app.get('/v2.0/mehs/xxx.json', + headers={'X-Project-Id': 'tenid'}) + self.assertEqual(200, response.status_int) + json_response = jsonutils.loads(response.body) + self.assertNotIn('restricted_attr', json_response['meh']) + + def test_after_on_list_excludes_admin_attribute(self): + self.mock_plugin.get_mehs.return_value = [{ + 'id': 'xxx', + 'attr': 'meh', + 'restricted_attr': '', + 'tenant_id': 'tenid'}] + response = self.app.get('/v2.0/mehs', + headers={'X-Project-Id': 'tenid'}) + self.assertEqual(200, response.status_int) + json_response = jsonutils.loads(response.body) + self.assertNotIn('restricted_attr', json_response['mehs'][0]) class TestRootController(PecanFunctionalTest):