# License for the specific language governing permissions and limitations
# under the License.
-import netaddr
-
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc
from neutron.common import utils
from neutron.db import common_db_mixin
from neutron.db import models_v2
-from neutron.ipam import utils as ipam_utils
LOG = logging.getLogger(__name__)
)
context.session.add(allocated)
- @classmethod
- def _check_gateway_in_subnet(cls, cidr, gateway):
- """Validate that the gateway is on the subnet."""
- ip = netaddr.IPAddress(gateway)
- if ip.version == 4 or (ip.version == 6 and not ip.is_link_local()):
- return ipam_utils.check_subnet_ip(cidr, gateway)
- return True
-
def _make_subnet_dict(self, subnet, fields=None):
res = {'id': subnet['id'],
'name': subnet['name'],
if attributes.is_attr_set(s.get('gateway_ip')):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if (cfg.CONF.force_gateway_on_subnet and
- not self._check_gateway_in_subnet(
+ not ipam.utils.check_gateway_in_subnet(
s['cidr'], s['gateway_ip'])):
error_message = _("Gateway is not valid on subnet")
raise n_exc.InvalidInput(error_message=error_message)
from neutron.db import db_base_plugin_common
from neutron.db import models_v2
from neutron.i18n import _LI
+from neutron.ipam import utils as ipam_utils
LOG = logging.getLogger(__name__)
a list of dict objects with 'start' and 'end' keys for
defining the pool range.
"""
- pools = []
- # Auto allocate the pool around gateway_ip
- net = netaddr.IPNetwork(subnet['cidr'])
- first_ip = net.first + 1
- last_ip = net.last - 1
- gw_ip = int(netaddr.IPAddress(subnet['gateway_ip'] or net.last))
- # Use the gw_ip to find a point for splitting allocation pools
- # for this subnet
- split_ip = min(max(gw_ip, net.first), net.last)
- if split_ip > first_ip:
- pools.append({'start': str(netaddr.IPAddress(first_ip)),
- 'end': str(netaddr.IPAddress(split_ip - 1))})
- if split_ip < last_ip:
- pools.append({'start': str(netaddr.IPAddress(split_ip + 1)),
- 'end': str(netaddr.IPAddress(last_ip))})
- # return auto-generated pools
- # no need to check for their validity
- return pools
+ pools = ipam_utils.generate_pools(subnet['cidr'], subnet['gateway_ip'])
+ return [{'start': str(netaddr.IPAddress(pool.first)),
+ 'end': str(netaddr.IPAddress(pool.last))}
+ for pool in pools]
def _validate_subnet_cidr(self, context, network, new_subnet_cidr):
"""Validate the CIDR for a subnet.
"""
subnet = netaddr.IPNetwork(subnet_cidr)
subnet_first_ip = netaddr.IPAddress(subnet.first + 1)
- subnet_last_ip = netaddr.IPAddress(subnet.last - 1)
+ # last address is broadcast in v4
+ subnet_last_ip = netaddr.IPAddress(subnet.last - (subnet.version == 4))
LOG.debug("Performing IP validity checks on allocation pools")
ip_sets = []
ip = netaddr.IPAddress(ip_address)
net = netaddr.IPNetwork(cidr)
# Check that the IP is valid on subnet. This cannot be the
- # network or the broadcast address
- return (ip != net.network and ip != net.broadcast
+ # network or the broadcast address (which exists only in IPv4)
+ return (ip != net.network
+ and (net.version == 6 or ip != net.broadcast)
and net.netmask & ip == net.network)
+def check_gateway_in_subnet(cidr, gateway):
+ """Validate that the gateway is on the subnet."""
+ ip = netaddr.IPAddress(gateway)
+ if ip.version == 4 or (ip.version == 6 and not ip.is_link_local()):
+ return check_subnet_ip(cidr, gateway)
+ return True
+
+
def generate_pools(cidr, gateway_ip):
"""Create IP allocation pools for a specified subnet
The Neutron API defines a subnet's allocation pools as a list of
IPRange objects for defining the pool range.
"""
- pools = []
# Auto allocate the pool around gateway_ip
net = netaddr.IPNetwork(cidr)
+ if net.first == net.last:
+ # handle single address subnet case
+ return [netaddr.IPRange(net.first, net.last)]
first_ip = net.first + 1
- last_ip = net.last - 1
- gw_ip = int(netaddr.IPAddress(gateway_ip or net.last))
- # Use the gw_ip to find a point for splitting allocation pools
- # for this subnet
- split_ip = min(max(gw_ip, net.first), net.last)
- if split_ip > first_ip:
- pools.append(netaddr.IPRange(first_ip, split_ip - 1))
- if split_ip < last_ip:
- pools.append(netaddr.IPRange(split_ip + 1, last_ip))
- return pools
+ # last address is broadcast in v4
+ last_ip = net.last - (net.version == 4)
+ if first_ip >= last_ip:
+ # /31 lands here
+ return []
+ ipset = netaddr.IPSet(netaddr.IPRange(first_ip, last_ip))
+ if gateway_ip:
+ ipset.remove(netaddr.IPAddress(gateway_ip))
+ return list(ipset.iter_ipranges())
data['port']['fixed_ips'])
def test_no_more_port_exception(self):
- with self.subnet(cidr='10.0.0.0/32', enable_dhcp=False) as subnet:
+ with self.subnet(cidr='10.0.0.0/31', enable_dhcp=False) as subnet:
id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, id)
data = self.deserialize(self.fmt, res)
def test_create_subnet_ipv6_gw_values(self):
cidr = '2001::/64'
# Gateway is last IP in IPv6 DHCPv6 stateful subnet
- gateway = '2001::ffff:ffff:ffff:fffe'
+ gateway = '2001::ffff:ffff:ffff:ffff'
allocation_pools = [{'start': '2001::1',
- 'end': '2001::ffff:ffff:ffff:fffd'}]
+ 'end': '2001::ffff:ffff:ffff:fffe'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
# Gateway is first IP in IPv6 DHCPv6 stateful subnet
gateway = '2001::1'
allocation_pools = [{'start': '2001::2',
- 'end': '2001::ffff:ffff:ffff:fffe'}]
+ 'end': '2001::ffff:ffff:ffff:ffff'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('10.1.2.254'))
+ def test_allocate_specific_ipv6_subnet_specific_gateway(self):
+ # Same scenario as described in bug #1466322
+ sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
+ ['2210::/64'],
+ 64, 6)
+ sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
+ with self.ctx.session.begin(subtransactions=True):
+ sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
+ req = ipam.SpecificSubnetRequest(self._tenant_id,
+ uuidutils.generate_uuid(),
+ '2210::/64',
+ '2210::ffff:ffff:ffff:ffff')
+ res = sa.allocate_subnet(req)
+ detail = res.get_details()
+ self.assertEqual(detail.gateway_ip,
+ netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'))
+
def test__allocation_value_for_tenant_no_allocations(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],
--- /dev/null
+# Copyright (c) 2015 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+
+from neutron.ipam import utils
+from neutron.tests import base
+
+
+class TestIpamUtils(base.BaseTestCase):
+
+ def test_check_subnet_ip_v4_network(self):
+ self.assertFalse(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.0'))
+
+ def test_check_subnet_ip_v4_broadcast(self):
+ self.assertFalse(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.255'))
+
+ def test_check_subnet_ip_v4_valid(self):
+ self.assertTrue(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.1'))
+ self.assertTrue(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.254'))
+
+ def test_check_subnet_ip_v6_network(self):
+ self.assertFalse(utils.check_subnet_ip('F111::0/64', 'F111::0'))
+
+ def test_check_subnet_ip_v6_valid(self):
+ self.assertTrue(utils.check_subnet_ip('F111::0/64', 'F111::1'))
+ self.assertTrue(utils.check_subnet_ip('F111::0/64',
+ 'F111::FFFF:FFFF:FFFF:FFFF'))
+
+ def test_generate_pools_v4_nogateway(self):
+ cidr = '192.168.0.0/24'
+ expected = [netaddr.IPRange('192.168.0.1', '192.168.0.254')]
+ self.assertEqual(expected, utils.generate_pools(cidr, None))
+
+ def test_generate_pools_v4_gateway_first(self):
+ cidr = '192.168.0.0/24'
+ gateway = '192.168.0.1'
+ expected = [netaddr.IPRange('192.168.0.2', '192.168.0.254')]
+ self.assertEqual(expected, utils.generate_pools(cidr, gateway))
+
+ def test_generate_pools_v4_gateway_last(self):
+ cidr = '192.168.0.0/24'
+ gateway = '192.168.0.254'
+ expected = [netaddr.IPRange('192.168.0.1', '192.168.0.253')]
+ self.assertEqual(expected, utils.generate_pools(cidr, gateway))
+
+ def test_generate_pools_v4_32(self):
+ # 32 is special because it should have 1 usable address
+ cidr = '192.168.0.0/32'
+ expected = [netaddr.IPRange('192.168.0.0', '192.168.0.0')]
+ self.assertEqual(expected, utils.generate_pools(cidr, None))
+
+ def test_generate_pools_v4_31(self):
+ cidr = '192.168.0.0/31'
+ expected = []
+ self.assertEqual(expected, utils.generate_pools(cidr, None))
+
+ def test_generate_pools_v4_gateway_middle(self):
+ cidr = '192.168.0.0/24'
+ gateway = '192.168.0.128'
+ expected = [netaddr.IPRange('192.168.0.1', '192.168.0.127'),
+ netaddr.IPRange('192.168.0.129', '192.168.0.254')]
+ self.assertEqual(expected, utils.generate_pools(cidr, gateway))
+
+ def test_generate_pools_v6_nogateway(self):
+ # other than the difference in the last address, the rest of the
+ # logic is the same as v4 so we only need one test
+ cidr = 'F111::0/64'
+ expected = [netaddr.IPRange('F111::1', 'F111::FFFF:FFFF:FFFF:FFFF')]
+ self.assertEqual(expected, utils.generate_pools(cidr, None))