hooks.ExceptionTranslationHook(), # priority 100
hooks.ContextHook(), # priority 95
hooks.ResourceIdentifierHook(), # priority 95
- hooks.AttributePopulationHook(), # priority 120 (depends on 2 above)
+ hooks.AttributePopulationHook(), # priority 120
+ hooks.OwnershipValidationHook(), # priority 125
]
app = pecan.make_app(
from neutron.newapi.hooks import attribute_population
from neutron.newapi.hooks import context
+from neutron.newapi.hooks import ownership_validation
from neutron.newapi.hooks import resource_identifier
from neutron.newapi.hooks import translation
ContextHook = context.ContextHook
ResourceIdentifierHook = resource_identifier.ResourceIdentifierHook
AttributePopulationHook = attribute_population.AttributePopulationHook
+OwnershipValidationHook = ownership_validation.OwnershipValidationHook
--- /dev/null
+# Copyright (c) 2015 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from pecan import hooks
+import webob
+
+from neutron import manager
+
+
+class OwnershipValidationHook(hooks.PecanHook):
+
+ priority = 125
+
+ def before(self, state):
+ if state.request.method != 'POST':
+ return
+ items = state.request.resources
+ for item in items:
+ self._validate_network_tenant_ownership(state.request, item)
+
+ def _validate_network_tenant_ownership(self, request, resource_item):
+ # TODO(salvatore-orlando): consider whether this check can be folded
+ # in the policy engine
+ rtype = request.resource_type
+ if rtype not in ('port', 'subnet'):
+ return
+ plugin = manager.NeutronManager.get_plugin()
+ network = plugin.get_network(request.context,
+ resource_item['network_id'])
+ # do not perform the check on shared networks
+ if network.get('shared'):
+ return
+
+ network_owner = network['tenant_id']
+
+ if network_owner != resource_item['tenant_id']:
+ msg = _("Tenant %(tenant_id)s not allowed to "
+ "create %(resource)s on this network")
+ raise webob.exc.HTTPForbidden(msg % {
+ "tenant_id": resource_item['tenant_id'],
+ "resource": rtype,
+ })
self.assertEqual(
manager.NeutronManager.get_service_plugins()['L3_ROUTER_NAT'],
self.req_stash['plugin'])
+
+
+class TestEnforcementHooks(PecanFunctionalTest):
+
+ def test_network_ownership_check(self):
+ # TODO(kevinbenton): get a scenario that passes attribute population
+ self.skipTest("Attribute population blocks this test as-is")
+ response = self.app.post_json('/v2.0/ports.json',
+ params={'port': {'network_id': self.port['network_id'],
+ 'admin_state_up': True,
+ 'tenant_id': 'tenid2'}},
+ headers={'X-Tenant-Id': 'tenid'})
+ self.assertEqual(response.status_int, 200)