'subnet_id': result['subnet_id']})
return ips
+ def _validate_subnet_cidr(self, network, new_subnet_cidr):
+ """Validate the CIDR for a subnet.
+
+ Verifies the specified CIDR does not overlap with the ones defined
+ for the other subnets specified for this network.
+
+ """
+ for subnet in network.subnets:
+ if (netaddr.IPSet([subnet.cidr]) &
+ netaddr.IPSet([new_subnet_cidr])):
+ err_msg = ("Requested subnet with cidr: %s "
+ "for network: %s "
+ "overlaps with subnet: %s)" % (new_subnet_cidr,
+ network.id,
+ subnet.cidr))
+ LOG.error(err_msg)
+ raise q_exc.InvalidInput(error_message=err_msg)
+
def _validate_allocation_pools(self, ip_pools, gateway_ip, subnet_cidr):
"""Validate IP allocation pools.
with context.session.begin():
network = self._get_network(context, s["network_id"])
+ self._validate_subnet_cidr(network, s['cidr'])
subnet = models_v2.Subnet(network_id=s['network_id'],
ip_version=s['ip_version'],
cidr=s['cidr'],
class TestSubnetsV2(QuantumDbPluginV2TestCase):
- def _test_create_subnet(self, **kwargs):
+ def _test_create_subnet(self, network=None, **kwargs):
keys = kwargs.copy()
keys.setdefault('cidr', '10.0.0.0/24')
keys.setdefault('ip_version', 4)
- with self.subnet(**keys) as subnet:
+ with self.subnet(network=network, **keys) as subnet:
# verify the response has each key with the correct value
for k in keys:
self.assertIn(k, subnet['subnet'])
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr)
+ def test_create_two_subnets(self):
+ gateway_ips = ['10.0.0.1', '10.0.1.1']
+ cidrs = ['10.0.0.0/24', '10.0.1.0/24']
+ with self.network() as network:
+ with self.subnet(network=network,
+ gateway_ip=gateway_ips[0],
+ cidr=cidrs[0]):
+ with self.subnet(network=network,
+ gateway_ip=gateway_ips[1],
+ cidr=cidrs[1]):
+ net_req = self.new_show_request('networks',
+ network['network']['id'])
+ raw_res = net_req.get_response(self.api)
+ net_res = self.deserialize('json', raw_res)
+ for subnet_id in net_res['network']['subnets']:
+ sub_req = self.new_show_request('subnets', subnet_id)
+ raw_res = sub_req.get_response(self.api)
+ sub_res = self.deserialize('json', raw_res)
+ self.assertIn(sub_res['subnet']['cidr'], cidrs)
+ self.assertIn(sub_res['subnet']['gateway_ip'],
+ gateway_ips)
+
+ def test_create_two_subnets_same_cidr_returns_400(self):
+ gateway_ip_1 = '10.0.0.1'
+ cidr_1 = '10.0.0.0/24'
+ gateway_ip_2 = '10.0.0.10'
+ cidr_2 = '10.0.0.0/24'
+ with self.network() as network:
+ with self.subnet(network=network,
+ gateway_ip=gateway_ip_1,
+ cidr=cidr_1):
+ with self.assertRaises(
+ webob.exc.HTTPClientError) as ctx_manager:
+ with self.subnet(network=network,
+ gateway_ip=gateway_ip_2,
+ cidr=cidr_2):
+ pass
+ self.assertEquals(ctx_manager.exception.code, 400)
+
def test_delete_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway_ip='10.0.0.1',
- cidr='10.0.1.0/24') as subnet:
+ cidr='10.0.0.0/24') as subnet:
with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request('subnets')