def _make_subnet_args(self, shared, detail,
subnet, subnetpool_id=None):
+ gateway_ip = str(detail.gateway_ip) if detail.gateway_ip else None
args = {'tenant_id': detail.tenant_id,
'id': detail.subnet_id,
'name': subnet['name'],
'cidr': str(detail.subnet_cidr),
'subnetpool_id': subnetpool_id,
'enable_dhcp': subnet['enable_dhcp'],
- 'gateway_ip': self._gateway_ip_str(subnet, detail.subnet_cidr),
+ 'gateway_ip': gateway_ip,
'shared': shared}
if subnet['ip_version'] == 6 and subnet['enable_dhcp']:
if attributes.is_attr_set(subnet['ipv6_ra_mode']):
return [{'subnet_id': ip["subnet_id"],
'ip_address': ip["ip_address"]}
for ip in ips]
-
- def _gateway_ip_str(self, subnet, cidr_net):
- if subnet.get('gateway_ip') is attributes.ATTR_NOT_SPECIFIED:
- return str(cidr_net.network + 1)
- return subnet.get('gateway_ip')
"""
pass
+ @staticmethod
+ def _gateway_ip_str(subnet, cidr_net):
+ if subnet.get('gateway_ip') is attributes.ATTR_NOT_SPECIFIED:
+ return str(netaddr.IPNetwork(cidr_net).network + 1)
+ return subnet.get('gateway_ip')
+
def _validate_pools_with_subnetpool(self, subnet):
"""Verifies that allocation pools are set correctly
subnet.update(s)
return subnet, changes
- def _allocate_pools_for_subnet(self, context, subnet):
- """Create IP allocation pools for a given subnet
-
- Pools are defined by the 'allocation_pools' attribute,
- a list of dict objects with 'start' and 'end' keys for
- defining the pool range.
- """
- pools = ipam_utils.generate_pools(subnet['cidr'], subnet['gateway_ip'])
- return [{'start': str(netaddr.IPAddress(pool.first)),
- 'end': str(netaddr.IPAddress(pool.last))}
- for pool in pools]
-
def _validate_subnet_cidr(self, context, network, new_subnet_cidr):
"""Validate the CIDR for a subnet.
pool_2=r_range,
subnet_cidr=subnet_cidr)
- def _prepare_allocation_pools(self, context, allocation_pools, subnet):
+ def _prepare_allocation_pools(self, allocation_pools, cidr, gateway_ip):
+ """Returns allocation pools represented as list of IPRanges"""
if not attributes.is_attr_set(allocation_pools):
- return self._allocate_pools_for_subnet(context, subnet)
+ return ipam_utils.generate_pools(cidr, gateway_ip)
- self._validate_allocation_pools(allocation_pools, subnet['cidr'])
- if subnet['gateway_ip']:
- self._validate_gw_out_of_pools(subnet['gateway_ip'],
+ self._validate_allocation_pools(allocation_pools, cidr)
+ if gateway_ip:
+ self._validate_gw_out_of_pools(gateway_ip,
allocation_pools)
- return allocation_pools
+ return [netaddr.IPRange(p['start'], p['end'])
+ for p in allocation_pools]
def _validate_gw_out_of_pools(self, gateway_ip, pools):
for allocation_pool in pools:
subnet_args,
dns_nameservers,
host_routes,
- allocation_pools):
- allocation_pools = self._prepare_allocation_pools(context,
- allocation_pools,
- subnet_args)
+ subnet_request):
self._validate_subnet_cidr(context, network, subnet_args['cidr'])
self._validate_network_subnetpools(network,
subnet_args['subnetpool_id'],
nexthop=rt['nexthop'])
context.session.add(route)
- self._save_allocation_pools(context, subnet, allocation_pools)
+ self._save_allocation_pools(context, subnet,
+ subnet_request.allocation_pools)
return subnet
def _save_allocation_pools(self, context, subnet, allocation_pools):
for pool in allocation_pools:
+ first_ip = str(netaddr.IPAddress(pool.first, pool.version))
+ last_ip = str(netaddr.IPAddress(pool.last, pool.version))
ip_pool = models_v2.IPAllocationPool(subnet=subnet,
- first_ip=pool['start'],
- last_ip=pool['end'])
+ first_ip=first_ip,
+ last_ip=last_ip)
context.session.add(ip_pool)
ip_range = models_v2.IPAvailabilityRange(
ipallocationpool=ip_pool,
- first_ip=pool['start'],
- last_ip=pool['end'])
+ first_ip=first_ip,
+ last_ip=last_ip)
context.session.add(ip_range)
def _allocate_ips_for_port_and_store(self, context, port, port_id):
subnetpool = self._get_subnetpool(context, subnetpool_id)
self._validate_ip_version_with_subnetpool(subnet, subnetpool)
+ # gateway_ip and allocation pools should be validated or generated
+ # only for specific request
+ if subnet['cidr'] is not attributes.ATTR_NOT_SPECIFIED:
+ subnet['gateway_ip'] = self._gateway_ip_str(subnet,
+ subnet['cidr'])
+ # allocation_pools are converted to list of IPRanges
+ subnet['allocation_pools'] = self._prepare_allocation_pools(
+ subnet['allocation_pools'],
+ subnet['cidr'],
+ subnet['gateway_ip'])
subnet_request = ipam.SubnetRequestFactory.get_request(context,
subnet,
subnetpool)
subnetpool_id),
subnet['dns_nameservers'],
subnet['host_routes'],
- subnet['allocation_pools'])
+ subnet_request)
return subnet
else:
return SpecificSubnetRequest(subnet['tenant_id'],
subnet_id,
- cidr)
+ cidr,
+ subnet.get('gateway_ip'),
+ subnet.get('allocation_pools'))
const.IPv6: '2001:db8::/64'}
FAKE_IP = {const.IPv4: '10.0.0.1',
const.IPv6: 'fe80::1',
- 'IPv6_GLOBAL': '2001:0db8::1',
+ 'IPv6_GLOBAL': '2001:db8::1',
'IPv6_LLA': 'fe80::123',
'IPv6_DHCP': '2001:db8::3'}
set_context=False)
def test_create_subnet_nonzero_cidr(self):
+ # Pass None as gateway_ip to prevent ip auto allocation for gw
+ # Previously gateway ip was allocated after validations,
+ # so no errors were raised if gw ip was out of range.
with self.subnet(cidr='10.129.122.5/8') as v1,\
self.subnet(cidr='11.129.122.5/15') as v2,\
self.subnet(cidr='12.129.122.5/16') as v3,\
self.subnet(cidr='14.129.122.5/22') as v5,\
self.subnet(cidr='15.129.122.5/24') as v6,\
self.subnet(cidr='16.129.122.5/28') as v7,\
- self.subnet(cidr='17.129.122.5/32', enable_dhcp=False) as v8:
+ self.subnet(cidr='17.129.122.5/32', gateway_ip=None,
+ enable_dhcp=False) as v8:
subs = (v1, v2, v3, v4, v5, v6, v7, v8)
# the API should accept and correct these for users
self.assertEqual(subs[0]['subnet']['cidr'], '10.0.0.0/8')