from neutron.common import ipv6_utils
from neutron.db import ipam_backend_mixin
from neutron.db import models_v2
-from neutron import ipam
+from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
subnet['allocation_pools'],
subnet['cidr'],
subnet['gateway_ip'])
- subnet_request = ipam.SubnetRequestFactory.get_request(context,
- subnet,
- subnetpool)
+
+ subnet_request = ipam_req.SubnetRequestFactory.get_request(context,
+ subnet,
+ subnetpool)
if subnetpool_id:
driver = subnet_alloc.SubnetAllocator(subnetpool, context)
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-import netaddr
-
-from oslo_config import cfg
-from oslo_utils import uuidutils
-import six
-
-from neutron.api.v2 import attributes
-from neutron.common import constants
-from neutron.common import ipv6_utils
-from neutron.common import utils as common_utils
-from neutron.ipam import exceptions as ipam_exc
-
-
-@six.add_metaclass(abc.ABCMeta)
-class SubnetPool(object):
- """Represents a pool of IPs available inside an address scope."""
-
-
-@six.add_metaclass(abc.ABCMeta)
-class SubnetRequest(object):
- """Carries the data needed to make a subnet request
-
- The data validated and carried by an instance of this class is the data
- that is common to any type of request. This class shouldn't be
- instantiated on its own. Rather, a subclass of this class should be used.
- """
- def __init__(self, tenant_id, subnet_id,
- gateway_ip=None, allocation_pools=None):
- """Initialize and validate
-
- :param tenant_id: The tenant id who will own the subnet
- :type tenant_id: str uuid
- :param subnet_id: Neutron's subnet ID
- :type subnet_id: str uuid
- :param gateway_ip: An IP to reserve for the subnet gateway.
- :type gateway_ip: None or convertible to netaddr.IPAddress
- :param allocation_pools: The pool from which IPAM should allocate
- addresses. The allocator *may* allow allocating addresses outside
- of this range if specifically requested.
- :type allocation_pools: A list of netaddr.IPRange. None if not
- specified.
- """
- self._tenant_id = tenant_id
- self._subnet_id = subnet_id
- self._gateway_ip = None
- self._allocation_pools = None
-
- if gateway_ip is not None:
- self._gateway_ip = netaddr.IPAddress(gateway_ip)
-
- if allocation_pools is not None:
- allocation_pools = sorted(allocation_pools)
- previous = None
- for pool in allocation_pools:
- if not isinstance(pool, netaddr.ip.IPRange):
- raise TypeError("Ranges must be netaddr.IPRange")
- if previous and pool.first <= previous.last:
- raise ValueError("Ranges must not overlap")
- previous = pool
- if 1 < len(allocation_pools):
- # Checks that all the ranges are in the same IP version.
- # IPRange sorts first by ip version so we can get by with just
- # checking the first and the last range having sorted them
- # above.
- first_version = allocation_pools[0].version
- last_version = allocation_pools[-1].version
- if first_version != last_version:
- raise ValueError("Ranges must be in the same IP version")
- self._allocation_pools = allocation_pools
-
- if self.gateway_ip and self.allocation_pools:
- if self.gateway_ip.version != self.allocation_pools[0].version:
- raise ValueError("Gateway IP version inconsistent with "
- "allocation pool version")
-
- @property
- def tenant_id(self):
- return self._tenant_id
-
- @property
- def subnet_id(self):
- return self._subnet_id
-
- @property
- def gateway_ip(self):
- return self._gateway_ip
-
- @property
- def allocation_pools(self):
- return self._allocation_pools
-
- def _validate_with_subnet(self, subnet_cidr):
- if self.gateway_ip and cfg.CONF.force_gateway_on_subnet:
- gw_ip = netaddr.IPAddress(self.gateway_ip)
- if (gw_ip.version == 4 or (gw_ip.version == 6
- and not gw_ip.is_link_local())):
- if self.gateway_ip not in subnet_cidr:
- raise ValueError("gateway_ip is not in the subnet")
-
- if self.allocation_pools:
- if subnet_cidr.version != self.allocation_pools[0].version:
- raise ValueError("allocation_pools use the wrong ip version")
- for pool in self.allocation_pools:
- if pool not in subnet_cidr:
- raise ValueError("allocation_pools are not in the subnet")
-
-
-class AnySubnetRequest(SubnetRequest):
- """A template for allocating an unspecified subnet from IPAM
-
- A driver may not implement this type of request. For example, The initial
- reference implementation will not support this. The API has no way of
- creating a subnet without a specific address until subnet-allocation is
- implemented.
- """
- WILDCARDS = {constants.IPv4: '0.0.0.0',
- constants.IPv6: '::'}
-
- def __init__(self, tenant_id, subnet_id, version, prefixlen,
- gateway_ip=None, allocation_pools=None):
- """
- :param version: Either constants.IPv4 or constants.IPv6
- :param prefixlen: The prefix len requested. Must be within the min and
- max allowed.
- :type prefixlen: int
- """
- super(AnySubnetRequest, self).__init__(
- tenant_id=tenant_id,
- subnet_id=subnet_id,
- gateway_ip=gateway_ip,
- allocation_pools=allocation_pools)
-
- net = netaddr.IPNetwork(self.WILDCARDS[version] + '/' + str(prefixlen))
- self._validate_with_subnet(net)
-
- self._prefixlen = prefixlen
-
- @property
- def prefixlen(self):
- return self._prefixlen
-
-
-class SpecificSubnetRequest(SubnetRequest):
- """A template for allocating a specified subnet from IPAM
-
- The initial reference implementation will probably just allow any
- allocation, even overlapping ones. This can be expanded on by future
- blueprints.
- """
- def __init__(self, tenant_id, subnet_id, subnet_cidr,
- gateway_ip=None, allocation_pools=None):
- """
- :param subnet: The subnet requested. Can be IPv4 or IPv6. However,
- when IPAM tries to fulfill this request, the IP version must match
- the version of the address scope being used.
- :type subnet: netaddr.IPNetwork or convertible to one
- """
- super(SpecificSubnetRequest, self).__init__(
- tenant_id=tenant_id,
- subnet_id=subnet_id,
- gateway_ip=gateway_ip,
- allocation_pools=allocation_pools)
-
- self._subnet_cidr = netaddr.IPNetwork(subnet_cidr)
- self._validate_with_subnet(self._subnet_cidr)
-
- @property
- def subnet_cidr(self):
- return self._subnet_cidr
-
- @property
- def prefixlen(self):
- return self._subnet_cidr.prefixlen
-
-
-@six.add_metaclass(abc.ABCMeta)
-class AddressRequest(object):
- """Abstract base class for address requests"""
-
-
-class SpecificAddressRequest(AddressRequest):
- """For requesting a specified address from IPAM"""
- def __init__(self, address):
- """
- :param address: The address being requested
- :type address: A netaddr.IPAddress or convertible to one.
- """
- super(SpecificAddressRequest, self).__init__()
- self._address = netaddr.IPAddress(address)
-
- @property
- def address(self):
- return self._address
-
-
-class AnyAddressRequest(AddressRequest):
- """Used to request any available address from the pool."""
-
-
-class AutomaticAddressRequest(SpecificAddressRequest):
- """Used to create auto generated addresses, such as EUI64"""
- EUI64 = 'eui64'
-
- def _generate_eui64_address(self, **kwargs):
- if set(kwargs) != set(['prefix', 'mac']):
- raise ipam_exc.AddressCalculationFailure(
- address_type='eui-64',
- reason='must provide exactly 2 arguments - cidr and MAC')
- prefix = kwargs['prefix']
- mac_address = kwargs['mac']
- return ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac_address)
-
- _address_generators = {EUI64: _generate_eui64_address}
-
- def __init__(self, address_type=EUI64, **kwargs):
- """
- This constructor builds an automatic IP address. Parameter needed for
- generating it can be passed as optional keyword arguments.
-
- :param address_type: the type of address to generate.
- It could be a eui-64 address, a random IPv6 address, or
- a ipv4 link-local address.
- For the Kilo release only eui-64 addresses will be supported.
- """
- address_generator = self._address_generators.get(address_type)
- if not address_generator:
- raise ipam_exc.InvalidAddressType(address_type=address_type)
- address = address_generator(self, **kwargs)
- super(AutomaticAddressRequest, self).__init__(address)
-
-
-class RouterGatewayAddressRequest(AddressRequest):
- """Used to request allocating the special router gateway address."""
-
-
-class AddressRequestFactory(object):
- """Builds request using ip info
-
- Additional parameters(port and context) are not used in default
- implementation, but planned to be used in sub-classes
- provided by specific ipam driver,
- """
-
- @classmethod
- def get_request(cls, context, port, ip):
- if not ip:
- return AnyAddressRequest()
- else:
- return SpecificAddressRequest(ip)
-
-
-class SubnetRequestFactory(object):
- """Builds request using subnet info"""
-
- @classmethod
- def get_request(cls, context, subnet, subnetpool):
- cidr = subnet.get('cidr')
- subnet_id = subnet.get('id', uuidutils.generate_uuid())
- is_any_subnetpool_request = not attributes.is_attr_set(cidr)
-
- if is_any_subnetpool_request:
- prefixlen = subnet['prefixlen']
- if not attributes.is_attr_set(prefixlen):
- prefixlen = int(subnetpool['default_prefixlen'])
-
- return AnySubnetRequest(
- subnet['tenant_id'],
- subnet_id,
- common_utils.ip_version_from_int(subnetpool['ip_version']),
- prefixlen)
- else:
- return SpecificSubnetRequest(subnet['tenant_id'],
- subnet_id,
- cidr,
- subnet.get('gateway_ip'),
- subnet.get('allocation_pools'))
from oslo_log import log
import six
-from neutron import ipam
+from neutron.ipam import requests as ipam_req
from neutron import manager
LOG = log.getLogger(__name__)
Can be overridden on driver level to return custom factory
"""
- return ipam.SubnetRequestFactory
+ return ipam_req.SubnetRequestFactory
def get_address_request_factory(self):
"""Returns default AddressRequestFactory
Can be overridden on driver level to return custom factory
"""
- return ipam.AddressRequestFactory
+ return ipam_req.AddressRequestFactory
@six.add_metaclass(abc.ABCMeta)
from neutron.common import ipv6_utils
from neutron.db import api as db_api
from neutron.i18n import _LE
-from neutron import ipam
from neutron.ipam import driver as ipam_base
from neutron.ipam.drivers.neutrondb_ipam import db_api as ipam_db_api
from neutron.ipam import exceptions as ipam_exc
+from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
from neutron import manager
# NOTE(salv-orlando): It would probably better to have a simpler
# model for address requests and just check whether there is a
# specific IP address specified in address_request
- if isinstance(address_request, ipam.SpecificAddressRequest):
+ if isinstance(address_request, ipam_req.SpecificAddressRequest):
# This handles both specific and automatic address requests
# Check availability of requested IP
ip_address = str(address_request.address)
def get_details(self):
"""Return subnet data as a SpecificSubnetRequest"""
- return ipam.SpecificSubnetRequest(
+ return ipam_req.SpecificSubnetRequest(
self._tenant_id, self.subnet_manager.neutron_id,
self._cidr, self._gateway_ip, self._pools)
subnet_request = subnet.get_details()
# SubnetRequest must be an instance of SpecificSubnet
- if not isinstance(subnet_request, ipam.SpecificSubnetRequest):
+ if not isinstance(subnet_request, ipam_req.SpecificSubnetRequest):
raise ipam_exc.InvalidSubnetRequestType(
subnet_type=type(subnet_request))
return NeutronDbSubnet.create_from_subnet_request(subnet_request,
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import netaddr
+
+from oslo_config import cfg
+from oslo_utils import uuidutils
+import six
+
+from neutron.api.v2 import attributes
+from neutron.common import constants
+from neutron.common import ipv6_utils
+from neutron.common import utils as common_utils
+from neutron.ipam import exceptions as ipam_exc
+
+
+@six.add_metaclass(abc.ABCMeta)
+class SubnetPool(object):
+ """Represents a pool of IPs available inside an address scope."""
+
+
+@six.add_metaclass(abc.ABCMeta)
+class SubnetRequest(object):
+ """Carries the data needed to make a subnet request
+
+ The data validated and carried by an instance of this class is the data
+ that is common to any type of request. This class shouldn't be
+ instantiated on its own. Rather, a subclass of this class should be used.
+ """
+ def __init__(self, tenant_id, subnet_id,
+ gateway_ip=None, allocation_pools=None):
+ """Initialize and validate
+
+ :param tenant_id: The tenant id who will own the subnet
+ :type tenant_id: str uuid
+ :param subnet_id: Neutron's subnet ID
+ :type subnet_id: str uuid
+ :param gateway_ip: An IP to reserve for the subnet gateway.
+ :type gateway_ip: None or convertible to netaddr.IPAddress
+ :param allocation_pools: The pool from which IPAM should allocate
+ addresses. The allocator *may* allow allocating addresses outside
+ of this range if specifically requested.
+ :type allocation_pools: A list of netaddr.IPRange. None if not
+ specified.
+ """
+ self._tenant_id = tenant_id
+ self._subnet_id = subnet_id
+ self._gateway_ip = None
+ self._allocation_pools = None
+
+ if gateway_ip is not None:
+ self._gateway_ip = netaddr.IPAddress(gateway_ip)
+
+ if allocation_pools is not None:
+ allocation_pools = sorted(allocation_pools)
+ previous = None
+ for pool in allocation_pools:
+ if not isinstance(pool, netaddr.ip.IPRange):
+ raise TypeError("Ranges must be netaddr.IPRange")
+ if previous and pool.first <= previous.last:
+ raise ValueError("Ranges must not overlap")
+ previous = pool
+ if 1 < len(allocation_pools):
+ # Checks that all the ranges are in the same IP version.
+ # IPRange sorts first by ip version so we can get by with just
+ # checking the first and the last range having sorted them
+ # above.
+ first_version = allocation_pools[0].version
+ last_version = allocation_pools[-1].version
+ if first_version != last_version:
+ raise ValueError("Ranges must be in the same IP version")
+ self._allocation_pools = allocation_pools
+
+ if self.gateway_ip and self.allocation_pools:
+ if self.gateway_ip.version != self.allocation_pools[0].version:
+ raise ValueError("Gateway IP version inconsistent with "
+ "allocation pool version")
+
+ @property
+ def tenant_id(self):
+ return self._tenant_id
+
+ @property
+ def subnet_id(self):
+ return self._subnet_id
+
+ @property
+ def gateway_ip(self):
+ return self._gateway_ip
+
+ @property
+ def allocation_pools(self):
+ return self._allocation_pools
+
+ def _validate_with_subnet(self, subnet_cidr):
+ if self.gateway_ip and cfg.CONF.force_gateway_on_subnet:
+ gw_ip = netaddr.IPAddress(self.gateway_ip)
+ if (gw_ip.version == 4 or (gw_ip.version == 6
+ and not gw_ip.is_link_local())):
+ if self.gateway_ip not in subnet_cidr:
+ raise ValueError("gateway_ip is not in the subnet")
+
+ if self.allocation_pools:
+ if subnet_cidr.version != self.allocation_pools[0].version:
+ raise ValueError("allocation_pools use the wrong ip version")
+ for pool in self.allocation_pools:
+ if pool not in subnet_cidr:
+ raise ValueError("allocation_pools are not in the subnet")
+
+
+class AnySubnetRequest(SubnetRequest):
+ """A template for allocating an unspecified subnet from IPAM
+
+ A driver may not implement this type of request. For example, The initial
+ reference implementation will not support this. The API has no way of
+ creating a subnet without a specific address until subnet-allocation is
+ implemented.
+ """
+ WILDCARDS = {constants.IPv4: '0.0.0.0',
+ constants.IPv6: '::'}
+
+ def __init__(self, tenant_id, subnet_id, version, prefixlen,
+ gateway_ip=None, allocation_pools=None):
+ """
+ :param version: Either constants.IPv4 or constants.IPv6
+ :param prefixlen: The prefix len requested. Must be within the min and
+ max allowed.
+ :type prefixlen: int
+ """
+ super(AnySubnetRequest, self).__init__(
+ tenant_id=tenant_id,
+ subnet_id=subnet_id,
+ gateway_ip=gateway_ip,
+ allocation_pools=allocation_pools)
+
+ net = netaddr.IPNetwork(self.WILDCARDS[version] + '/' + str(prefixlen))
+ self._validate_with_subnet(net)
+
+ self._prefixlen = prefixlen
+
+ @property
+ def prefixlen(self):
+ return self._prefixlen
+
+
+class SpecificSubnetRequest(SubnetRequest):
+ """A template for allocating a specified subnet from IPAM
+
+ The initial reference implementation will probably just allow any
+ allocation, even overlapping ones. This can be expanded on by future
+ blueprints.
+ """
+ def __init__(self, tenant_id, subnet_id, subnet_cidr,
+ gateway_ip=None, allocation_pools=None):
+ """
+ :param subnet: The subnet requested. Can be IPv4 or IPv6. However,
+ when IPAM tries to fulfill this request, the IP version must match
+ the version of the address scope being used.
+ :type subnet: netaddr.IPNetwork or convertible to one
+ """
+ super(SpecificSubnetRequest, self).__init__(
+ tenant_id=tenant_id,
+ subnet_id=subnet_id,
+ gateway_ip=gateway_ip,
+ allocation_pools=allocation_pools)
+
+ self._subnet_cidr = netaddr.IPNetwork(subnet_cidr)
+ self._validate_with_subnet(self._subnet_cidr)
+
+ @property
+ def subnet_cidr(self):
+ return self._subnet_cidr
+
+ @property
+ def prefixlen(self):
+ return self._subnet_cidr.prefixlen
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AddressRequest(object):
+ """Abstract base class for address requests"""
+
+
+class SpecificAddressRequest(AddressRequest):
+ """For requesting a specified address from IPAM"""
+ def __init__(self, address):
+ """
+ :param address: The address being requested
+ :type address: A netaddr.IPAddress or convertible to one.
+ """
+ super(SpecificAddressRequest, self).__init__()
+ self._address = netaddr.IPAddress(address)
+
+ @property
+ def address(self):
+ return self._address
+
+
+class AnyAddressRequest(AddressRequest):
+ """Used to request any available address from the pool."""
+
+
+class AutomaticAddressRequest(SpecificAddressRequest):
+ """Used to create auto generated addresses, such as EUI64"""
+ EUI64 = 'eui64'
+
+ def _generate_eui64_address(self, **kwargs):
+ if set(kwargs) != set(['prefix', 'mac']):
+ raise ipam_exc.AddressCalculationFailure(
+ address_type='eui-64',
+ reason='must provide exactly 2 arguments - cidr and MAC')
+ prefix = kwargs['prefix']
+ mac_address = kwargs['mac']
+ return ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac_address)
+
+ _address_generators = {EUI64: _generate_eui64_address}
+
+ def __init__(self, address_type=EUI64, **kwargs):
+ """
+ This constructor builds an automatic IP address. Parameter needed for
+ generating it can be passed as optional keyword arguments.
+
+ :param address_type: the type of address to generate.
+ It could be a eui-64 address, a random IPv6 address, or
+ a ipv4 link-local address.
+ For the Kilo release only eui-64 addresses will be supported.
+ """
+ address_generator = self._address_generators.get(address_type)
+ if not address_generator:
+ raise ipam_exc.InvalidAddressType(address_type=address_type)
+ address = address_generator(self, **kwargs)
+ super(AutomaticAddressRequest, self).__init__(address)
+
+
+class RouterGatewayAddressRequest(AddressRequest):
+ """Used to request allocating the special router gateway address."""
+
+
+class AddressRequestFactory(object):
+ """Builds request using ip info
+
+ Additional parameters(port and context) are not used in default
+ implementation, but planned to be used in sub-classes
+ provided by specific ipam driver,
+ """
+
+ @classmethod
+ def get_request(cls, context, port, ip):
+ if not ip:
+ return AnyAddressRequest()
+ else:
+ return SpecificAddressRequest(ip)
+
+
+class SubnetRequestFactory(object):
+ """Builds request using subnet info"""
+
+ @classmethod
+ def get_request(cls, context, subnet, subnetpool):
+ cidr = subnet.get('cidr')
+ subnet_id = subnet.get('id', uuidutils.generate_uuid())
+ is_any_subnetpool_request = not attributes.is_attr_set(cidr)
+
+ if is_any_subnetpool_request:
+ prefixlen = subnet['prefixlen']
+ if not attributes.is_attr_set(prefixlen):
+ prefixlen = int(subnetpool['default_prefixlen'])
+
+ return AnySubnetRequest(
+ subnet['tenant_id'],
+ subnet_id,
+ common_utils.ip_version_from_int(subnetpool['ip_version']),
+ prefixlen)
+ else:
+ return SpecificSubnetRequest(subnet['tenant_id'],
+ subnet_id,
+ cidr,
+ subnet.get('gateway_ip'),
+ subnet.get('allocation_pools'))
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.db import models_v2
-import neutron.ipam as ipam
from neutron.ipam import driver
+from neutron.ipam import requests as ipam_req
from neutron.ipam import utils as ipam_utils
prefixlen=request.prefixlen,
min_prefixlen=min_prefixlen)
- if isinstance(request, ipam.AnySubnetRequest):
+ if isinstance(request, ipam_req.AnySubnetRequest):
return self._allocate_any_subnet(request)
- elif isinstance(request, ipam.SpecificSubnetRequest):
+ elif isinstance(request, ipam_req.SpecificSubnetRequest):
return self._allocate_specific_subnet(request)
else:
msg = _("Unsupported request type")
cidr,
gateway_ip=None,
allocation_pools=None):
- self._req = ipam.SpecificSubnetRequest(
+ self._req = ipam_req.SpecificSubnetRequest(
tenant_id,
subnet_id,
cidr,
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
-from neutron import ipam
from neutron.ipam.drivers.neutrondb_ipam import driver
from neutron.ipam import exceptions as ipam_exc
+from neutron.ipam import requests as ipam_req
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_db_plugin
cidr = '10.0.0.0/24'
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
None,
cidr,
def _prepare_specific_subnet_request(self, cidr):
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
self.assertRaises(
ipam_exc.InvalidSubnetRequestType,
self.ipam_pool.allocate_subnet,
- ipam.AnySubnetRequest(self._tenant_id, 'meh', constants.IPv4, 24))
+ ipam_req.AnySubnetRequest(self._tenant_id, 'meh',
+ constants.IPv4, 24))
def test_update_subnet_pools(self):
cidr = '10.0.0.0/24'
ipam_subnet.associate_neutron_subnet(subnet['id'])
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
- update_subnet_req = ipam.SpecificSubnetRequest(
+ update_subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
allocation_pool_ranges = [netaddr.IPRange(
pool['start'], pool['end']) for pool in
subnet['allocation_pools']]
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
tenant_id,
subnet['id'],
cidr,
cidr = '10.0.0.0/24'
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', subnet, cidr, gateway_ip=subnet['gateway_ip'])
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
with self.ctx.session.begin():
def test_allocate_any_v4_address_succeeds(self):
ip_address = self._allocate_address(
- '10.0.0.0/24', 4, ipam.AnyAddressRequest)
+ '10.0.0.0/24', 4, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
def test_allocate_any_v6_address_succeeds(self):
ip_address = self._allocate_address(
- 'fde3:abcd:4321:1::/64', 6, ipam.AnyAddressRequest)
+ 'fde3:abcd:4321:1::/64', 6, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
def test_allocate_specific_v4_address_succeeds(self):
ip_address = self._allocate_address(
- '10.0.0.0/24', 4, ipam.SpecificAddressRequest('10.0.0.33'))
+ '10.0.0.0/24', 4, ipam_req.SpecificAddressRequest('10.0.0.33'))
self.assertEqual('10.0.0.33', ip_address)
def test_allocate_specific_v6_address_succeeds(self):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6,
- ipam.SpecificAddressRequest('fde3:abcd:4321:1::33'))
+ ipam_req.SpecificAddressRequest('fde3:abcd:4321:1::33'))
self.assertEqual('fde3:abcd:4321:1::33', ip_address)
def test_allocate_specific_v4_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'10.0.0.0/24', 4,
- ipam.SpecificAddressRequest('192.168.0.1'))
+ ipam_req.SpecificAddressRequest('192.168.0.1'))
def test_allocate_specific_v6_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'fde3:abcd:4321:1::/64', 6,
- ipam.SpecificAddressRequest(
+ ipam_req.SpecificAddressRequest(
'fde3:abcd:eeee:1::33'))
def test_allocate_specific_address_in_use_fails(self):
ipam_subnet = self._create_and_allocate_ipam_subnet(
'fde3:abcd:4321:1::/64', ip_version=6)[0]
- addr_req = ipam.SpecificAddressRequest('fde3:abcd:4321:1::33')
+ addr_req = ipam_req.SpecificAddressRequest('fde3:abcd:4321:1::33')
ipam_subnet.allocate(addr_req)
self.assertRaises(ipam_exc.IpAddressAlreadyAllocated,
ipam_subnet.allocate,
# Same as above, the ranges will be recalculated always
ipam_subnet = self._create_and_allocate_ipam_subnet(
'192.168.0.0/30', ip_version=4)[0]
- ipam_subnet.allocate(ipam.AnyAddressRequest)
+ ipam_subnet.allocate(ipam_req.AnyAddressRequest)
# The second address generation request on a /30 for v4 net must fail
self.assertRaises(ipam_exc.IpAddressGenerationFailure,
ipam_subnet.allocate,
- ipam.AnyAddressRequest)
+ ipam_req.AnyAddressRequest)
def _test_deallocate_address(self, cidr, ip_version):
ipam_subnet = self._create_and_allocate_ipam_subnet(
cidr, ip_version=ip_version)[0]
- ip_address = ipam_subnet.allocate(ipam.AnyAddressRequest)
+ ip_address = ipam_subnet.allocate(ipam_req.AnyAddressRequest)
ipam_subnet.deallocate(ip_address)
def test_deallocate_v4_address(self):
pass
def _test_allocate_subnet(self, subnet_id):
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', subnet_id, '192.168.0.0/24')
return self.ipam_pool.allocate_subnet(subnet_req)
def test_allocate_subnet_for_non_existent_subnet_pass(self):
# This test should pass because neutron subnet is not checked
# until associate neutron subnet step
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
self.ipam_pool.allocate_subnet(subnet_req)
self.assertEqual(subnet['id'], details.subnet_id)
def test_associate_non_existing_neutron_subnet_fails(self):
- subnet_req = ipam.SpecificSubnetRequest(
+ subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
self.assertRaises(n_exc.SubnetNotFound,
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron import context
-from neutron import ipam
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
+from neutron.ipam import requests as ipam_req
from neutron import manager
from neutron.tests import base
from neutron.tests.unit.ipam import fake_driver
class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
def test_subnet_request(self):
- pool = ipam.SubnetRequest(self.tenant_id,
+ pool = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id)
self.assertEqual(self.tenant_id, pool.tenant_id)
self.assertEqual(self.subnet_id, pool.subnet_id)
self.assertEqual(None, pool.allocation_pools)
def test_subnet_request_gateway(self):
- request = ipam.SubnetRequest(self.tenant_id,
+ request = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id,
gateway_ip='1.2.3.1')
self.assertEqual('1.2.3.1', str(request.gateway_ip))
def test_subnet_request_bad_gateway(self):
self.assertRaises(netaddr.core.AddrFormatError,
- ipam.SubnetRequest,
+ ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
gateway_ip='1.2.3.')
def test_subnet_request_with_range(self):
allocation_pools = [netaddr.IPRange('1.2.3.4', '1.2.3.5'),
netaddr.IPRange('1.2.3.7', '1.2.3.9')]
- request = ipam.SubnetRequest(self.tenant_id,
+ request = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id,
allocation_pools=allocation_pools)
self.assertEqual(allocation_pools, request.allocation_pools)
def test_subnet_request_range_not_list(self):
self.assertRaises(TypeError,
- ipam.SubnetRequest,
+ ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=1)
def test_subnet_request_bad_range(self):
self.assertRaises(TypeError,
- ipam.SubnetRequest,
+ ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=['1.2.3.4'])
pools = [netaddr.IPRange('0.0.0.1', '0.0.0.2'),
netaddr.IPRange('::1', '::2')]
self.assertRaises(ValueError,
- ipam.SubnetRequest,
+ ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=pools)
pools = [netaddr.IPRange('0.0.0.10', '0.0.0.20'),
netaddr.IPRange('0.0.0.8', '0.0.0.10')]
self.assertRaises(ValueError,
- ipam.SubnetRequest,
+ ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=pools)
class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request(self):
- request = ipam.AnySubnetRequest(self.tenant_id,
+ request = ipam_req.AnySubnetRequest(self.tenant_id,
self.subnet_id,
constants.IPv4,
24,
def test_subnet_request_bad_prefix_type(self):
self.assertRaises(netaddr.core.AddrFormatError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
def test_subnet_request_bad_prefix(self):
self.assertRaises(netaddr.core.AddrFormatError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
33)
self.assertRaises(netaddr.core.AddrFormatError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
def test_subnet_request_bad_gateway(self):
self.assertRaises(ValueError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
def test_subnet_request_allocation_pool_wrong_version(self):
pools = [netaddr.IPRange('0.0.0.4', '0.0.0.5')]
self.assertRaises(ValueError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
def test_subnet_request_allocation_pool_not_in_net(self):
pools = [netaddr.IPRange('0.0.0.64', '0.0.0.128')]
self.assertRaises(ValueError,
- ipam.AnySubnetRequest,
+ ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
class TestIpamSpecificSubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request(self):
- request = ipam.SpecificSubnetRequest(self.tenant_id,
+ request = ipam_req.SpecificSubnetRequest(self.tenant_id,
self.subnet_id,
'1.2.3.0/24',
gateway_ip='1.2.3.1')
def test_subnet_request_bad_gateway(self):
self.assertRaises(ValueError,
- ipam.SpecificSubnetRequest,
+ ipam_req.SpecificSubnetRequest,
self.tenant_id,
self.subnet_id,
'2001::1',
# This class doesn't test much. At least running through all of the
# constructors may shake out some trivial bugs.
- EUI64 = ipam.AutomaticAddressRequest.EUI64
+ EUI64 = ipam_req.AutomaticAddressRequest.EUI64
def setUp(self):
super(TestAddressRequest, self).setUp()
def test_specific_address_ipv6(self):
- request = ipam.SpecificAddressRequest('2000::45')
+ request = ipam_req.SpecificAddressRequest('2000::45')
self.assertEqual(netaddr.IPAddress('2000::45'), request.address)
def test_specific_address_ipv4(self):
- request = ipam.SpecificAddressRequest('1.2.3.32')
+ request = ipam_req.SpecificAddressRequest('1.2.3.32')
self.assertEqual(netaddr.IPAddress('1.2.3.32'), request.address)
def test_any_address(self):
- ipam.AnyAddressRequest()
+ ipam_req.AnyAddressRequest()
def test_automatic_address_request_eui64(self):
subnet_cidr = '2607:f0d0:1002:51::/64'
port_mac = 'aa:bb:cc:dd:ee:ff'
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(subnet_cidr,
port_mac))
- request = ipam.AutomaticAddressRequest(
+ request = ipam_req.AutomaticAddressRequest(
address_type=self.EUI64,
prefix=subnet_cidr,
mac=port_mac)
def test_automatic_address_request_invalid_address_type_raises(self):
self.assertRaises(ipam_exc.InvalidAddressType,
- ipam.AutomaticAddressRequest,
+ ipam_req.AutomaticAddressRequest,
address_type='kaboom')
def test_automatic_address_request_eui64_no_mac_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
- ipam.AutomaticAddressRequest,
+ ipam_req.AutomaticAddressRequest,
address_type=self.EUI64,
prefix='meh')
def test_automatic_address_request_eui64_alien_param_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
- ipam.AutomaticAddressRequest,
+ ipam_req.AutomaticAddressRequest,
address_type=self.EUI64,
mac='meh',
alien='et',
def test_ipam_driver_raises_import_error(self):
self._verify_import_error_is_generated(
- 'neutron.tests.unit.ipam.SomeNonExistentClass')
+ 'neutron.tests.unit.ipam_req.SomeNonExistentClass')
def test_ipam_driver_raises_import_error_for_none(self):
self._verify_import_error_is_generated(None)
def test_specific_address_request_is_loaded(self):
for address in ('10.12.0.15', 'fffe::1'):
self.assertIsInstance(
- ipam.AddressRequestFactory.get_request(None,
- None,
- address),
- ipam.SpecificAddressRequest)
+ ipam_req.AddressRequestFactory.get_request(None,
+ None,
+ address),
+ ipam_req.SpecificAddressRequest)
def test_any_address_request_is_loaded(self):
for addr in [None, '']:
self.assertIsInstance(
- ipam.AddressRequestFactory.get_request(None,
+ ipam_req.AddressRequestFactory.get_request(None,
None,
addr),
- ipam.AnyAddressRequest)
+ ipam_req.AnyAddressRequest)
class TestSubnetRequestFactory(IpamSubnetRequestTestCase):
for address in addresses:
subnet, subnetpool = self._build_subnet_dict(cidr=address)
self.assertIsInstance(
- ipam.SubnetRequestFactory.get_request(None,
+ ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
- ipam.SpecificSubnetRequest)
+ ipam_req.SpecificSubnetRequest)
def test_any_address_request_is_loaded_for_ipv4(self):
subnet, subnetpool = self._build_subnet_dict(cidr=None, ip_version=4)
self.assertIsInstance(
- ipam.SubnetRequestFactory.get_request(None,
+ ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
- ipam.AnySubnetRequest)
+ ipam_req.AnySubnetRequest)
def test_any_address_request_is_loaded_for_ipv6(self):
subnet, subnetpool = self._build_subnet_dict(cidr=None, ip_version=6)
self.assertIsInstance(
- ipam.SubnetRequestFactory.get_request(None,
+ ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
- ipam.AnySubnetRequest)
+ ipam_req.AnySubnetRequest)
def test_args_are_passed_to_specific_request(self):
subnet, subnetpool = self._build_subnet_dict()
- request = ipam.SubnetRequestFactory.get_request(None,
+ request = ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool)
self.assertIsInstance(request,
- ipam.SpecificSubnetRequest)
+ ipam_req.SpecificSubnetRequest)
self.assertEqual(self.tenant_id, request.tenant_id)
self.assertEqual(self.subnet_id, request.subnet_id)
self.assertEqual(None, request.gateway_ip)
def test_get_subnet_request_factory(self):
self.assertEqual(
self.driver.get_subnet_request_factory(),
- ipam.SubnetRequestFactory)
+ ipam_req.SubnetRequestFactory)
def test_get_address_request_factory(self):
self.assertEqual(
self.driver.get_address_request_factory(),
- ipam.AddressRequestFactory)
+ ipam_req.AddressRequestFactory)
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
-import neutron.ipam as ipam
+from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.AnySubnetRequest(self._tenant_id,
+ req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(req)
with self.ctx.session.begin(subtransactions=True):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.SpecificSubnetRequest(self._tenant_id,
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24')
res = sa.allocate_subnet(req)
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.AnySubnetRequest(self._tenant_id,
+ req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4,
21)
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.SpecificSubnetRequest(self._tenant_id,
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.0.0/21')
self.assertRaises(n_exc.SubnetAllocationError,
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.AnySubnetRequest(self._tenant_id,
+ req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(req)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.SpecificSubnetRequest(self._tenant_id,
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24',
gateway_ip='10.1.2.254')
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.SpecificSubnetRequest(self._tenant_id,
- uuidutils.generate_uuid(),
- '2210::/64',
- '2210::ffff:ffff:ffff:ffff')
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
+ uuidutils.generate_uuid(),
+ '2210::/64',
+ '2210::ffff:ffff:ffff:ffff')
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
48, 6, default_quota=1)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
- req = ipam.SpecificSubnetRequest(self._tenant_id,
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'fe80::/63')
self.assertRaises(n_exc.SubnetPoolQuotaExceeded,