exceptions.MacAddressGenerationFailure:
webob.exc.HTTPServiceUnavailable,
exceptions.StateInvalid: webob.exc.HTTPBadRequest,
- exceptions.InvalidInput: webob.exc.HTTPBadRequest}
+ exceptions.InvalidInput: webob.exc.HTTPBadRequest,
+ exceptions.OverlappingAllocationPools: webob.exc.HTTPConflict,
+ exceptions.OutOfBoundsAllocationPool: webob.exc.HTTPBadRequest,
+ exceptions.InvalidAllocationPool: webob.exc.HTTPBadRequest,
+ }
def fields(request):
'cidr': {'allow_post': True, 'allow_put': False},
'gateway_ip': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
+ #TODO(salvatore-orlando): Enable PUT on allocation_pools
+ 'allocation_pools': {'allow_post': True, 'allow_put': False,
+ 'default': ATTR_NOT_SPECIFIED},
'dns_namesevers': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'additional_host_routes': {'allow_post': True, 'allow_put': True,
def subnet(subnet_data):
"""Represents a view for a subnet object"""
keys = ('id', 'network_id', 'tenant_id', 'gateway_ip', 'ip_version',
- 'cidr')
+ 'cidr', 'allocation_pools')
return resource(subnet_data, keys)
message = _("Invalid content type %(content_type)s.")
+class InvalidAllocationPool(QuantumException):
+ message = _("The allocation pool %(pool)s is not valid.")
+
+
+class OverlappingAllocationPools(QuantumException):
+ message = _("Found overlapping allocation pools:"
+ "%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.")
+
+
+class OutOfBoundsAllocationPool(QuantumException):
+ message = _("The allocation pool %(pool)s spans "
+ "beyond the subnet cidr %(subnet_cidr)s.")
+
+
class NotImplementedError(Error):
pass
"""Return an IP address to the pool of free IP's on the network
subnet.
"""
- range_qry = context.session.query(models_v2.IPAllocationRange)
+ # Grab all allocation pools for the subnet
+ pool_qry = context.session.query(models_v2.IPAllocationPool)
+ allocation_pools = pool_qry.filter_by(subnet_id=subnet_id).all()
+ # Find the allocation pool for the IP to recycle
+ pool_id = None
+ for allocation_pool in allocation_pools:
+ allocation_pool_range = netaddr.IPRange(
+ allocation_pool['first_ip'],
+ allocation_pool['last_ip'])
+ if netaddr.IPAddress(ip_address) in allocation_pool_range:
+ pool_id = allocation_pool['id']
+ break
+ if not pool_id:
+ error_message = ("No allocation pool found for "
+ "ip address:%s" % ip_address)
+ raise q_exc.InvalidInput(error_message=error_message)
# Two requests will be done on the database. The first will be to
# search if an entry starts with ip_address + 1 (r1). The second
# will be to see if an entry ends with ip_address -1 (r2).
# If 1 of the above holds true then the specific entry will be
# modified. If both hold true then the two ranges will be merged.
# If there are no entries then a single entry will be added.
+ range_qry = context.session.query(models_v2.IPAvailabilityRange)
ip_first = str(netaddr.IPAddress(ip_address) + 1)
ip_last = str(netaddr.IPAddress(ip_address) - 1)
LOG.debug("Recycle %s", ip_address)
-
try:
- r1 = range_qry.filter_by(subnet_id=subnet_id,
+ r1 = range_qry.filter_by(allocation_pool_id=pool_id,
first_ip=ip_first).one()
LOG.debug("Recycle: first match for %s-%s", r1['first_ip'],
r1['last_ip'])
except exc.NoResultFound:
r1 = []
try:
- r2 = range_qry.filter_by(subnet_id=subnet_id,
+ r2 = range_qry.filter_by(allocation_pool_id=pool_id,
last_ip=ip_last).one()
LOG.debug("Recycle: last match for %s-%s", r2['first_ip'],
r2['last_ip'])
if r1 and r2:
# Merge the two ranges
- ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
- first_ip=r2['first_ip'],
- last_ip=r1['last_ip'])
+ ip_range = models_v2.IPAvailabilityRange(
+ allocation_pool_id=pool_id,
+ first_ip=r2['first_ip'],
+ last_ip=r1['last_ip'])
context.session.add(ip_range)
LOG.debug("Recycle: merged %s-%s and %s-%s", r2['first_ip'],
r2['last_ip'], r1['first_ip'], r1['last_ip'])
r2['last_ip'])
else:
# Create a new range
- ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
- first_ip=ip_address,
- last_ip=ip_address)
+ ip_range = models_v2.IPAvailabilityRange(
+ allocation_pool_id=pool_id,
+ first_ip=ip_address,
+ last_ip=ip_address)
context.session.add(ip_range)
LOG.debug("Recycle: created new %s-%s", ip_address, ip_address)
The IP address will be generated from one of the subnets defined on
the network.
"""
- range_qry = context.session.query(models_v2.IPAllocationRange)
+ range_qry = context.session.query(
+ models_v2.IPAvailabilityRange).join(
+ models_v2.IPAllocationPool)
for subnet in subnets:
range = range_qry.filter_by(subnet_id=subnet['id']).first()
if not range:
def _allocate_specific_ip(context, subnet_id, ip_address):
"""Allocate a specific IP address on the subnet."""
ip = int(netaddr.IPAddress(ip_address))
- range_qry = context.session.query(models_v2.IPAllocationRange)
- ranges = range_qry.filter_by(subnet_id=subnet_id).all()
- for range in ranges:
+ range_qry = context.session.query(
+ models_v2.IPAvailabilityRange,
+ models_v2.IPAllocationPool).join(
+ models_v2.IPAllocationPool)
+ results = range_qry.filter_by(subnet_id=subnet_id).all()
+ for (range, pool) in results:
first = int(netaddr.IPAddress(range['first_ip']))
last = int(netaddr.IPAddress(range['last_ip']))
if first <= ip <= last:
new_first = str(netaddr.IPAddress(ip_address) + 1)
new_last = range['last_ip']
range['last_ip'] = str(netaddr.IPAddress(ip_address) - 1)
- ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
- first_ip=new_first,
- last_ip=new_last)
+ ip_range = models_v2.IPAvailabilityRange(
+ allocation_pool_id=pool['id'],
+ first_ip=new_first,
+ last_ip=new_last)
context.session.add(ip_range)
return
'subnet_id': result['subnet_id']})
return ips
+ def _validate_allocation_pools(self, ip_pools, gateway_ip, subnet_cidr):
+ """Validate IP allocation pools.
+
+ Verify start and end address for each allocation pool are valid,
+ ie: constituted by valid and appropriately ordered IP addresses.
+ Also, verify pools do not overlap among themselves and with the
+ gateway IP. Finally, verify that each range, and the gateway IP,
+ fall within the subnet's CIDR.
+
+ """
+
+ subnet = netaddr.IPNetwork(subnet_cidr)
+ subnet_first_ip = netaddr.IPAddress(subnet.first + 1)
+ subnet_last_ip = netaddr.IPAddress(subnet.last - 1)
+
+ LOG.debug("Performing IP validity checks on allocation pools")
+ ip_sets = []
+ for ip_pool in ip_pools:
+ try:
+ start_ip = netaddr.IPAddress(ip_pool['start'])
+ end_ip = netaddr.IPAddress(ip_pool['end'])
+ except netaddr.AddrFormatError:
+ LOG.error("Found invalid IP address in pool: %s - %s:",
+ ip_pool['start'],
+ ip_pool['end'])
+ raise q_exc.InvalidAllocationPool(pool=ip_pool)
+ if (start_ip.version != subnet.version or
+ end_ip.version != subnet.version):
+ LOG.error("Specified IP addresses do not match "
+ "the subnet IP version")
+ raise q_exc.InvalidAllocationPool(pool=ip_pool)
+ if end_ip < start_ip:
+ LOG.error("Start IP (%s) is greater than end IP (%s)",
+ ip_pool['start'],
+ ip_pool['end'])
+ raise q_exc.InvalidAllocationPool(pool=ip_pool)
+ if start_ip < subnet_first_ip or end_ip > subnet_last_ip:
+ LOG.error("Found pool larger than subnet CIDR:%s - %s",
+ ip_pool['start'],
+ ip_pool['end'])
+ raise q_exc.OutOfBoundsAllocationPool(
+ pool=ip_pool,
+ subnet_cidr=subnet_cidr)
+ # Valid allocation pool
+ # Create an IPSet for it for easily verifying overlaps
+ ip_sets.append(netaddr.IPSet(netaddr.IPRange(
+ ip_pool['start'],
+ ip_pool['end']).cidrs()))
+
+ LOG.debug("Checking for overlaps among allocation pools "
+ "and gateway ip")
+ ip_ranges = ip_pools[:]
+ # Treat gw as IPset as well
+ ip_ranges.append(gateway_ip)
+ ip_sets.append(netaddr.IPSet([gateway_ip]))
+ # Use integer cursors as an efficient way for implementing
+ # comparison and avoiding comparing the same pair twice
+ for l_cursor in range(len(ip_sets)):
+ for r_cursor in range(l_cursor + 1, len(ip_sets)):
+ if ip_sets[l_cursor] & ip_sets[r_cursor]:
+ l_range = ip_ranges[l_cursor]
+ r_range = ip_ranges[r_cursor]
+ LOG.error("Found overlapping ranges: %s and %s",
+ l_range, r_range)
+ raise q_exc.OverlappingAllocationPools(
+ pool_1=l_range,
+ pool_2=r_range,
+ subnet_cidr=subnet_cidr)
+
+ def _allocate_pools_for_subnet(self, context, subnet):
+ """Create IP allocation pools for a given subnet
+
+ Pools are defined by the 'allocation_pools' attribute,
+ a list of dict objects with 'start' and 'end' keys for
+ defining the pool range.
+
+ """
+
+ pools = []
+ if subnet['allocation_pools'] == api_router.ATTR_NOT_SPECIFIED:
+ # Auto allocate the pool around gateway
+ gw_ip = int(netaddr.IPAddress(subnet['gateway_ip']))
+ net = netaddr.IPNetwork(subnet['cidr'])
+ first_ip = net.first + 1
+ last_ip = net.last - 1
+ if gw_ip > first_ip:
+ pools.append({'start': str(netaddr.IPAddress(first_ip)),
+ 'end': str(netaddr.IPAddress(gw_ip - 1))})
+ if gw_ip < last_ip:
+ pools.append({'start': str(netaddr.IPAddress(gw_ip + 1)),
+ 'end': str(netaddr.IPAddress(last_ip))})
+ # return auto-generated pools
+ # no need to check for their validity
+ return pools
+ else:
+ pools = subnet['allocation_pools']
+ self._validate_allocation_pools(pools,
+ subnet['gateway_ip'],
+ subnet['cidr'])
+ return pools
+
def _make_network_dict(self, network, fields=None):
res = {'id': network['id'],
'name': network['name'],
'network_id': subnet['network_id'],
'ip_version': subnet['ip_version'],
'cidr': subnet['cidr'],
+ 'allocation_pools': [{'start': pool['first_ip'],
+ 'end': pool['last_ip']}
+ for pool in subnet['allocation_pools']],
'gateway_ip': subnet['gateway_ip']}
return self._fields(res, fields)
if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
- ip = netaddr.IPAddress(s['gateway_ip'])
- # Get the first and last indices for the subnet
- ranges = []
- # Gateway is the first address in the range
- if ip == net.network + 1:
- range = {'first': str(ip + 1),
- 'last': str(net.broadcast - 1)}
- ranges.append(range)
- # Gateway is the last address in the range
- elif ip == net.broadcast - 1:
- range = {'first': str(net.network + 1),
- 'last': str(ip - 1)}
- ranges.append(range)
- # Gateway is on IP in the subnet
- else:
- range = {'first': str(net.network + 1),
- 'last': str(ip - 1)}
- ranges.append(range)
- range = {'first': str(ip + 1),
- 'last': str(net.broadcast - 1)}
- ranges.append(range)
with context.session.begin():
network = self._get_network(context, s["network_id"])
subnet = models_v2.Subnet(network_id=s['network_id'],
cidr=s['cidr'],
gateway_ip=s['gateway_ip'])
context.session.add(subnet)
-
- with context.session.begin():
- for range in ranges:
- ip_range = models_v2.IPAllocationRange(subnet_id=subnet.id,
- first_ip=range['first'],
- last_ip=range['last'])
+ pools = self._allocate_pools_for_subnet(context, s)
+ for pool in pools:
+ ip_pool = models_v2.IPAllocationPool(subnet=subnet,
+ first_ip=pool['start'],
+ last_ip=pool['end'])
+ context.session.add(ip_pool)
+ ip_range = models_v2.IPAvailabilityRange(
+ ipallocationpool=ip_pool,
+ first_ip=pool['start'],
+ last_ip=pool['end'])
context.session.add(ip_range)
return self._make_subnet_dict(subnet)
id = sa.Column(sa.String(36), primary_key=True, default=utils.str_uuid)
-class IPAllocationRange(model_base.BASEV2, HasId):
- """Internal representation of a free IP address range in a Quantum
- subnet. The range of available ips is [first_ip..last_ip]. The
- allocation retrieves the first entry from the range. If the first
- entry is equal to the last entry then this row will be deleted.
+class IPAvailabilityRange(model_base.BASEV2):
+ """Internal representation of available IPs for Quantum subnets.
+
+ Allocation - first entry from the range will be allocated.
+ If the first entry is equal to the last entry then this row
+ will be deleted.
Recycling ips involves appending to existing ranges. This is
only done if the range is contiguous. If not, the first_ip will be
the same as the last_ip. When adjacent ips are recycled the ranges
will be merged.
+
"""
+ allocation_pool_id = sa.Column(sa.String(36),
+ sa.ForeignKey('ipallocationpools.id'),
+ nullable=True,
+ primary_key=True)
+ first_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
+ last_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
+
+ def __repr__(self):
+ return "%s - %s" % (self.first_ip, self.last_ip)
+
+
+class IPAllocationPool(model_base.BASEV2, HasId):
+ """Representation of an allocation pool in a Quantum subnet."""
+
subnet_id = sa.Column(sa.String(36), sa.ForeignKey('subnets.id'),
nullable=True)
first_ip = sa.Column(sa.String(64), nullable=False)
last_ip = sa.Column(sa.String(64), nullable=False)
+ available_ranges = orm.relationship(IPAvailabilityRange,
+ backref='ipallocationpool',
+ lazy="dynamic")
+
+ def __repr__(self):
+ return "%s - %s" % (self.first_ip, self.last_ip)
class IPAllocation(model_base.BASEV2):
ip_version = sa.Column(sa.Integer, nullable=False)
cidr = sa.Column(sa.String(64), nullable=False)
gateway_ip = sa.Column(sa.String(64))
-
+ allocation_pools = orm.relationship(IPAllocationPool,
+ backref='subnet',
+ lazy="dynamic")
#TODO(danwent):
# - dns_namservers
- # - excluded_ranges
# - additional_routes
import mock
import os
import random
-import unittest
+import unittest2
+import webob.exc
import quantum
from quantum.api.v2.router import APIRouter
return os.path.join(ETCDIR, *p)
-class QuantumDbPluginV2TestCase(unittest.TestCase):
+class QuantumDbPluginV2TestCase(unittest2.TestCase):
def setUp(self):
super(QuantumDbPluginV2TestCase, self).setUp()
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
- def _create_subnet(self, fmt, net_id, gateway_ip, cidr, ip_version=4):
+ def _create_subnet(self, fmt, net_id, gateway_ip, cidr,
+ allocation_pools=None, ip_version=4):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
-
+ if allocation_pools:
+ data['subnet']['allocation_pools'] = allocation_pools
subnet_req = self.new_create_request('subnets', data, fmt)
return subnet_req.get_response(self.api)
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'device_id', 'mac_address', 'fixed_ips'):
- if arg in kwargs:
+ # Arg must be present and not empty
+ if arg in kwargs and kwargs[arg]:
data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt)
return port_req.get_response(self.api)
- def _make_subnet(self, fmt, network, gateway, cidr, ip_version=4):
- res = self._create_subnet(fmt, network['network']['id'],
- gateway, cidr, ip_version)
+ def _make_subnet(self, fmt, network, gateway, cidr,
+ allocation_pools=None, ip_version=4):
+ res = self._create_subnet(fmt,
+ network['network']['id'],
+ gateway,
+ cidr,
+ allocation_pools=allocation_pools,
+ ip_version=ip_version)
+ # Things can go wrong - raise HTTP exc with res code only
+ # so it can be caught by unit tests
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_port(self, fmt, net_id, **kwargs):
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
- def subnet(self, network=None, gateway=None,
- cidr='10.0.0.0/24', fmt='json'):
+ def subnet(self, network=None,
+ gateway_ip=None,
+ cidr='10.0.0.0/24',
+ fmt='json',
+ ip_version=4,
+ allocation_pools=None):
# TODO(anyone) DRY this
+ # NOTE(salvatore-orlando): we can pass the network object
+ # to gen function anyway, and then avoid the repetition
if not network:
with self.network() as network:
- subnet = self._make_subnet(fmt, network, gateway, cidr)
+ subnet = self._make_subnet(fmt,
+ network,
+ gateway_ip,
+ cidr,
+ allocation_pools,
+ ip_version)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
else:
- subnet = self._make_subnet(fmt, network, gateway, cidr)
+ subnet = self._make_subnet(fmt,
+ network,
+ gateway_ip,
+ cidr,
+ allocation_pools,
+ ip_version)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
- def port(self, subnet=None, fmt='json'):
+ def port(self, subnet=None, fixed_ips=None, fmt='json'):
if not subnet:
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
- port = self._make_port(fmt, net_id)
+ port = self._make_port(fmt, net_id, fixed_ips=fixed_ips)
yield port
self._delete('ports', port['port']['id'])
else:
net_id = subnet['subnet']['network_id']
- port = self._make_port(fmt, net_id)
+ port = self._make_port(fmt, net_id, fixed_ips=fixed_ips)
yield port
self._delete('ports', port['port']['id'])
admin_status_up=True)
network2 = self.deserialize(fmt, res)
subnet2 = self._make_subnet(fmt, network2, "1.1.1.1",
- "1.1.1.0/24", 4)
+ "1.1.1.0/24", ip_version=4)
net_id = port['port']['network_id']
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id':
def test_range_allocation(self):
fmt = 'json'
- with self.subnet(gateway='10.0.0.3',
+ with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
- with self.subnet(gateway='11.0.0.6',
+ with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
class TestSubnetsV2(QuantumDbPluginV2TestCase):
+ def _test_create_subnet(self, **kwargs):
+ keys = kwargs.copy()
+ keys.setdefault('cidr', '10.0.0.0/24')
+ keys.setdefault('ip_version', 4)
+ with self.subnet(**keys) as subnet:
+ # verify the response has each key with the correct value
+ for k in keys:
+ self.assertIn(k, subnet['subnet'])
+ self.assertEquals(subnet['subnet'][k], keys[k])
+ return subnet
+
def test_create_subnet(self):
- gateway = '10.0.0.1'
+ gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
- keys = [('ip_version', 4), ('gateway_ip', gateway),
- ('cidr', cidr)]
- with self.subnet(gateway=gateway, cidr=cidr) as subnet:
- for k, v in keys:
- self.assertEquals(subnet['subnet'][k], v)
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr)
def test_create_subnet_defaults(self):
- generated_gateway = '10.0.0.1'
+ gateway = '10.0.0.1'
cidr = '10.0.0.0/24'
- keys = [('ip_version', 4), ('gateway_ip', generated_gateway),
- ('cidr', cidr)]
- # intentionally not passing gateway in
- with self.subnet(cidr=cidr) as subnet:
- for k, v in keys:
- self.assertEquals(subnet['subnet'][k], v)
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.254'}]
+ subnet = self._test_create_subnet()
+ # verify cidr & gw have been correctly generated
+ self.assertEquals(subnet['subnet']['cidr'], cidr)
+ self.assertEquals(subnet['subnet']['gateway_ip'], gateway)
+ self.assertEquals(subnet['subnet']['allocation_pools'],
+ allocation_pools)
+
+ def test_create_subnet_with_allocation_pool(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.100'}]
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+
+ def test_create_subnet_with_v6_allocation_pool(self):
+ gateway_ip = 'fe80::1'
+ cidr = 'fe80::0/80'
+ allocation_pools = [{'start': 'fe80::2',
+ 'end': 'fe80::ffff:fffa:ffff'}]
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+
+ def test_create_subnet_with_large_allocation_pool(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/8'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.100'},
+ {'start': '10.1.0.0',
+ 'end': '10.200.0.100'}]
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+
+ def test_create_subnet_multiple_allocation_pools(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.100'},
+ {'start': '10.0.0.110',
+ 'end': '10.0.0.150'}]
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+
+ def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
+ gateway_ip = '10.0.0.50'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.1',
+ 'end': '10.0.0.100'}]
+ with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+ self.assertEquals(ctx_manager.exception.code, 409)
+
+ def test_create_subnet_overlapping_allocation_pools_returns_409(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.150'},
+ {'start': '10.0.0.140',
+ 'end': '10.0.0.180'}]
+ with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+ self.assertEquals(ctx_manager.exception.code, 409)
+
+ def test_create_subnet_invalid_allocation_pool_returns_400(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.256'}]
+ with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+ self.assertEquals(ctx_manager.exception.code, 400)
+
+ def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.1.6'}]
+ with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ self._test_create_subnet(gateway_ip=gateway_ip,
+ cidr=cidr,
+ allocation_pools=allocation_pools)
+ self.assertEquals(ctx_manager.exception.code, 400)
def test_update_subnet(self):
with self.subnet() as subnet:
# NOTE(jkoelker) This would be a good place to use contextlib.nested
# or just drop 2.6 support ;)
with self.network() as network:
- with self.subnet(network=network, gateway='10.0.0.1',
+ with self.subnet(network=network, gateway_ip='10.0.0.1',
cidr='10.0.1.0/24') as subnet:
- with self.subnet(network=network, gateway='10.0.1.1',
+ with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request('subnets')
res = self.deserialize('json',