# under the License.
import datetime
-import itertools
import random
import netaddr
"""Return an IP address to the pool of free IP's on the network
- # Grab all allocation pools for the subnet
- allocation_pools = (context.session.query(
- models_v2.IPAllocationPool).filter_by(subnet_id=subnet_id).
- options(orm.joinedload('available_ranges', innerjoin=True)).
- with_lockmode('update'))
- # If there are no available ranges the previous query will return no
- # results as it uses an inner join to avoid errors with the postgresql
- # backend (see lp bug 1215350). In this case IP allocation pools must
- # be loaded with a different query, which does not require lock for
- # update as the allocation pools for a subnet are immutable.
- # The 2nd query will be executed only if the first yields no results
- unlocked_allocation_pools = (context.session.query(
- models_v2.IPAllocationPool).filter_by(subnet_id=subnet_id))
- # Find the allocation pool for the IP to recycle
- pool_id = None
- for allocation_pool in itertools.chain(allocation_pools,
- unlocked_allocation_pools):
- allocation_pool_range = netaddr.IPRange(
- allocation_pool['first_ip'], allocation_pool['last_ip'])
- if netaddr.IPAddress(ip_address) in allocation_pool_range:
- pool_id = allocation_pool['id']
- break
- if not pool_id:
- NeutronDbPluginV2._delete_ip_allocation(
- context, network_id, subnet_id, ip_address)
- return
- # Two requests will be done on the database. The first will be to
- # search if an entry starts with ip_address + 1 (r1). The second
- # will be to see if an entry ends with ip_address -1 (r2).
- # If 1 of the above holds true then the specific entry will be
- # modified. If both hold true then the two ranges will be merged.
- # If there are no entries then a single entry will be added.
- range_qry = context.session.query(
- models_v2.IPAvailabilityRange).with_lockmode('update')
- ip_first = str(netaddr.IPAddress(ip_address) + 1)
- ip_last = str(netaddr.IPAddress(ip_address) - 1)
- LOG.debug(_("Recycle %s"), ip_address)
- try:
- r1 = range_qry.filter_by(allocation_pool_id=pool_id,
- first_ip=ip_first).one()
- LOG.debug(_("Recycle: first match for %(first_ip)s-%(last_ip)s"),
- {'first_ip': r1['first_ip'], 'last_ip': r1['last_ip']})
- except exc.NoResultFound:
- r1 = []
- try:
- r2 = range_qry.filter_by(allocation_pool_id=pool_id,
- last_ip=ip_last).one()
- LOG.debug(_("Recycle: last match for %(first_ip)s-%(last_ip)s"),
- {'first_ip': r2['first_ip'], 'last_ip': r2['last_ip']})
- except exc.NoResultFound:
- r2 = []
- if r1 and r2:
- # Merge the two ranges
- ip_range = models_v2.IPAvailabilityRange(
- allocation_pool_id=pool_id,
- first_ip=r2['first_ip'],
- last_ip=r1['last_ip'])
- context.session.add(ip_range)
- LOG.debug(_("Recycle: merged %(first_ip1)s-%(last_ip1)s and "
- "%(first_ip2)s-%(last_ip2)s"),
- {'first_ip1': r2['first_ip'], 'last_ip1': r2['last_ip'],
- 'first_ip2': r1['first_ip'], 'last_ip2': r1['last_ip']})
- context.session.delete(r1)
- context.session.delete(r2)
- elif r1:
- # Update the range with matched first IP
- r1['first_ip'] = ip_address
- LOG.debug(_("Recycle: updated first %(first_ip)s-%(last_ip)s"),
- {'first_ip': r1['first_ip'], 'last_ip': r1['last_ip']})
- elif r2:
- # Update the range with matched last IP
- r2['last_ip'] = ip_address
- LOG.debug(_("Recycle: updated last %(first_ip)s-%(last_ip)s"),
- {'first_ip': r2['first_ip'], 'last_ip': r2['last_ip']})
- else:
- # Create a new range
- ip_range = models_v2.IPAvailabilityRange(
- allocation_pool_id=pool_id,
- first_ip=ip_address,
- last_ip=ip_address)
- context.session.add(ip_range)
- LOG.debug(_("Recycle: created new %(first_ip)s-%(last_ip)s"),
- {'first_ip': ip_address, 'last_ip': ip_address})
NeutronDbPluginV2._delete_ip_allocation(context, network_id, subnet_id,
def _generate_ip(context, subnets):
+ try:
+ return NeutronDbPluginV2._try_generate_ip(context, subnets)
+ except q_exc.IpAddressGenerationFailure:
+ NeutronDbPluginV2._rebuild_availability_ranges(context, subnets)
+ return NeutronDbPluginV2._try_generate_ip(context, subnets)
+ @staticmethod
+ def _try_generate_ip(context, subnets):
"""Generate an IP address.
The IP address will be generated from one of the subnets defined on
return {'ip_address': ip_address, 'subnet_id': subnet['id']}
raise q_exc.IpAddressGenerationFailure(net_id=subnets[0]['network_id'])
+ @staticmethod
+ def _rebuild_availability_ranges(context, subnets):
+ ip_qry = context.session.query(
+ models_v2.IPAllocation).with_lockmode('update')
+ # PostgreSQL does not support select...for update with an outer join.
+ # No join is needed here.
+ pool_qry = context.session.query(
+ models_v2.IPAllocationPool).options(
+ orm.noload('available_ranges')).with_lockmode('update')
+ for subnet in sorted(subnets):
+ LOG.debug(_("Rebuilding availability ranges for subnet %s")
+ % subnet)
+ # Create a set of all currently allocated addresses
+ ip_qry_results = ip_qry.filter_by(subnet_id=subnet['id'])
+ allocations = netaddr.IPSet([netaddr.IPAddress(i['ip_address'])
+ for i in ip_qry_results])
+ for pool in pool_qry.filter_by(subnet_id=subnet['id']):
+ # Create a set of all addresses in the pool
+ poolset = netaddr.IPSet(netaddr.iter_iprange(pool['first_ip'],
+ pool['last_ip']))
+ # Use set difference to find free addresses in the pool
+ available = poolset - allocations
+ # Generator compacts an ip set into contiguous ranges
+ def ipset_to_ranges(ipset):
+ first, last = None, None
+ for cidr in ipset.iter_cidrs():
+ if last and last + 1 != cidr.first:
+ yield netaddr.IPRange(first, last)
+ first = None
+ first, last = first if first else cidr.first, cidr.last
+ if first:
+ yield netaddr.IPRange(first, last)
+ # Write the ranges to the db
+ for range in ipset_to_ranges(available):
+ available_range = models_v2.IPAvailabilityRange(
+ allocation_pool_id=pool['id'],
+ first_ip=str(netaddr.IPAddress(range.first)),
+ last_ip=str(netaddr.IPAddress(range.last)))
+ context.session.add(available_range)
def _allocate_specific_ip(context, subnet_id, ip_address):
"""Allocate a specific IP address on the subnet."""
import contextlib
import copy
-import datetime
import os
-import random
import mock
-import netaddr
from oslo.config import cfg
from testtools import matchers
import webob.exc
from neutron.db import models_v2
from neutron.manager import NeutronManager
from neutron.openstack.common import importutils
-from neutron.openstack.common import timeutils
from neutron.tests import base
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
- self.assertEqual(ips[0]['ip_address'], '')
+ self.assertEqual(ips[0]['ip_address'], '')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
- self.assertEqual(ips[1]['ip_address'], '')
+ self.assertEqual(ips[1]['ip_address'], '')
self.assertEqual(ips[1]['subnet_id'], subnet['subnet']['id'])
def test_requested_duplicate_mac(self):
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
- def test_recycling(self):
- # set expirations to past so that recycling is checked
- reference = datetime.datetime(2012, 8, 13, 23, 11, 0)
- cfg.CONF.set_override('dhcp_lease_duration', 0)
- with self.subnet(cidr='') as subnet:
- with self.port(subnet=subnet) as port:
- with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
- mock_utcnow.return_value = reference
- ips = port['port']['fixed_ips']
- self.assertEqual(len(ips), 1)
- self.assertEqual(ips[0]['ip_address'], '')
- self.assertEqual(ips[0]['subnet_id'],
- subnet['subnet']['id'])
- net_id = port['port']['network_id']
- ports = []
- for i in range(16 - 3):
- res = self._create_port(self.fmt, net_id=net_id)
- p = self.deserialize(self.fmt, res)
- ports.append(p)
- for i in range(16 - 3):
- x = random.randrange(0, len(ports), 1)
- p = ports.pop(x)
- self._delete('ports', p['port']['id'])
- res = self._create_port(self.fmt, net_id=net_id)
- port = self.deserialize(self.fmt, res)
- ips = port['port']['fixed_ips']
- self.assertEqual(len(ips), 1)
- self.assertEqual(ips[0]['ip_address'], '')
- self.assertEqual(ips[0]['subnet_id'],
- subnet['subnet']['id'])
- self._delete('ports', port['port']['id'])
def test_invalid_admin_state(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
- def _test_recycle_ip_address(self, ip_to_recycle, allocation_pools=None):
- plugin = NeutronManager.get_plugin()
- if not allocation_pools:
- allocation_pools = [{"start": '',
- "end": ''}]
- with self.subnet(cidr='',
- allocation_pools=allocation_pools) as subnet:
- network_id = subnet['subnet']['network_id']
- subnet_id = subnet['subnet']['id']
- fixed_ips = [{"subnet_id": subnet_id,
- "ip_address": ip_to_recycle}]
- with self.port(subnet=subnet, fixed_ips=fixed_ips) as port:
- ctx = context.Context('', port['port']['tenant_id'])
- ip_address = port['port']['fixed_ips'][0]['ip_address']
- plugin._recycle_ip(ctx, network_id, subnet_id, ip_address)
- q = ctx.session.query(models_v2.IPAllocation)
- q = q.filter_by(subnet_id=subnet_id)
- self.assertEqual(q.count(), 0)
- # If the IP address is in the allocation pool also verify the
- # address is returned to the availability range
- for allocation_pool in allocation_pools:
- allocation_pool_range = netaddr.IPRange(
- allocation_pool['start'], allocation_pool['end'])
- if netaddr.IPAddress(ip_to_recycle) in allocation_pool_range:
- # Do not worry about no result found exception
- pool = ctx.session.query(
- models_v2.IPAllocationPool).filter_by(
- subnet_id=subnet_id).one()
- ip_av_range = ctx.session.query(
- models_v2.IPAvailabilityRange).filter_by(
- allocation_pool_id=pool['id']).first()
- self.assertIsNotNone(ip_av_range)
- self.assertIn(netaddr.IPAddress(ip_to_recycle),
- netaddr.IPRange(ip_av_range['first_ip'],
- ip_av_range['last_ip']))
- def test_recycle_ip_address_outside_allocation_pool(self):
- self._test_recycle_ip_address('')
- def test_recycle_ip_address_in_allocation_pool(self):
- self._test_recycle_ip_address('')
- def test_recycle_ip_address_on_exhausted_allocation_pool(self):
- # Perform the recycle ip address on a subnet with a single address
- # in the pool to verify the corner case exposed by bug 1240353
- self._test_recycle_ip_address(
- '',
- allocation_pools=[{'start': '',
- 'end': ''}])
def test_max_fixed_ips_exceeded(self):
with self.subnet(gateway_ip='',
cidr='') as subnet:
self.assertEqual(actual_repr_output, final_exp)
+class TestNeutronDbPluginV2(base.BaseTestCase):
+ """Unit Tests for NeutronDbPluginV2 IPAM Logic."""
+ def test_generate_ip(self):
+ with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
+ '_try_generate_ip') as generate:
+ with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
+ '_rebuild_availability_ranges') as rebuild:
+ db_base_plugin_v2.NeutronDbPluginV2._generate_ip('c', 's')
+ generate.assert_called_once_with('c', 's')
+ self.assertEqual(0, rebuild.call_count)
+ def test_generate_ip_exhausted_pool(self):
+ with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
+ '_try_generate_ip') as generate:
+ with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
+ '_rebuild_availability_ranges') as rebuild:
+ exception = q_exc.IpAddressGenerationFailure(net_id='n')
+ generate.side_effect = exception
+ # I want the side_effect to throw an exception once but I
+ # didn't see a way to do this. So, let it throw twice and
+ # catch the second one. Check below to ensure that
+ # _try_generate_ip was called twice.
+ try:
+ db_base_plugin_v2.NeutronDbPluginV2._generate_ip('c', 's')
+ except q_exc.IpAddressGenerationFailure:
+ pass
+ self.assertEqual(2, generate.call_count)
+ rebuild.assert_called_once_with('c', 's')
+ def test_rebuild_availability_ranges(self):
+ pools = [{'id': 'a',
+ 'first_ip': '',
+ 'last_ip': ''},
+ {'id': 'b',
+ 'first_ip': '',
+ 'last_ip': ''}]
+ allocations = [{'ip_address': ''},
+ {'ip_address': ''},
+ {'ip_address': ''},
+ {'ip_address': ''},
+ {'ip_address': ''},
+ {'ip_address': ''},
+ {'ip_address': ''}]
+ ip_qry = mock.Mock()
+ ip_qry.with_lockmode.return_value = ip_qry
+ ip_qry.filter_by.return_value = allocations
+ pool_qry = mock.Mock()
+ pool_qry.options.return_value = pool_qry
+ pool_qry.with_lockmode.return_value = pool_qry
+ pool_qry.filter_by.return_value = pools
+ def return_queries_side_effect(*args, **kwargs):
+ if args[0] == models_v2.IPAllocation:
+ return ip_qry
+ if args[0] == models_v2.IPAllocationPool:
+ return pool_qry
+ context = mock.Mock()
+ context.session.query.side_effect = return_queries_side_effect
+ subnets = [mock.MagicMock()]
+ db_base_plugin_v2.NeutronDbPluginV2._rebuild_availability_ranges(
+ context, subnets)
+ actual = [[args[0].allocation_pool_id,
+ args[0].first_ip, args[0].last_ip]
+ for _name, args, _kwargs in context.session.add.mock_calls]
+ self.assertEqual([['a', '', ''],
+ ['a', '', ''],
+ ['b', '', ''],
+ ['b', '', '']], actual)
class NeutronDbPluginV2AsMixinTestCase(base.BaseTestCase):
"""Tests for NeutronDbPluginV2 as Mixin.