pool_id = allocation_pool['id']
break
if not pool_id:
- error_message = ("No allocation pool found for "
- "ip address:%s" % ip_address)
+ error_message = _("No allocation pool found for "
+ "ip address:%s" % ip_address)
raise q_exc.InvalidInput(error_message=error_message)
# Two requests will be done on the database. The first will be to
# search if an entry starts with ip_address + 1 (r1). The second
pool_2=r_range,
subnet_cidr=subnet_cidr)
- def _validate_host_route(self, route):
+ def _validate_host_route(self, route, ip_version):
try:
netaddr.IPNetwork(route['destination'])
netaddr.IPAddress(route['nexthop'])
raise q_exc.InvalidInput(error_message=err_msg)
except ValueError:
# netaddr.IPAddress would raise this
- err_msg = ("invalid route: %s" % (str(route)))
+ err_msg = _("invalid route: %s") % str(route)
raise q_exc.InvalidInput(error_message=err_msg)
+ self._validate_ip_version(ip_version, route['nexthop'], 'nexthop')
+ self._validate_ip_version(ip_version, route['destination'],
+ 'destination')
def _allocate_pools_for_subnet(self, context, subnet):
"""Create IP allocation pools for a given subnet
def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)
+ def _validate_ip_version(self, ip_version, addr, name):
+ """Check IP field of a subnet match specified ip version"""
+ ip = netaddr.IPNetwork(addr)
+ if ip.version != ip_version:
+ msg = _("%(name)s '%(addr)s' does not match "
+ "the ip_version '%(ip_version)s'") % locals()
+ raise q_exc.InvalidInput(error_message=msg)
+
def _validate_subnet(self, s):
- """a subroutine to validate a subnet spec"""
- # check if the number of DNS nameserver exceeds the quota
+ """Validate a subnet spec"""
+
+ # The method requires the subnet spec 's' has 'ip_version' field.
+ # If 's' dict does not have 'ip_version' field in an API call
+ # (e.g., update_subnet()), you need to set 'ip_version' field
+ # before calling this method.
+ ip_ver = s['ip_version']
+
+ if 'cidr' in s:
+ self._validate_ip_version(ip_ver, s['cidr'], 'cidr')
+ if ('gateway_ip' in s and
+ s['gateway_ip'] and
+ s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED):
+ self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
+
if 'dns_nameservers' in s and \
s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED:
if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers:
except Exception:
raise q_exc.InvalidInput(
error_message=("error parsing dns address %s" % dns))
+ self._validate_ip_version(ip_ver, dns, 'dns_nameserver')
- # check if the number of host routes exceeds the quota
if 'host_routes' in s and \
s['host_routes'] != attributes.ATTR_NOT_SPECIFIED:
if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes:
quota=cfg.CONF.max_subnet_host_routes)
# check if the routes are all valid
for rt in s['host_routes']:
- self._validate_host_route(rt)
+ self._validate_host_route(rt, ip_ver)
def create_subnet(self, context, subnet):
s = subnet['subnet']
gratuitous DHCP offers"""
s = subnet['subnet']
+ # Fill 'ip_version' field with the current value since
+ # _validate_subnet() expects subnet spec has 'ip_version' field.
+ s['ip_version'] = self._get_subnet(context, id).ip_version
self._validate_subnet(s)
with context.session.begin(subtransactions=True):
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
- cidr=cidr,
+ cidr=cidr, ip_version=6,
allocation_pools=allocation_pools)
def test_create_subnet_with_large_allocation_pool(self):
shared=True)
self.assertEquals(ctx_manager.exception.code, 422)
+ def test_create_subnet_inconsistent_ipv6_cidrv4(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 6,
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv4_cidrv6(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': 'fe80::0/80',
+ 'ip_version': 4,
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv4_gatewayv6(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 4,
+ 'gateway_ip': 'fe80::1',
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv6_gatewayv4(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': 'fe80::0/80',
+ 'ip_version': 6,
+ 'gateway_ip': '192.168.0.1',
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv6_dns_v4(self):
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': 'fe80::0/80',
+ 'ip_version': 6,
+ 'dns_nameservers': ['192.168.0.1'],
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv4_hostroute_dst_v6(self):
+ host_routes = [{'destination': 'fe80::0/48',
+ 'nexthop': '10.0.2.20'}]
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 4,
+ 'host_routes': host_routes,
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_inconsistent_ipv4_hostroute_np_v6(self):
+ host_routes = [{'destination': '172.16.0.0/24',
+ 'nexthop': 'fe80::1'}]
+ with self.network() as network:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 4,
+ 'host_routes': host_routes,
+ 'tenant_id': network['network']['tenant_id']}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
def test_update_subnet(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
res = req.get_response(self.api)
self.assertEqual(res.status_int, 422)
+ def test_update_subnet_inconsistent_ipv4_gatewayv6(self):
+ with self.network() as network:
+ with self.subnet(network=network) as subnet:
+ data = {'subnet': {'gateway_ip': 'fe80::1'}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
+ with self.network() as network:
+ with self.subnet(network=network,
+ ip_version=6, cidr='fe80::/48') as subnet:
+ data = {'subnet': {'gateway_ip': '10.1.1.1'}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_update_subnet_inconsistent_ipv4_dns_v6(self):
+ dns_nameservers = ['fe80::1']
+ with self.network() as network:
+ with self.subnet(network=network) as subnet:
+ data = {'subnet': {'dns_nameservers': dns_nameservers}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
+ host_routes = [{'destination': 'fe80::0/48',
+ 'nexthop': '10.0.2.20'}]
+ with self.network() as network:
+ with self.subnet(network=network,
+ ip_version=6, cidr='fe80::/48') as subnet:
+ data = {'subnet': {'host_routes': host_routes}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
+ host_routes = [{'destination': '172.16.0.0/24',
+ 'nexthop': 'fe80::1'}]
+ with self.network() as network:
+ with self.subnet(network=network,
+ ip_version=6, cidr='fe80::/48') as subnet:
+ data = {'subnet': {'host_routes': host_routes}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet: