def _create_subnet_from_pool(self, context, subnet, subnetpool_id):
s = subnet['subnet']
tenant_id = self._get_tenant_id_for_create(context, s)
- has_allocpool = attributes.is_attr_set(s['allocation_pools'])
- is_any_subnetpool_request = not attributes.is_attr_set(s['cidr'])
- if is_any_subnetpool_request and has_allocpool:
- reason = _("allocation_pools allowed only "
- "for specific subnet requests.")
- raise n_exc.BadRequest(resource='subnets', msg=reason)
+ self._validate_pools_with_subnetpool(s)
with context.session.begin(subtransactions=True):
subnetpool = self._get_subnetpool(context, subnetpool_id)
- ip_version = s.get('ip_version')
- has_ip_version = attributes.is_attr_set(ip_version)
- if has_ip_version and ip_version != subnetpool.ip_version:
- args = {'req_ver': str(s['ip_version']),
- 'pool_ver': str(subnetpool.ip_version)}
- reason = _("Cannot allocate IPv%(req_ver)s subnet from "
- "IPv%(pool_ver)s subnet pool") % args
- raise n_exc.BadRequest(resource='subnets', msg=reason)
+ self._validate_ip_version_with_subnetpool(s, subnetpool)
network = self._get_network(context, s["network_id"])
allocator = subnet_alloc.SubnetAllocator(subnetpool, context)
from oslo_db import exception as db_exc
from oslo_log import log as logging
+from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
"""
pass
+ def _validate_pools_with_subnetpool(self, subnet):
+ """Verifies that allocation pools are set correctly
+
+ Allocation pools can be set for specific subnet request only
+ """
+ has_allocpool = attributes.is_attr_set(subnet['allocation_pools'])
+ is_any_subnetpool_request = not attributes.is_attr_set(subnet['cidr'])
+ if is_any_subnetpool_request and has_allocpool:
+ reason = _("allocation_pools allowed only "
+ "for specific subnet requests.")
+ raise n_exc.BadRequest(resource='subnets', msg=reason)
+
+ def _validate_ip_version_with_subnetpool(self, subnet, subnetpool):
+ """Validates ip version for subnet_pool and requested subnet"""
+ ip_version = subnet.get('ip_version')
+ has_ip_version = attributes.is_attr_set(ip_version)
+ if has_ip_version and ip_version != subnetpool.ip_version:
+ args = {'req_ver': str(subnet['ip_version']),
+ 'pool_ver': str(subnetpool.ip_version)}
+ reason = _("Cannot allocate IPv%(req_ver)s subnet from "
+ "IPv%(pool_ver)s subnet pool") % args
+ raise n_exc.BadRequest(resource='subnets', msg=reason)
+
def _update_db_port(self, context, db_port, new_port, network_id, new_mac):
# Remove all attributes in new_port which are not in the port DB model
# and then update the port