message = _("Failed to allocate subnet: %(reason)s")
+class AddressScopePrefixConflict(Conflict):
+ message = _("Failed to associate address scope: subnetpools "
+ "within an address scope must have unique prefixes")
+
+
+class IllegalSubnetPoolAssociationToAddressScope(BadRequest):
+ message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
+ " cannot be associated with address scope"
+ " %(address_scope_id)s")
+
+
+class IllegalSubnetPoolUpdate(BadRequest):
+ message = _("Illegal subnetpool update : %(reason)s")
+
+
class MinPrefixSubnetAllocationError(BadRequest):
message = _("Unable to allocate subnet with prefix length %(prefixlen)s, "
"minimum allowed prefix is %(min_prefixlen)s")
except exc.NoResultFound:
raise ext_address_scope.AddressScopeNotFound(address_scope_id=id)
+ def is_address_scope_owned_by_tenant(self, context, id):
+ """Check if address scope id is owned by the tenant or not.
+
+ AddressScopeNotFound is raised if the
+ - address scope id doesn't exist or
+ - if the (unshared) address scope id is not owned by this tenant.
+
+ @return Returns true if the user is admin or tenant is owner
+ Returns false if the address scope id is shared and not
+ owned by the tenant.
+ """
+ address_scope = self._get_address_scope(context, id)
+ return context.is_admin or (
+ address_scope.tenant_id == context.tenant_id)
+
def create_address_scope(self, context, address_scope):
"""Create a address scope."""
a_s = address_scope['address_scope']
def delete_address_scope(self, context, id):
with context.session.begin(subtransactions=True):
+ if self._get_subnetpools_by_address_scope_id(context, id):
+ raise ext_address_scope.AddressScopeInUse(address_scope_id=id)
address_scope = self._get_address_scope(context, id)
context.session.delete(address_scope)
'prefixes': [prefix['cidr']
for prefix in subnetpool['prefixes']],
'ip_version': subnetpool['ip_version'],
- 'default_quota': subnetpool['default_quota']}
+ 'default_quota': subnetpool['default_quota'],
+ 'address_scope_id': subnetpool['address_scope_id']}
return self._fields(res, fields)
def _make_port_dict(self, port, fields=None,
# NOTE(tidwellr): see note in _get_all_subnets()
return context.session.query(models_v2.SubnetPool).all()
+ def _get_subnetpools_by_address_scope_id(self, context, address_scope_id):
+ # NOTE(vikram.choudhary): see note in _get_all_subnets()
+ subnetpool_qry = context.session.query(models_v2.SubnetPool)
+ return subnetpool_qry.filter_by(
+ address_scope_id=address_scope_id).all()
+
def _get_port(self, context, id):
try:
port = self._get_by_id(context, models_v2.Port, id)
subnetpool_prefix = models_v2.SubnetPoolPrefix(**prefix_args)
context.session.add(subnetpool_prefix)
+ def _validate_address_scope_id(self, context, address_scope_id,
+ subnetpool_id, sp_prefixes):
+ """Validate the address scope before associating.
+
+ Subnetpool can associate with an address scope if
+ - the tenant user is the owner of both the subnetpool and
+ address scope
+ - the admin is associating the subnetpool with the shared
+ address scope
+ - there is no prefix conflict with the existing subnetpools
+ associated with the address scope.
+ """
+ if not attributes.is_attr_set(address_scope_id):
+ return
+
+ if not self.is_address_scope_owned_by_tenant(context,
+ address_scope_id):
+ raise n_exc.IllegalSubnetPoolAssociationToAddressScope(
+ subnetpool_id=subnetpool_id, address_scope_id=address_scope_id)
+
+ subnetpools = self._get_subnetpools_by_address_scope_id(
+ context, address_scope_id)
+
+ new_set = netaddr.IPSet(sp_prefixes)
+ for sp in subnetpools:
+ if sp.id == subnetpool_id:
+ continue
+ sp_set = netaddr.IPSet([prefix['cidr'] for prefix in sp.prefixes])
+ if sp_set.intersection(new_set):
+ raise n_exc.AddressScopePrefixConflict()
+
+ def _check_subnetpool_update_allowed(self, context, subnetpool_id,
+ address_scope_id):
+ """Check if the subnetpool can be updated or not.
+
+ If the subnetpool is associated to a shared address scope not owned
+ by the tenant, then the subnetpool cannot be updated.
+ """
+
+ if not self.is_address_scope_owned_by_tenant(context,
+ address_scope_id):
+ msg = _("subnetpool %(subnetpool_id)s cannot be updated when"
+ " associated with shared address scope "
+ "%(address_scope_id)s") % {
+ 'subnetpool_id': subnetpool_id,
+ 'address_scope_id': address_scope_id}
+ raise n_exc.IllegalSubnetPoolUpdate(reason=msg)
+
def create_subnetpool(self, context, subnetpool):
"""Create a subnetpool"""
sp = subnetpool['subnetpool']
sp_reader = subnet_alloc.SubnetPoolReader(sp)
+ if sp_reader.address_scope_id is attributes.ATTR_NOT_SPECIFIED:
+ sp_reader.address_scope_id = None
+ self._validate_address_scope_id(context, sp_reader.address_scope_id,
+ id, sp_reader.prefixes)
tenant_id = self._get_tenant_id_for_create(context, sp)
with context.session.begin(subtransactions=True):
pool_args = {'tenant_id': tenant_id,
'min_prefixlen': sp_reader.min_prefixlen,
'max_prefixlen': sp_reader.max_prefixlen,
'shared': sp_reader.shared,
- 'default_quota': sp_reader.default_quota}
+ 'default_quota': sp_reader.default_quota,
+ 'address_scope_id': sp_reader.address_scope_id}
subnetpool = models_v2.SubnetPool(**pool_args)
context.session.add(subnetpool)
for prefix in sp_reader.prefixes:
for key in ['id', 'name', 'ip_version', 'min_prefixlen',
'max_prefixlen', 'default_prefixlen', 'shared',
- 'default_quota']:
+ 'default_quota', 'address_scope_id']:
self._write_key(key, updated, model, new_pool)
return updated
updated = self._updated_subnetpool_dict(orig_sp, new_sp)
updated['tenant_id'] = orig_sp.tenant_id
reader = subnet_alloc.SubnetPoolReader(updated)
+ if orig_sp.address_scope_id:
+ self._check_subnetpool_update_allowed(context, id,
+ orig_sp.address_scope_id)
+
+ self._validate_address_scope_id(context, reader.address_scope_id,
+ id, reader.prefixes)
orig_sp.update(self._filter_non_model_columns(
reader.subnetpool,
models_v2.SubnetPool))
-1c844d1677f7
+1b4c6e320f79
2a16083502f3
kilo
--- /dev/null
+# Copyright 2015 Huawei Technologies India Pvt. Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""address scope support in subnetpool
+
+Revision ID: 1b4c6e320f79
+Revises: 1c844d1677f7
+Create Date: 2015-07-03 09:48:39.491058
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '1b4c6e320f79'
+down_revision = '1c844d1677f7'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column('subnetpools',
+ sa.Column('address_scope_id',
+ sa.String(length=36),
+ nullable=True))
shared = sa.Column(sa.Boolean, nullable=False)
default_quota = sa.Column(sa.Integer, nullable=True)
hash = sa.Column(sa.String(36), nullable=False, server_default='')
+ address_scope_id = sa.Column(sa.String(36), nullable=True)
prefixes = orm.relationship(SubnetPoolPrefix,
backref='subnetpools',
cascade='all, delete, delete-orphan',
ADDRESS_SCOPE = 'address_scope'
ADDRESS_SCOPES = '%ss' % ADDRESS_SCOPE
-
+ADDRESS_SCOPE_ID = 'address_scope_id'
# Attribute Map
RESOURCE_ATTRIBUTE_MAP = {
'is_visible': True,
'required_by_policy': True,
'enforce_policy': True},
+ },
+ attr.SUBNETPOOLS: {
+ ADDRESS_SCOPE_ID: {'allow_post': True,
+ 'allow_put': True,
+ 'default': attr.ATTR_NOT_SPECIFIED,
+ 'validate': {'type:uuid_or_none': None},
+ 'is_visible': True}
}
}
message = _("Address scope %(address_scope_id)s could not be found")
-class AddressScopeDeleteError(nexception.BadRequest):
- message = _("Unable to delete address scope %(address_scope_id)s : "
- "%(reason)s")
+class AddressScopeInUse(nexception.InUse):
+ message = _("Unable to complete operation on "
+ "address scope %(address_scope_id)s. There are one or more"
+ " subnet pools in use on the address scope")
class AddressScopeUpdateError(nexception.BadRequest):
self._read_prefix_bounds(subnetpool)
self._read_attrs(subnetpool,
['tenant_id', 'name', 'shared'])
+ self._read_address_scope(subnetpool)
self.subnetpool = {'id': self.id,
'name': self.name,
'tenant_id': self.tenant_id,
'default_prefix': self.default_prefix,
'default_prefixlen': self.default_prefixlen,
'default_quota': self.default_quota,
+ 'address_scope_id': self.address_scope_id,
'shared': self.shared}
def _read_attrs(self, subnetpool, keys):
self.ip_version = ip_version
self.prefixes = self._compact_subnetpool_prefix_list(prefix_list)
+ def _read_address_scope(self, subnetpool):
+ self.address_scope_id = subnetpool.get('address_scope_id',
+ attributes.ATTR_NOT_SPECIFIED)
+
def _compact_subnetpool_prefix_list(self, prefix_list):
"""Compact any overlapping prefixes in prefix_list and return the
result
self.assertRaises(lib_exc.BadRequest,
self.admin_client.update_address_scope,
address_scope['id'], name='new-name', shared=False)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('1e471e5c-6f9c-437a-9257-fd9bc4b6f0fb')
+ def test_delete_address_scope_associated_with_subnetpool(self):
+ address_scope = self._create_address_scope()
+ prefixes = [u'10.11.12.0/24']
+ subnetpool_data = {'subnetpool': {
+ 'name': 'foo-subnetpool',
+ 'min_prefixlen': '29', 'prefixes': prefixes,
+ 'address_scope_id': address_scope['id']}}
+ body = self.client.create_subnetpool(subnetpool_data)
+ subnetpool = body['subnetpool']
+ self.addCleanup(self.client.delete_subnetpool, subnetpool['id'])
+ self.assertRaises(lib_exc.Conflict, self.client.delete_address_scope,
+ address_scope['id'])
self.assertEqual(pool_id, subnet['subnetpool_id'])
self.assertTrue(cidr.endswith(str(self.max_prefixlen)))
+ @test.attr(type='smoke')
+ @test.idempotent_id('49b44c64-1619-4b29-b527-ffc3c3115dc4')
+ def test_create_subnetpool_associate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ name, pool_id = self._create_subnetpool(
+ self.client, pool_values={'address_scope_id': address_scope['id']})
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('910b6393-db24-4f6f-87dc-b36892ad6c8c')
+ def test_update_subnetpool_associate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ name, pool_id = self._create_subnetpool(self.client)
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertIsNone(body['subnetpool']['address_scope_id'])
+ subnetpool_data = {'subnetpool': {'address_scope_id':
+ address_scope['id']}}
+ self.client.update_subnetpool(pool_id, subnetpool_data)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('18302e80-46a3-4563-82ac-ccd1dd57f652')
+ def test_update_subnetpool_associate_another_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ another_address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ name, pool_id = self._create_subnetpool(
+ self.client, pool_values={'address_scope_id':
+ address_scope['id']})
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+
+ body = self.client.get_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+ subnetpool_data = {'subnetpool': {'address_scope_id':
+ another_address_scope['id']}}
+ self.client.update_subnetpool(pool_id, subnetpool_data)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertEqual(another_address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f8970048-e41b-42d6-934b-a1297b07706a')
+ def test_update_subnetpool_disassociate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ name, pool_id = self._create_subnetpool(
+ self.client, pool_values={'address_scope_id': address_scope['id']})
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+ subnetpool_data = {'subnetpool': {'address_scope_id': None}}
+ self.client.update_subnetpool(pool_id, subnetpool_data)
+ body = self.client.get_subnetpool(pool_id)
+ self.assertIsNone(body['subnetpool']['address_scope_id'])
+
class SubnetPoolsTestV6(SubnetPoolsTest):
# under the License.
import copy
+import uuid
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
network_id=network['id'],
ip_version=4,
subnetpool_id=pool_id)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('9589e332-638e-476e-81bd-013d964aa3cb')
+ def test_create_subnetpool_associate_invalid_address_scope(self):
+ subnetpool_data = copy.deepcopy(self._subnetpool_data)
+ subnetpool_data['subnetpool']['address_scope_id'] = 'foo-addr-scope'
+ self.assertRaises(lib_exc.BadRequest, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('3b6c5942-485d-4964-a560-55608af020b5')
+ def test_create_subnetpool_associate_non_exist_address_scope(self):
+ subnetpool_data = copy.deepcopy(self._subnetpool_data)
+ non_exist_address_scope_id = str(uuid.uuid4())
+ subnetpool_data['subnetpool']['address_scope_id'] = (
+ non_exist_address_scope_id)
+ self.assertRaises(lib_exc.NotFound, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('2dfb4269-8657-485a-a053-b022e911456e')
+ def test_create_subnetpool_associate_address_scope_prefix_intersect(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ addr_scope_id = address_scope['id']
+ pool_id = self._create_subnetpool(
+ self.client, pool_values={'address_scope_id': addr_scope_id})
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ subnetpool_data = {'subnetpool': {'name': 'foo-subnetpool',
+ 'prefixes': [u'10.11.12.13/24'],
+ 'min_prefixlen': '29',
+ 'address_scope_id': addr_scope_id}}
+ self.assertRaises(lib_exc.Conflict, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('83a19a13-5384-42e2-b579-43fc69c80914')
+ def test_create_sp_associate_address_scope_multiple_prefix_intersect(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ addr_scope_id = address_scope['id']
+ pool_values = {'address_scope_id': addr_scope_id,
+ 'prefixes': [u'20.0.0.0/18', u'30.0.0.0/18']}
+
+ pool_id = self._create_subnetpool(
+ self.client, pool_values=pool_values)
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ prefixes = [u'40.0.0.0/18', u'50.0.0.0/18', u'30.0.0.0/12']
+ subnetpool_data = {'subnetpool': {'name': 'foo-subnetpool',
+ 'prefixes': prefixes,
+ 'min_prefixlen': '29',
+ 'address_scope_id': addr_scope_id}}
+ self.assertRaises(lib_exc.Conflict, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('f06d8e7b-908b-4e94-b570-8156be6a4bf1')
+ def test_create_subnetpool_associate_address_scope_of_other_owner(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True)
+ address_scope_id = address_scope['id']
+ subnetpool_data = copy.deepcopy(self._subnetpool_data)
+ subnetpool_data['subnetpool']['address_scope_id'] = address_scope_id
+ self.assertRaises(lib_exc.NotFound, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('3396ec6c-cb80-4ebe-b897-84e904580bdf')
+ def test_tenant_create_subnetpool_associate_shared_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ shared=True)
+ subnetpool_data = copy.deepcopy(self._subnetpool_data)
+ subnetpool_data['subnetpool']['address_scope_id'] = (
+ address_scope['id'])
+ self.assertRaises(lib_exc.BadRequest, self.client.create_subnetpool,
+ subnetpool_data)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('6d3d9ad5-32d4-4d63-aa00-8c62f73e2881')
+ def test_update_subnetpool_associate_address_scope_of_other_owner(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True)
+ address_scope_id = address_scope['id']
+ pool_id = self._create_subnetpool(self.client)
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ subnetpool_data = {'subnetpool': {'address_scope_id':
+ address_scope_id}}
+ self.assertRaises(lib_exc.NotFound, self.client.update_subnetpool,
+ pool_id, subnetpool_data)
+
+ def _test_update_subnetpool_prefix_intersect_helper(
+ self, pool_1_prefixes, pool_2_prefixes, pool_1_updated_prefixes):
+ # create two subnet pools associating to an address scope.
+ # Updating the first subnet pool with the prefix intersecting
+ # with the second one should be a failure
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'))
+ addr_scope_id = address_scope['id']
+ pool_values = {'address_scope_id': addr_scope_id,
+ 'prefixes': pool_1_prefixes}
+ pool_id_1 = self._create_subnetpool(self.client,
+ pool_values=pool_values)
+ self.addCleanup(self.client.delete_subnetpool, pool_id_1)
+ pool_values = {'address_scope_id': addr_scope_id,
+ 'prefixes': pool_2_prefixes}
+ pool_id_2 = self._create_subnetpool(self.client,
+ pool_values=pool_values)
+
+ self.addCleanup(self.client.delete_subnetpool, pool_id_2)
+
+ # now update the pool_id_1 with the prefix interesecting with
+ # pool_id_2
+ subnetpool_data = {'subnetpool': {'prefixes':
+ pool_1_updated_prefixes}}
+ self.assertRaises(lib_exc.Conflict, self.client.update_subnetpool,
+ pool_id_1, subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('96006292-7214-40e0-a471-153fb76e6b31')
+ def test_update_subnetpool_prefix_intersect(self):
+ pool_1_prefix = [u'20.0.0.0/18']
+ pool_2_prefix = [u'20.10.0.0/24']
+ pool_1_updated_prefix = [u'20.0.0.0/12']
+ self._test_update_subnetpool_prefix_intersect_helper(
+ pool_1_prefix, pool_2_prefix, pool_1_updated_prefix)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('4d3f8a79-c530-4e59-9acf-6c05968adbfe')
+ def test_update_subnetpool_multiple_prefix_intersect(self):
+ pool_1_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18']
+ pool_2_prefixes = [u'20.10.0.0/24', u'40.0.0.0/18', '50.0.0.0/18']
+ pool_1_updated_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18',
+ u'50.0.0.0/12']
+ self._test_update_subnetpool_prefix_intersect_helper(
+ pool_1_prefixes, pool_2_prefixes, pool_1_updated_prefixes)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('7438e49e-1351-45d8-937b-892059fb97f5')
+ def test_tenant_update_sp_prefix_associated_with_shared_addr_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ shared=True)
+ addr_scope_id = address_scope['id']
+ pool_values = {'prefixes': [u'20.0.0.0/18', u'30.0.0.0/18']}
+
+ pool_id = self._create_subnetpool(
+ self.client, pool_values=pool_values)
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+
+ # associate the subnetpool to the address scope as an admin
+ subnetpool_data = {'subnetpool': {'address_scope_id':
+ addr_scope_id}}
+ self.admin_client.update_subnetpool(pool_id, subnetpool_data)
+ body = self.admin_client.get_subnetpool(pool_id)
+ self.assertEqual(addr_scope_id,
+ body['subnetpool']['address_scope_id'])
+
+ # updating the subnetpool prefix by the tenant user should fail
+ # since the tenant is not the owner of address scope
+ update_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18', u'40.0.0.0/18']
+ subnetpool_data = {'subnetpool': {'prefixes': update_prefixes}}
+ self.assertRaises(lib_exc.BadRequest, self.client.update_subnetpool,
+ pool_id, subnetpool_data)
+
+ # admin can update the prefixes
+ self.admin_client.update_subnetpool(pool_id, subnetpool_data)
+ body = self.admin_client.get_subnetpool(pool_id)
+ self.assertEqual(update_prefixes,
+ body['subnetpool']['prefixes'])
# under the License.
import contextlib
+import netaddr
import webob.exc
neutron_context=context.Context('', 'not-the-owner'))
self.assertEqual(1, len(admin_res['address_scopes']))
self.assertEqual(0, len(mortal_res['address_scopes']))
+
+
+class TestSubnetPoolsWithAddressScopes(AddressScopeTestCase):
+ def setUp(self):
+ plugin = DB_PLUGIN_KLASS
+ ext_mgr = AddressScopeTestExtensionManager()
+ super(TestSubnetPoolsWithAddressScopes, self).setUp(plugin=plugin,
+ ext_mgr=ext_mgr)
+
+ def _test_create_subnetpool(self, prefixes, expected=None,
+ admin=False, **kwargs):
+ keys = kwargs.copy()
+ keys.setdefault('tenant_id', self._tenant_id)
+ with self.subnetpool(prefixes, admin, **keys) as subnetpool:
+ self._validate_resource(subnetpool, keys, 'subnetpool')
+ if expected:
+ self._compare_resource(subnetpool, expected, 'subnetpool')
+ return subnetpool
+
+ def test_create_subnetpool_associate_address_scope(self):
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ expected = {'address_scope_id': address_scope_id}
+ self._test_create_subnetpool([subnet.cidr], expected=expected,
+ name='foo-subnetpool',
+ min_prefixlen='21',
+ address_scope_id=address_scope_id)
+
+ def test_create_subnetpool_associate_invalid_address_scope(self):
+ self.assertRaises(
+ webob.exc.HTTPClientError, self._test_create_subnetpool, [],
+ min_prefixlen='21', address_scope_id='foo-addr-scope-id')
+
+ def test_create_subnetpool_assoc_address_scope_with_prefix_intersect(self):
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ expected = {'address_scope_id': address_scope_id}
+ self._test_create_subnetpool([subnet.cidr], expected=expected,
+ name='foo-subnetpool',
+ min_prefixlen='21',
+ address_scope_id=address_scope_id)
+ overlap_subnet = netaddr.IPNetwork('10.10.10.10/24')
+ self.assertRaises(
+ webob.exc.HTTPClientError, self._test_create_subnetpool,
+ [overlap_subnet.cidr], min_prefixlen='21',
+ address_scope_id=address_scope_id)
+
+ def test_update_subnetpool_associate_address_scope(self):
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ initial_subnetpool = self._test_create_subnetpool([subnet.cidr],
+ name='foo-sp',
+ min_prefixlen='21')
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ data = {'subnetpool': {'address_scope_id': address_scope_id}}
+ req = self.new_update_request(
+ 'subnetpools', data, initial_subnetpool['subnetpool']['id'])
+ api = self._api_for_resource('subnetpools')
+ res = self.deserialize(self.fmt, req.get_response(api))
+ self._compare_resource(res, data['subnetpool'], 'subnetpool')
+
+ def test_update_subnetpool_associate_invalid_address_scope(self):
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ initial_subnetpool = self._test_create_subnetpool([subnet.cidr],
+ name='foo-sp',
+ min_prefixlen='21')
+ data = {'subnetpool': {'address_scope_id': 'foo-addr-scope-id'}}
+ req = self.new_update_request(
+ 'subnetpools', data, initial_subnetpool['subnetpool']['id'])
+ api = self._api_for_resource('subnetpools')
+ res = req.get_response(api)
+ self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
+
+ def test_update_subnetpool_disassociate_address_scope(self):
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ expected = {'address_scope_id': address_scope_id}
+ initial_subnetpool = self._test_create_subnetpool(
+ [subnet.cidr], expected=expected, name='foo-sp',
+ min_prefixlen='21', address_scope_id=address_scope_id)
+
+ data = {'subnetpool': {'address_scope_id': None}}
+ req = self.new_update_request(
+ 'subnetpools', data, initial_subnetpool['subnetpool']['id'])
+ api = self._api_for_resource('subnetpools')
+ res = self.deserialize(self.fmt, req.get_response(api))
+ self._compare_resource(res, data['subnetpool'], 'subnetpool')
+
+ def test_update_subnetpool_associate_another_address_scope(self):
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ expected = {'address_scope_id': address_scope_id}
+ initial_subnetpool = self._test_create_subnetpool(
+ [subnet.cidr], expected=expected, name='foo-sp',
+ min_prefixlen='21', address_scope_id=address_scope_id)
+
+ with self.address_scope(name='foo-address-scope') as other_a_s:
+ other_a_s_id = other_a_s['address_scope']['id']
+ update_data = {'subnetpool': {'address_scope_id':
+ other_a_s_id}}
+ req = self.new_update_request(
+ 'subnetpools', update_data,
+ initial_subnetpool['subnetpool']['id'])
+ api = self._api_for_resource('subnetpools')
+ res = self.deserialize(self.fmt, req.get_response(api))
+ self._compare_resource(res, update_data['subnetpool'],
+ 'subnetpool')
+
+ def test_delete_address_scope_in_use(self):
+ with self.address_scope(name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ expected = {'address_scope_id': address_scope_id}
+ self._test_create_subnetpool([subnet.cidr], expected=expected,
+ name='foo-subnetpool',
+ min_prefixlen='21',
+ address_scope_id=address_scope_id)
+ self._delete('address-scopes', address_scope_id,
+ expected_code=webob.exc.HTTPConflict.code)