]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Validate that network_id in port/subnet POST belong to the same tenant
authorJuliano Martinez <juliano@martinez.io>
Thu, 5 Jul 2012 02:32:59 +0000 (23:32 -0300)
committerJuliano Martinez <juliano.martinez@locaweb.com.br>
Thu, 5 Jul 2012 17:58:11 +0000 (14:58 -0300)
Bug 1014989

Change-Id: I17b619c502afb35fe0829e41a7d0f997d60998fa

quantum/api/v2/base.py
quantum/tests/unit/test_api_v2.py
quantum/tests/unit/test_db_plugin.py

index a35d29a2ae942273ad3c78209ae3ad7c7d1a64e4..6269bb341f0eaeead849a735590f60805b984606 100644 (file)
@@ -167,9 +167,20 @@ class Controller(object):
             if self._collection in body:
                 # Have to account for bulk create
                 for item in body[self._collection]:
-                    policy.enforce(request.context, action,
-                                   item[self._resource])
+                    self._validate_network_tenant_ownership(
+                        request,
+                        item[self._resource],
+                    )
+                    policy.enforce(
+                        request.context,
+                        action,
+                        item[self._resource],
+                    )
             else:
+                self._validate_network_tenant_ownership(
+                    request,
+                    body[self._resource]
+                )
                 policy.enforce(request.context, action, body[self._resource])
         except exceptions.PolicyNotAuthorized:
             raise webob.exc.HTTPForbidden()
@@ -297,6 +308,23 @@ class Controller(object):
 
         return body
 
+    def _validate_network_tenant_ownership(self, request, resource_item):
+        if self._resource not in ('port', 'subnet'):
+            return
+
+        network_owner = self._plugin.get_network(
+            request.context,
+            resource_item['network_id'],
+        )['tenant_id']
+
+        if network_owner != resource_item['tenant_id']:
+            msg = _("Tenant %(tenant_id)s not allowed to "
+                    "create %(resource)s on this network")
+            raise webob.exc.HTTPForbidden(msg % {
+                "tenant_id": resource_item['tenant_id'],
+                "resource": self._resource,
+            })
+
 
 def create_resource(collection, resource, plugin, params):
     controller = Controller(plugin, collection, resource, params)
index e42626f3351b59d0611135a17e97bea670ab55c0..a0a375efdfb124e467676c552bbb7b25eca9d26b 100644 (file)
@@ -560,6 +560,7 @@ class JSONV2TestCase(APIv2TestCase):
         return_value.update(initial_input['port'])
 
         instance = self.plugin.return_value
+        instance.get_network.return_value = {'tenant_id': unicode(tenant_id)}
         instance.create_port.return_value = return_value
         res = self.api.post_json(_get_path('ports'), initial_input)
 
index 4430ccb94807b6bd450afb404fc52950787e565b..c54f70e24aaca52579b9ee8955949b34be87cae6 100644 (file)
@@ -116,7 +116,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
                        allocation_pools=None, ip_version=4):
         data = {'subnet': {'network_id': net_id,
                            'cidr': cidr,
-                           'ip_version': ip_version}}
+                           'ip_version': ip_version,
+                           'tenant_id': self._tenant_id}}
         if gateway_ip:
             data['subnet']['gateway_ip'] = gateway_ip
         if allocation_pools:
@@ -249,7 +250,6 @@ class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
 
 
 class TestPortsV2(QuantumDbPluginV2TestCase):
-
     def test_create_port_json(self):
         keys = [('admin_state_up', True), ('status', 'ACTIVE')]
         with self.port() as port:
@@ -260,6 +260,18 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
             self.assertEquals(len(ips), 1)
             self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
 
+    def test_create_port_bad_tenant(self):
+        with self.network() as network:
+            data = {'port': {'network_id': network['network']['id'],
+                             'tenant_id': 'bad_tenant_id',
+                             'admin_state_up': True,
+                             'device_id': 'fake_device',
+                             'fixed_ips': []}}
+
+            port_req = self.new_create_request('ports', data)
+            res = port_req.get_response(self.api)
+            self.assertEquals(res.status_int, 403)
+
     def test_list_ports(self):
         with contextlib.nested(self.port(), self.port()) as (port1, port2):
             req = self.new_list_request('ports', 'json')
@@ -746,6 +758,18 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
         res = req.get_response(self.api)
         self.assertEquals(res.status_int, 204)
 
+    def test_create_subnet_bad_tenant(self):
+        with self.network() as network:
+            data = {'subnet': {'network_id': network['network']['id'],
+                               'cidr': '10.0.2.0/24',
+                               'ip_version': 4,
+                               'tenant_id': 'bad_tenant_id',
+                               'gateway_ip': '10.0.2.1'}}
+
+            subnet_req = self.new_create_request('subnets', data)
+            res = subnet_req.get_response(self.api)
+            self.assertEquals(res.status_int, 403)
+
     def test_create_subnet_defaults(self):
         gateway = '10.0.0.1'
         cidr = '10.0.0.0/24'