if attributes.is_attr_set(s.get('cidr')):
self._validate_ip_version(ip_ver, s['cidr'], 'cidr')
+ # TODO(watanabe.isao): After we found a way to avoid the re-sync
+ # from the agent side, this restriction could be removed.
+ if cur_subnet:
+ dhcp_was_enabled = cur_subnet.enable_dhcp
+ else:
+ dhcp_was_enabled = False
+ if s.get('enable_dhcp') and not dhcp_was_enabled:
+ subnet_prefixlen = netaddr.IPNetwork(s['cidr']).prefixlen
+ error_message = _("Subnet has a prefix length that is "
+ "incompatible with DHCP service enabled.")
+ if ((ip_ver == 4 and subnet_prefixlen > 30) or
+ (ip_ver == 6 and subnet_prefixlen > 126)):
+ raise n_exc.InvalidInput(error_message=error_message)
+ else:
+ # NOTE(watanabe.isao): The following restriction is necessary
+ # only when updating subnet.
+ if cur_subnet:
+ range_qry = context.session.query(models_v2.
+ IPAvailabilityRange).join(models_v2.IPAllocationPool)
+ ip_range = range_qry.filter_by(subnet_id=s['id']).first()
+ if not ip_range:
+ raise n_exc.IpAddressGenerationFailure(
+ net_id=cur_subnet.network_id)
+
if attributes.is_attr_set(s.get('gateway_ip')):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if (cfg.CONF.force_gateway_on_subnet and
data['port']['fixed_ips'])
def test_no_more_port_exception(self):
- with self.subnet(cidr='10.0.0.0/32', gateway_ip=None) as subnet:
+ with self.subnet(cidr='10.0.0.0/32', enable_dhcp=False) as subnet:
id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, id)
data = self.deserialize(self.fmt, res)
self.subnet(cidr='14.129.122.5/22'),
self.subnet(cidr='15.129.122.5/24'),
self.subnet(cidr='16.129.122.5/28'),
- self.subnet(cidr='17.129.122.5/32', gateway_ip=None)
+ self.subnet(cidr='17.129.122.5/32', enable_dhcp=False)
) as subs:
# the API should accept and correct these for users
self.assertEqual(subs[0]['subnet']['cidr'], '10.0.0.0/8')
self.assertEqual(subs[6]['subnet']['cidr'], '16.129.122.0/28')
self.assertEqual(subs[7]['subnet']['cidr'], '17.129.122.5/32')
+ def _test_create_subnet_with_invalid_netmask_returns_400(self, *args):
+ with self.network() as network:
+ for cidr in args:
+ ip_version = netaddr.IPNetwork(cidr).version
+ self._create_subnet(self.fmt,
+ network['network']['id'],
+ cidr,
+ webob.exc.HTTPClientError.code,
+ ip_version=ip_version)
+
+ def test_create_subnet_with_invalid_netmask_returns_400_ipv4(self):
+ self._test_create_subnet_with_invalid_netmask_returns_400(
+ '10.0.0.0/31', '10.0.0.0/32')
+
+ def test_create_subnet_with_invalid_netmask_returns_400_ipv6(self):
+ self._test_create_subnet_with_invalid_netmask_returns_400(
+ 'cafe:cafe::/127', 'cafe:cafe::/128')
+
def test_create_subnet_bad_ip_version(self):
with self.network() as network:
# Check bad IP version
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
+ def _test_subnet_update_enable_dhcp_no_ip_available_returns_409(
+ self, allocation_pools, cidr):
+ ip_version = netaddr.IPNetwork(cidr).version
+ with self.network() as network:
+ with self.subnet(network=network,
+ allocation_pools=allocation_pools,
+ enable_dhcp=False,
+ cidr=cidr,
+ ip_version=ip_version) as subnet:
+ id = subnet['subnet']['network_id']
+ self._create_port(self.fmt, id)
+ data = {'subnet': {'enable_dhcp': True}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int,
+ webob.exc.HTTPConflict.code)
+
+ def test_subnet_update_enable_dhcp_no_ip_available_returns_409_ipv4(self):
+ allocation_pools = [{'start': '10.0.0.2', 'end': '10.0.0.2'}]
+ cidr = '10.0.0.0/30'
+ self._test_subnet_update_enable_dhcp_no_ip_available_returns_409(
+ allocation_pools, cidr)
+
+ def test_subnet_update_enable_dhcp_no_ip_available_returns_409_ipv6(self):
+ allocation_pools = [{'start': '2001:db8::2', 'end': '2001:db8::2'}]
+ cidr = '2001:db8::/126'
+ self._test_subnet_update_enable_dhcp_no_ip_available_returns_409(
+ allocation_pools, cidr)
+
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet: