def before(self, state):
# This hook should be run only for PUT,POST and DELETE methods and for
# requests targeting a neutron resource
- # FIXME(salv-orlando): DELETE support. It does not work with the
- # current logic. See LP Bug #1520180
- items = state.request.context.get('resources')
- if state.request.method not in ('POST', 'PUT') or not items:
+ resources = state.request.context.get('resources', [])
+ if state.request.method not in ('POST', 'PUT', 'DELETE'):
return
+ # As this routine will likely alter the resources, do a shallow copy
+ resources_copy = resources[:]
neutron_context = state.request.context.get('neutron_context')
resource = state.request.context.get('resource')
+ # If there is no resource for this request, don't bother running authZ
+ # policies
+ if not resource:
+ return
collection = state.request.context.get('collection')
- is_update = (state.request.method == 'PUT')
+ needs_prefetch = (state.request.method == 'PUT' or
+ state.request.method == 'DELETE')
policy.init()
action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource)
# NOTE(salv-orlando): As bulk updates are not supported, in case of PUT
# requests there will be only a single item to process, and its
- # identifier would have been already retrieved by the lookup process
- for item in items:
- if is_update:
- resource_id = state.request.context.get('resource_id')
- obj = copy.copy(self._fetch_resource(neutron_context,
- resource,
- resource_id))
- obj.update(item)
- obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
- item = obj
+ # identifier would have been already retrieved by the lookup process;
+ # in the case of DELETE requests there won't be any item to process in
+ # the request body
+ if needs_prefetch:
+ try:
+ item = resources_copy.pop()
+ except IndexError:
+ # Ops... this was a delete after all!
+ item = {}
+ resource_id = state.request.context.get('resource_id')
+ obj = copy.copy(self._fetch_resource(neutron_context,
+ resource,
+ resource_id))
+ obj.update(item)
+ obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
+ # Put back the item in the list so that policies could be enforced
+ resources_copy.append(obj)
+
+ for item in resources_copy:
try:
policy.enforce(
neutron_context, action, item,
# If a tenant is modifying it's own object, it's safe to
# return a 403. Otherwise, pretend that it doesn't exist
# to avoid giving away information.
- if (is_update and
- neutron_context.tenant_id != obj['tenant_id']):
+ if (needs_prefetch and
+ neutron_context.tenant_id != item['tenant_id']):
ctxt.reraise = False
msg = _('The resource could not be found.')
raise webob.exc.HTTPNotFound(msg)
# in the plural case, we just check so violating items are hidden
policy_method = policy.enforce if is_single else policy.check
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
- resp = [self._get_filtered_item(state.request, resource,
- collection, item)
- for item in to_process
- if (state.request.method != 'GET' or
- policy_method(neutron_context, action, item,
- plugin=plugin,
- pluralized=collection))]
+ try:
+ resp = [self._get_filtered_item(state.request, resource,
+ collection, item)
+ for item in to_process
+ if (state.request.method != 'GET' or
+ policy_method(neutron_context, action, item,
+ plugin=plugin,
+ pluralized=collection))]
+ except oslo_policy.PolicyNotAuthorized as e:
+ # This exception must be explicitly caught as the exception
+ # translation hook won't be called if an error occurs in the
+ # 'after' handler.
+ raise webob.exc.HTTPForbidden(e.message)
+
if is_single:
resp = resp[0]
data[key] = resp
import mock
from oslo_config import cfg
+from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from pecan import request
from neutron import context
from neutron import manager
from neutron.pecan_wsgi.controllers import root as controllers
+from neutron import policy
from neutron.tests.unit import testlib_api
# TODO(kevinbenton): this test should do something
pass
- def test_policy_enforcement(self):
- # TODO(kevinbenton): this test should do something
- pass
+
+class TestPolicyEnforcementHook(PecanFunctionalTest):
+
+ FAKE_RESOURCE = {
+ 'mehs': {
+ 'id': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True, 'primary_key': True},
+ 'attr': {'allow_post': True, 'allow_put': True,
+ 'is_visible': True, 'default': ''},
+ 'restricted_attr': {'allow_post': True, 'allow_put': True,
+ 'is_visible': True, 'default': ''},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True,
+ 'validate': {'type:string':
+ attributes.TENANT_ID_MAX_LEN},
+ 'is_visible': True}
+ }
+ }
+
+ def setUp(self):
+ # Create a controller for a fake resource. This will make the tests
+ # independent from the evolution of the API (so if one changes the API
+ # or the default policies there won't be any risk of breaking these
+ # tests, or at least I hope so)
+ super(TestPolicyEnforcementHook, self).setUp()
+ self.mock_plugin = mock.Mock()
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(self.FAKE_RESOURCE)
+ attributes.PLURALS['mehs'] = 'meh'
+ manager.NeutronManager.set_plugin_for_resource('meh', self.mock_plugin)
+ fake_controller = controllers.CollectionsController('mehs', 'meh')
+ manager.NeutronManager.set_controller_for_resource(
+ 'mehs', fake_controller)
+ # Inject policies for the fake resource
+ policy.init()
+ policy._ENFORCER.set_rules(
+ oslo_policy.Rules.from_dict(
+ {'create_meh': '',
+ 'update_meh': 'rule:admin_only',
+ 'delete_meh': 'rule:admin_only',
+ 'get_meh': 'rule:admin_only or field:mehs:id=xxx',
+ 'get_meh:restricted_attr': 'rule:admin_only'}),
+ overwrite=False)
+
+ def test_before_on_create_authorized(self):
+ # Mock a return value for an hypothetical create operation
+ self.mock_plugin.create_meh.return_value = {
+ 'id': 'xxx',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}
+ response = self.app.post_json('/v2.0/mehs.json',
+ params={'meh': {'attr': 'meh'}},
+ headers={'X-Project-Id': 'tenid'})
+ # We expect this operation to succeed
+ self.assertEqual(201, response.status_int)
+ self.assertEqual(0, self.mock_plugin.get_meh.call_count)
+ self.assertEqual(1, self.mock_plugin.create_meh.call_count)
+
+ def test_before_on_put_not_authorized(self):
+ # The policy hook here should load the resource, and therefore we must
+ # mock a get response
+ self.mock_plugin.get_meh.return_value = {
+ 'id': 'xxx',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}
+ # The policy engine should trigger an exception in 'before', and the
+ # plugin method should not be called at all
+ response = self.app.put_json('/v2.0/mehs/xxx.json',
+ params={'meh': {'attr': 'meh'}},
+ headers={'X-Project-Id': 'tenid'},
+ expect_errors=True)
+ self.assertEqual(403, response.status_int)
+ self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+ self.assertEqual(0, self.mock_plugin.update_meh.call_count)
+
+ def test_before_on_delete_not_authorized(self):
+ # The policy hook here should load the resource, and therefore we must
+ # mock a get response
+ self.mock_plugin.delete_meh.return_value = None
+ self.mock_plugin.get_meh.return_value = {
+ 'id': 'xxx',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}
+ # The policy engine should trigger an exception in 'before', and the
+ # plugin method should not be called
+ response = self.app.delete_json('/v2.0/mehs/xxx.json',
+ headers={'X-Project-Id': 'tenid'},
+ expect_errors=True)
+ self.assertEqual(403, response.status_int)
+ self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+ self.assertEqual(0, self.mock_plugin.delete_meh.call_count)
+
+ def test_after_on_get_not_authorized(self):
+ # The GET test policy will deny access to anything whose id is not
+ # 'xxx', so the following request should be forbidden
+ self.mock_plugin.get_meh.return_value = {
+ 'id': 'yyy',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}
+ # The policy engine should trigger an exception in 'after', and the
+ # plugin method should be called
+ response = self.app.get('/v2.0/mehs/yyy.json',
+ headers={'X-Project-Id': 'tenid'},
+ expect_errors=True)
+ self.assertEqual(403, response.status_int)
+ self.assertEqual(1, self.mock_plugin.get_meh.call_count)
+
+ def test_after_on_get_excludes_admin_attribute(self):
+ self.mock_plugin.get_meh.return_value = {
+ 'id': 'xxx',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}
+ response = self.app.get('/v2.0/mehs/xxx.json',
+ headers={'X-Project-Id': 'tenid'})
+ self.assertEqual(200, response.status_int)
+ json_response = jsonutils.loads(response.body)
+ self.assertNotIn('restricted_attr', json_response['meh'])
+
+ def test_after_on_list_excludes_admin_attribute(self):
+ self.mock_plugin.get_mehs.return_value = [{
+ 'id': 'xxx',
+ 'attr': 'meh',
+ 'restricted_attr': '',
+ 'tenant_id': 'tenid'}]
+ response = self.app.get('/v2.0/mehs',
+ headers={'X-Project-Id': 'tenid'})
+ self.assertEqual(200, response.status_int)
+ json_response = jsonutils.loads(response.body)
+ self.assertNotIn('restricted_attr', json_response['mehs'][0])
class TestRootController(PecanFunctionalTest):