# under the License.
import datetime
+import itertools
import random
import netaddr
models_v2.IPAllocationPool).filter_by(subnet_id=subnet_id).
options(orm.joinedload('available_ranges', innerjoin=True)).
with_lockmode('update'))
+ # If there are no available ranges the previous query will return no
+ # results as it uses an inner join to avoid errors with the postgresql
+ # backend (see lp bug 1215350). In this case IP allocation pools must
+ # be loaded with a different query, which does not require lock for
+ # update as the allocation pools for a subnet are immutable.
+ # The 2nd query will be executed only if the first yields no results
+ unlocked_allocation_pools = (context.session.query(
+ models_v2.IPAllocationPool).filter_by(subnet_id=subnet_id))
# Find the allocation pool for the IP to recycle
pool_id = None
- for allocation_pool in allocation_pools:
+
+ for allocation_pool in itertools.chain(allocation_pools,
+ unlocked_allocation_pools):
allocation_pool_range = netaddr.IPRange(
- allocation_pool['first_ip'],
- allocation_pool['last_ip'])
+ allocation_pool['first_ip'], allocation_pool['last_ip'])
if netaddr.IPAddress(ip_address) in allocation_pool_range:
pool_id = allocation_pool['id']
break
for subnet in subnets:
range = range_qry.filter_by(subnet_id=subnet['id']).first()
if not range:
- LOG.debug(_("All IP's from subnet %(subnet_id)s (%(cidr)s) "
+ LOG.debug(_("All IPs from subnet %(subnet_id)s (%(cidr)s) "
"allocated"),
{'subnet_id': subnet['id'], 'cidr': subnet['cidr']})
continue
import random
import mock
+import netaddr
from oslo.config import cfg
from testtools import matchers
import webob.exc
120)
self.assertTrue(log.mock_calls)
- def test_recycle_ip_address_without_allocation_pool(self):
+ def _test_recycle_ip_address(self, ip_to_recycle, allocation_pools=None):
plugin = NeutronManager.get_plugin()
- allocation_pools = [{"start": '10.0.0.10',
- "end": '10.0.0.50'}]
+ if not allocation_pools:
+ allocation_pools = [{"start": '10.0.0.10',
+ "end": '10.0.0.50'}]
with self.subnet(cidr='10.0.0.0/24',
allocation_pools=allocation_pools) as subnet:
network_id = subnet['subnet']['network_id']
subnet_id = subnet['subnet']['id']
fixed_ips = [{"subnet_id": subnet_id,
- "ip_address": '10.0.0.100'}]
+ "ip_address": ip_to_recycle}]
with self.port(subnet=subnet, fixed_ips=fixed_ips) as port:
- update_context = context.Context('', port['port']['tenant_id'])
+ ctx = context.Context('', port['port']['tenant_id'])
ip_address = port['port']['fixed_ips'][0]['ip_address']
- plugin._recycle_ip(update_context,
- network_id,
- subnet_id,
- ip_address)
+ plugin._recycle_ip(ctx, network_id, subnet_id, ip_address)
- q = update_context.session.query(models_v2.IPAllocation)
+ q = ctx.session.query(models_v2.IPAllocation)
q = q.filter_by(subnet_id=subnet_id)
self.assertEqual(q.count(), 0)
+ # If the IP address is in the allocation pool also verify the
+ # address is returned to the availability range
+ for allocation_pool in allocation_pools:
+ allocation_pool_range = netaddr.IPRange(
+ allocation_pool['start'], allocation_pool['end'])
+ if netaddr.IPAddress(ip_to_recycle) in allocation_pool_range:
+ # Do not worry about no result found exception
+ pool = ctx.session.query(
+ models_v2.IPAllocationPool).filter_by(
+ subnet_id=subnet_id).one()
+ ip_av_range = ctx.session.query(
+ models_v2.IPAvailabilityRange).filter_by(
+ allocation_pool_id=pool['id']).first()
+ self.assertIsNotNone(ip_av_range)
+ self.assertIn(netaddr.IPAddress(ip_to_recycle),
+ netaddr.IPRange(ip_av_range['first_ip'],
+ ip_av_range['last_ip']))
+
+ def test_recycle_ip_address_outside_allocation_pool(self):
+ self._test_recycle_ip_address('10.0.0.100')
+
+ def test_recycle_ip_address_in_allocation_pool(self):
+ self._test_recycle_ip_address('10.0.0.20')
+
+ def test_recycle_ip_address_on_exhausted_allocation_pool(self):
+ # Perform the recycle ip address on a subnet with a single address
+ # in the pool to verify the corner case exposed by bug 1240353
+ self._test_recycle_ip_address(
+ '10.0.0.20',
+ allocation_pools=[{'start': '10.0.0.20',
+ 'end': '10.0.0.20'}])
def test_max_fixed_ips_exceeded(self):
with self.subnet(gateway_ip='10.0.0.3',