"%(address_scope_id)s.")
+class IllegalSubnetPoolIpVersionAssociationToAddressScope(BadRequest):
+ message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
+ "cannot associate with address scope %(address_scope_id)s "
+ "because subnetpool ip_version is not %(ip_version)s.")
+
+
class IllegalSubnetPoolUpdate(BadRequest):
message = _("Illegal subnetpool update : %(reason)s.")
name = sa.Column(sa.String(attr.NAME_MAX_LEN), nullable=False)
shared = sa.Column(sa.Boolean, nullable=False)
+ ip_version = sa.Column(sa.Integer(), nullable=False)
class AddressScopeDbMixin(ext_address_scope.AddressScopePluginBase):
res = {'id': address_scope['id'],
'name': address_scope['name'],
'tenant_id': address_scope['tenant_id'],
- 'shared': address_scope['shared']}
+ 'shared': address_scope['shared'],
+ 'ip_version': address_scope['ip_version']}
return self._fields(res, fields)
def _get_address_scope(self, context, id):
return context.is_admin or (
address_scope.tenant_id == context.tenant_id)
+ def get_ip_version_for_address_scope(self, context, id):
+ address_scope = self._get_address_scope(context, id)
+ return address_scope.ip_version
+
def create_address_scope(self, context, address_scope):
"""Create a address scope."""
a_s = address_scope['address_scope']
pool_args = {'tenant_id': tenant_id,
'id': address_scope_id,
'name': a_s['name'],
- 'shared': a_s['shared']}
+ 'shared': a_s['shared'],
+ 'ip_version': a_s['ip_version']}
address_scope = AddressScope(**pool_args)
context.session.add(address_scope)
context.session.add(subnetpool_prefix)
def _validate_address_scope_id(self, context, address_scope_id,
- subnetpool_id, sp_prefixes):
+ subnetpool_id, sp_prefixes, ip_version):
"""Validate the address scope before associating.
Subnetpool can associate with an address scope if
address scope
- there is no prefix conflict with the existing subnetpools
associated with the address scope.
+ - the address family of the subnetpool and address scope
+ are the same
"""
if not attributes.is_attr_set(address_scope_id):
return
raise n_exc.IllegalSubnetPoolAssociationToAddressScope(
subnetpool_id=subnetpool_id, address_scope_id=address_scope_id)
+ as_ip_version = self.get_ip_version_for_address_scope(context,
+ address_scope_id)
+
+ if ip_version != as_ip_version:
+ raise n_exc.IllegalSubnetPoolIpVersionAssociationToAddressScope(
+ subnetpool_id=subnetpool_id, address_scope_id=address_scope_id,
+ ip_version=as_ip_version)
+
subnetpools = self._get_subnetpools_by_address_scope_id(
context, address_scope_id)
self._check_default_subnetpool_exists(context,
sp_reader.ip_version)
self._validate_address_scope_id(context, sp_reader.address_scope_id,
- id, sp_reader.prefixes)
+ id, sp_reader.prefixes,
+ sp_reader.ip_version)
tenant_id = self._get_tenant_id_for_create(context, sp)
with context.session.begin(subtransactions=True):
pool_args = {'tenant_id': tenant_id,
orig_sp.address_scope_id)
self._validate_address_scope_id(context, reader.address_scope_id,
- id, reader.prefixes)
+ id, reader.prefixes,
+ reader.ip_version)
orig_sp.update(self._filter_non_model_columns(
reader.subnetpool,
models_v2.SubnetPool))
-dce3ec7a25c9
+c3a73f615e4
--- /dev/null
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Add ip_version to AddressScope
+
+Revision ID: c3a73f615e4
+Revises: 13cfb89f881a
+Create Date: 2015-10-08 17:34:32.231256
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'c3a73f615e4'
+down_revision = 'dce3ec7a25c9'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column('address_scopes',
+ sa.Column('ip_version', sa.Integer(), nullable=False))
'is_visible': True,
'required_by_policy': True,
'enforce_policy': True},
+ 'ip_version': {'allow_post': True, 'allow_put': False,
+ 'convert_to': attr.convert_to_int,
+ 'validate': {'type:values': [4, 6]},
+ 'is_visible': True},
},
attr.SUBNETPOOLS: {
ADDRESS_SCOPE_ID: {'allow_post': True,
**kwargs)
def _test_update_address_scope_helper(self, is_admin=False, shared=None):
- address_scope = self._create_address_scope(is_admin=is_admin)
+ address_scope = self._create_address_scope(is_admin=is_admin,
+ ip_version=4)
if is_admin:
client = self.admin_client
@test.attr(type='smoke')
@test.idempotent_id('045f9294-8b1a-4848-b6a8-edf1b41e9d06')
def test_tenant_create_list_address_scope(self):
- address_scope = self._create_address_scope()
+ address_scope = self._create_address_scope(ip_version=4)
body = self.client.list_address_scopes()
returned_address_scopes = body['address_scopes']
self.assertIn(address_scope['id'],
@test.attr(type='smoke')
@test.idempotent_id('85e0326b-4c75-4b92-bd6e-7c7de6aaf05c')
def test_show_address_scope(self):
- address_scope = self._create_address_scope()
+ address_scope = self._create_address_scope(ip_version=4)
body = self.client.show_address_scope(
address_scope['id'])
returned_address_scope = body['address_scope']
@test.attr(type='smoke')
@test.idempotent_id('22b3b600-72a8-4b60-bc94-0f29dd6271df')
def test_delete_address_scope(self):
- address_scope = self._create_address_scope()
+ address_scope = self._create_address_scope(ip_version=4)
self.client.delete_address_scope(address_scope['id'])
self.assertRaises(lib_exc.NotFound, self.client.show_address_scope,
address_scope['id'])
@test.attr(type='smoke')
@test.idempotent_id('5a06c287-8036-4d04-9d78-def8e06d43df')
def test_admin_create_shared_address_scope(self):
- address_scope = self._create_address_scope(is_admin=True, shared=True)
+ address_scope = self._create_address_scope(is_admin=True, shared=True,
+ ip_version=4)
body = self.admin_client.show_address_scope(
address_scope['id'])
returned_address_scope = body['address_scope']
@test.idempotent_id('9c92ec34-0c50-4104-aa47-9ce98d5088df')
def test_tenant_create_shared_address_scope(self):
self.assertRaises(lib_exc.Forbidden, self._create_address_scope,
- shared=True)
+ shared=True, ip_version=4)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('a857b61e-bf53-4fab-b21a-b0daaf81b5bd')
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('ef213552-f2da-487d-bf4a-e1705d115ff1')
def test_tenant_get_not_shared_admin_address_scope(self):
- address_scope = self._create_address_scope(is_admin=True)
+ address_scope = self._create_address_scope(is_admin=True,
+ ip_version=4)
# None-shared admin address scope cannot be retrieved by tenant user.
self.assertRaises(lib_exc.NotFound, self.client.show_address_scope,
address_scope['id'])
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('702d0515-82cb-4207-b0d9-703336e54665')
def test_update_shared_address_scope_to_unshare(self):
- address_scope = self._create_address_scope(is_admin=True, shared=True)
+ address_scope = self._create_address_scope(is_admin=True, shared=True,
+ ip_version=4)
self.assertRaises(lib_exc.BadRequest,
self.admin_client.update_address_scope,
address_scope['id'], name='new-name', shared=False)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('1e471e5c-6f9c-437a-9257-fd9bc4b6f0fb')
def test_delete_address_scope_associated_with_subnetpool(self):
- address_scope = self._create_address_scope()
+ address_scope = self._create_address_scope(ip_version=4)
prefixes = [u'10.11.12.0/24']
subnetpool_data = {'subnetpool': {
'name': 'foo-subnetpool',
@test.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
name, pool_id = self._create_subnetpool(
self.client, pool_values={'address_scope_id': address_scope['id']})
self.addCleanup(self.client.delete_subnetpool, pool_id)
@test.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_address_scope(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
name, pool_id = self._create_subnetpool(self.client)
self.addCleanup(self.client.delete_subnetpool, pool_id)
body = self.client.get_subnetpool(pool_id)
@test.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_another_address_scope(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
another_address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
name, pool_id = self._create_subnetpool(
self.client, pool_values={'address_scope_id':
address_scope['id']})
@test.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_disassociate_address_scope(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
name, pool_id = self._create_subnetpool(
self.client, pool_values={'address_scope_id': address_scope['id']})
self.addCleanup(self.client.delete_subnetpool, pool_id)
@test.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope_prefix_intersect(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=4)
addr_scope_id = address_scope['id']
pool_id = self._create_subnetpool(
self.client, pool_values={'address_scope_id': addr_scope_id})
@test.requires_ext(extension='address-scope', service='network')
def test_create_sp_associate_address_scope_multiple_prefix_intersect(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=4)
addr_scope_id = address_scope['id']
pool_values = {'address_scope_id': addr_scope_id,
'prefixes': [u'20.0.0.0/18', u'30.0.0.0/18']}
@test.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope_of_other_owner(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'), is_admin=True)
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ ip_version=4)
address_scope_id = address_scope['id']
subnetpool_data = copy.deepcopy(self._subnetpool_data)
subnetpool_data['subnetpool']['address_scope_id'] = address_scope_id
def test_tenant_create_subnetpool_associate_shared_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
- shared=True)
+ shared=True, ip_version=4)
subnetpool_data = copy.deepcopy(self._subnetpool_data)
subnetpool_data['subnetpool']['address_scope_id'] = (
address_scope['id'])
@test.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_address_scope_of_other_owner(self):
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'), is_admin=True)
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ ip_version=4)
address_scope_id = address_scope['id']
pool_id = self._create_subnetpool(self.client)
self.addCleanup(self.client.delete_subnetpool, pool_id)
# Updating the first subnet pool with the prefix intersecting
# with the second one should be a failure
address_scope = self.create_address_scope(
- name=data_utils.rand_name('smoke-address-scope'))
+ name=data_utils.rand_name('smoke-address-scope'), ip_version=4)
addr_scope_id = address_scope['id']
pool_values = {'address_scope_id': addr_scope_id,
'prefixes': pool_1_prefixes}
def test_tenant_update_sp_prefix_associated_with_shared_addr_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
- shared=True)
+ shared=True, ip_version=4)
addr_scope_id = address_scope['id']
pool_values = {'prefixes': [u'20.0.0.0/18', u'30.0.0.0/18']}
body = self.admin_client.get_subnetpool(pool_id)
self.assertEqual(update_prefixes,
body['subnetpool']['prefixes'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('648fee7d-a909-4ced-bad3-3a169444c0a8')
+ def test_update_subnetpool_associate_address_scope_wrong_ip_version(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=6)
+ pool_id = self._create_subnetpool(self.client)
+ self.addCleanup(self.client.delete_subnetpool, pool_id)
+ subnetpool_data = {'subnetpool': {'address_scope_id':
+ address_scope['id']}}
+ self.assertRaises(lib_exc.BadRequest, self.client.update_subnetpool,
+ pool_id, subnetpool_data)
import webob.exc
from neutron.api.v2 import attributes as attr
+from neutron.common import constants
from neutron import context
from neutron.db import address_scope_db
from neutron.db import db_base_plugin_v2
class AddressScopeTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
- def _create_address_scope(self, fmt, expected_res_status=None,
- admin=False, **kwargs):
+ def _create_address_scope(self, fmt, ip_version=constants.IP_VERSION_4,
+ expected_res_status=None, admin=False, **kwargs):
address_scope = {'address_scope': {}}
+ address_scope['address_scope']['ip_version'] = ip_version
for k, v in kwargs.items():
address_scope['address_scope'][k] = str(v)
self.assertEqual(address_scope_res.status_int, expected_res_status)
return address_scope_res
- def _make_address_scope(self, fmt, admin=False, **kwargs):
- res = self._create_address_scope(fmt, admin=admin, **kwargs)
+ def _make_address_scope(self, fmt, ip_version, admin=False, **kwargs):
+ res = self._create_address_scope(fmt, ip_version,
+ admin=admin, **kwargs)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
- def address_scope(self, admin=False, **kwargs):
- addr_scope = self._make_address_scope(self.fmt, admin, **kwargs)
+ def address_scope(self, ip_version=constants.IP_VERSION_4,
+ admin=False, **kwargs):
+ addr_scope = self._make_address_scope(self.fmt, ip_version,
+ admin, **kwargs)
yield addr_scope
- def _test_create_address_scope(self, admin=False, expected=None, **kwargs):
+ def _test_create_address_scope(self, ip_version=constants.IP_VERSION_4,
+ admin=False, expected=None, **kwargs):
keys = kwargs.copy()
keys.setdefault('tenant_id', self._tenant_id)
- with self.address_scope(admin=admin, **keys) as addr_scope:
+ with self.address_scope(ip_version,
+ admin=admin, **keys) as addr_scope:
+ keys['ip_version'] = ip_version
self._validate_resource(addr_scope, keys, 'address_scope')
if expected:
self._compare_resource(addr_scope, expected, 'address_scope')
ext_mgr = AddressScopeTestExtensionManager()
super(TestAddressScope, self).setUp(plugin=plugin, ext_mgr=ext_mgr)
- def test_create_address_scope(self):
+ def test_create_address_scope_ipv4(self):
expected_addr_scope = {'name': 'foo-address-scope',
'tenant_id': self._tenant_id,
- 'shared': False}
+ 'shared': False,
+ 'ip_version': constants.IP_VERSION_4}
self._test_create_address_scope(name='foo-address-scope',
expected=expected_addr_scope)
+ def test_create_address_scope_ipv6(self):
+ expected_addr_scope = {'name': 'foo-address-scope',
+ 'tenant_id': self._tenant_id,
+ 'shared': False,
+ 'ip_version': constants.IP_VERSION_6}
+ self._test_create_address_scope(constants.IP_VERSION_6,
+ name='foo-address-scope',
+ expected=expected_addr_scope)
+
def test_create_address_scope_empty_name(self):
expected_addr_scope = {'name': '',
'tenant_id': self._tenant_id,
def test_list_address_scopes(self):
self._test_create_address_scope(name='foo-address-scope')
- self._test_create_address_scope(name='bar-address-scope')
+ self._test_create_address_scope(constants.IP_VERSION_6,
+ name='bar-address-scope')
res = self._list('address-scopes')
self.assertEqual(2, len(res['address_scopes']))
self.assertEqual(1, len(mortal_res['address_scopes']))
def test_list_address_scopes_different_tenants_not_shared(self):
- self._test_create_address_scope(name='foo-address-scope')
+ self._test_create_address_scope(constants.IP_VERSION_6,
+ name='foo-address-scope')
admin_res = self._list('address-scopes')
mortal_res = self._list(
'address-scopes',
address_scope_id=address_scope_id)
self._delete('address-scopes', address_scope_id,
expected_code=webob.exc.HTTPConflict.code)
+
+ def test_add_subnetpool_address_scope_wrong_address_family(self):
+ with self.address_scope(constants.IP_VERSION_6,
+ name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('10.10.10.0/24')
+ self.assertRaises(webob.exc.HTTPClientError,
+ self._test_create_subnetpool,
+ [subnet.cidr], name='foo-subnetpool',
+ min_prefixlen='21',
+ address_scope_id=address_scope_id)
+
+ def test_update_subnetpool_associate_address_scope_wrong_family(self):
+ with self.address_scope(constants.IP_VERSION_6,
+ name='foo-address-scope') as addr_scope:
+ address_scope_id = addr_scope['address_scope']['id']
+ subnet = netaddr.IPNetwork('2001:db8::/64')
+ expected = {'address_scope_id': address_scope_id}
+ initial_subnetpool = self._test_create_subnetpool(
+ [subnet.cidr], expected=expected, name='foo-sp',
+ min_prefixlen='64', address_scope_id=address_scope_id)
+
+ with self.address_scope(name='foo-address-scope') as other_a_s:
+ other_a_s_id = other_a_s['address_scope']['id']
+ update_data = {'subnetpool': {'address_scope_id':
+ other_a_s_id}}
+ req = self.new_update_request(
+ 'subnetpools', update_data,
+ initial_subnetpool['subnetpool']['id'])
+ api = self._api_for_resource('subnetpools')
+ res = req.get_response(api)
+ self.assertEqual(webob.exc.HTTPBadRequest.code,
+ res.status_int)