if self._collection in body:
# Have to account for bulk create
for item in body[self._collection]:
- policy.enforce(request.context, action,
- item[self._resource])
+ self._validate_network_tenant_ownership(
+ request,
+ item[self._resource],
+ )
+ policy.enforce(
+ request.context,
+ action,
+ item[self._resource],
+ )
else:
+ self._validate_network_tenant_ownership(
+ request,
+ body[self._resource]
+ )
policy.enforce(request.context, action, body[self._resource])
except exceptions.PolicyNotAuthorized:
raise webob.exc.HTTPForbidden()
return body
+ def _validate_network_tenant_ownership(self, request, resource_item):
+ if self._resource not in ('port', 'subnet'):
+ return
+
+ network_owner = self._plugin.get_network(
+ request.context,
+ resource_item['network_id'],
+ )['tenant_id']
+
+ if network_owner != resource_item['tenant_id']:
+ msg = _("Tenant %(tenant_id)s not allowed to "
+ "create %(resource)s on this network")
+ raise webob.exc.HTTPForbidden(msg % {
+ "tenant_id": resource_item['tenant_id'],
+ "resource": self._resource,
+ })
+
def create_resource(collection, resource, plugin, params):
controller = Controller(plugin, collection, resource, params)
allocation_pools=None, ip_version=4):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
- 'ip_version': ip_version}}
+ 'ip_version': ip_version,
+ 'tenant_id': self._tenant_id}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
if allocation_pools:
class TestPortsV2(QuantumDbPluginV2TestCase):
-
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', 'ACTIVE')]
with self.port() as port:
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ def test_create_port_bad_tenant(self):
+ with self.network() as network:
+ data = {'port': {'network_id': network['network']['id'],
+ 'tenant_id': 'bad_tenant_id',
+ 'admin_state_up': True,
+ 'device_id': 'fake_device',
+ 'fixed_ips': []}}
+
+ port_req = self.new_create_request('ports', data)
+ res = port_req.get_response(self.api)
+ self.assertEquals(res.status_int, 403)
+
def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
+ def test_create_subnet_bad_tenant(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 4,
+ 'tenant_id': 'bad_tenant_id',
+ 'gateway_ip': '10.0.2.1'}}
+
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 403)
+
def test_create_subnet_defaults(self):
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'