From: Kevin Benton Date: Thu, 11 Jun 2015 13:59:45 +0000 (-0700) Subject: Add ownership validation hook X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=99a0947aaab4bb832931a0cce02b0d152c0c0af8;p=openstack-build%2Fneutron-build.git Add ownership validation hook Add hook to validate same ownership between objects that aren't shared (e.g. port and network or port and subnet). Partially-Implements: blueprint wsgi-pecan-switch Change-Id: I2cb8dd6c49bac05d30a1f46408cabd5b73663c0b --- diff --git a/neutron/newapi/app.py b/neutron/newapi/app.py index 4792868a0..e4e3ff515 100644 --- a/neutron/newapi/app.py +++ b/neutron/newapi/app.py @@ -45,7 +45,8 @@ def setup_app(*args, **kwargs): hooks.ExceptionTranslationHook(), # priority 100 hooks.ContextHook(), # priority 95 hooks.ResourceIdentifierHook(), # priority 95 - hooks.AttributePopulationHook(), # priority 120 (depends on 2 above) + hooks.AttributePopulationHook(), # priority 120 + hooks.OwnershipValidationHook(), # priority 125 ] app = pecan.make_app( diff --git a/neutron/newapi/hooks/__init__.py b/neutron/newapi/hooks/__init__.py index 1a7add439..d2799b57d 100644 --- a/neutron/newapi/hooks/__init__.py +++ b/neutron/newapi/hooks/__init__.py @@ -15,6 +15,7 @@ from neutron.newapi.hooks import attribute_population from neutron.newapi.hooks import context +from neutron.newapi.hooks import ownership_validation from neutron.newapi.hooks import resource_identifier from neutron.newapi.hooks import translation @@ -23,3 +24,4 @@ ExceptionTranslationHook = translation.ExceptionTranslationHook ContextHook = context.ContextHook ResourceIdentifierHook = resource_identifier.ResourceIdentifierHook AttributePopulationHook = attribute_population.AttributePopulationHook +OwnershipValidationHook = ownership_validation.OwnershipValidationHook diff --git a/neutron/newapi/hooks/ownership_validation.py b/neutron/newapi/hooks/ownership_validation.py new file mode 100644 index 000000000..d121a03c7 --- /dev/null +++ b/neutron/newapi/hooks/ownership_validation.py @@ -0,0 +1,54 @@ +# Copyright (c) 2015 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from pecan import hooks +import webob + +from neutron import manager + + +class OwnershipValidationHook(hooks.PecanHook): + + priority = 125 + + def before(self, state): + if state.request.method != 'POST': + return + items = state.request.resources + for item in items: + self._validate_network_tenant_ownership(state.request, item) + + def _validate_network_tenant_ownership(self, request, resource_item): + # TODO(salvatore-orlando): consider whether this check can be folded + # in the policy engine + rtype = request.resource_type + if rtype not in ('port', 'subnet'): + return + plugin = manager.NeutronManager.get_plugin() + network = plugin.get_network(request.context, + resource_item['network_id']) + # do not perform the check on shared networks + if network.get('shared'): + return + + network_owner = network['tenant_id'] + + if network_owner != resource_item['tenant_id']: + msg = _("Tenant %(tenant_id)s not allowed to " + "create %(resource)s on this network") + raise webob.exc.HTTPForbidden(msg % { + "tenant_id": resource_item['tenant_id'], + "resource": rtype, + }) diff --git a/neutron/tests/functional/newapi/test_functional.py b/neutron/tests/functional/newapi/test_functional.py index 84620506a..47c01f7b2 100644 --- a/neutron/tests/functional/newapi/test_functional.py +++ b/neutron/tests/functional/newapi/test_functional.py @@ -200,3 +200,16 @@ class TestRequestPopulatingHooks(PecanFunctionalTest): self.assertEqual( manager.NeutronManager.get_service_plugins()['L3_ROUTER_NAT'], self.req_stash['plugin']) + + +class TestEnforcementHooks(PecanFunctionalTest): + + def test_network_ownership_check(self): + # TODO(kevinbenton): get a scenario that passes attribute population + self.skipTest("Attribute population blocks this test as-is") + response = self.app.post_json('/v2.0/ports.json', + params={'port': {'network_id': self.port['network_id'], + 'admin_state_up': True, + 'tenant_id': 'tenid2'}}, + headers={'X-Tenant-Id': 'tenid'}) + self.assertEqual(response.status_int, 200)