# under the License.
import collections
+import itertools
import netaddr
from oslo_config import cfg
pool=pool_range,
ip_address=gateway_ip)
+ def _is_ip_required_by_subnet(self, context, subnet_id, device_owner):
+ # For ports that are not router ports, retain any automatic
+ # (non-optional, e.g. IPv6 SLAAC) addresses.
+ if device_owner in constants.ROUTER_INTERFACE_OWNERS:
+ return True
+
+ subnet = self._get_subnet(context, subnet_id)
+ return not ipv6_utils.is_auto_address_subnet(subnet)
+
def _get_changed_ips_for_port(self, context, original_ips,
new_ips, device_owner):
"""Calculate changes in IPs for the port."""
msg = _('Exceeded maximum amount of fixed ips per port')
raise n_exc.InvalidInput(error_message=msg)
- # These ips are still on the port and haven't been removed
- prev_ips = []
-
- # Remove all of the intersecting elements
- for original_ip in original_ips[:]:
- for new_ip in new_ips[:]:
- if ('ip_address' in new_ip and
- original_ip['ip_address'] == new_ip['ip_address']):
- original_ips.remove(original_ip)
- new_ips.remove(new_ip)
- prev_ips.append(original_ip)
- break
+ add_ips = []
+ remove_ips = []
+ ips_map = {ip['ip_address']: ip
+ for ip in itertools.chain(new_ips, original_ips)
+ if 'ip_address' in ip}
+
+ new = set()
+ for ip in new_ips:
+ if 'ip_address' in ip:
+ new.add(ip['ip_address'])
+ else:
+ add_ips.append(ip)
+
+ # Convert original ip addresses to sets
+ orig = set(ip['ip_address'] for ip in original_ips)
+
+ add = new - orig
+ unchanged = new & orig
+ remove = orig - new
+
+ # Convert results back to list of dicts
+ add_ips += [ips_map[ip] for ip in add]
+ prev_ips = [ips_map[ip] for ip in unchanged]
+
+ # Mark ip for removing if it is not found in new_ips
+ # and subnet requires ip to be set manually.
+ # For auto addresses leave ip unchanged
+ for ip in remove:
+ subnet_id = ips_map[ip]['subnet_id']
+ if self._is_ip_required_by_subnet(context, subnet_id,
+ device_owner):
+ remove_ips.append(ips_map[ip])
else:
- # For ports that are not router ports, retain any automatic
- # (non-optional, e.g. IPv6 SLAAC) addresses.
- if device_owner not in constants.ROUTER_INTERFACE_OWNERS:
- subnet = self._get_subnet(context,
- original_ip['subnet_id'])
- if (ipv6_utils.is_auto_address_subnet(subnet)):
- original_ips.remove(original_ip)
- prev_ips.append(original_ip)
- return self.Changes(add=new_ips,
+ prev_ips.append(ips_map[ip])
+
+ return self.Changes(add=add_ips,
original=prev_ips,
- remove=original_ips)
+ remove=remove_ips)
def _delete_port(self, context, port_id):
query = (context.session.query(models_v2.Port).
('id-2', '192.168.1.2'))
self.default_original_ips = (('id-1', '192.168.1.1'),
('id-5', '172.20.16.5'))
+ self.owner_non_router = constants.DEVICE_OWNER_DHCP
+ self.owner_router = constants.DEVICE_OWNER_ROUTER_INTF
def _prepare_ips(self, ips):
return [{'ip_address': ip[1],
'subnet_id': ip[0]} for ip in ips]
+ def _mock_slaac_subnet_on(self):
+ slaac_subnet = {'ipv6_address_mode': constants.IPV6_SLAAC,
+ 'ipv6_ra_mode': constants.IPV6_SLAAC}
+ self.mixin._get_subnet = mock.Mock(return_value=slaac_subnet)
+
+ def _mock_slaac_subnet_off(self):
+ non_slaac_subnet = {'ipv6_address_mode': None,
+ 'ipv6_ra_mode': None}
+ self.mixin._get_subnet = mock.Mock(return_value=non_slaac_subnet)
+
def _test_get_changed_ips_for_port(self, expected_change, original_ips,
new_ips, owner):
change = self.mixin._get_changed_ips_for_port(self.ctx,
self.assertEqual(expected_change, change)
def test__get_changed_ips_for_port(self):
- owner_router = constants.DEVICE_OWNER_ROUTER_INTF
new_ips = self._prepare_ips(self.default_new_ips)
original_ips = self._prepare_ips(self.default_original_ips)
- # generate changes before calling _get_changed_ips_for_port
- # because new_ips and original_ips are affected during call
expected_change = self.mixin.Changes(add=[new_ips[1]],
original=[original_ips[0]],
remove=[original_ips[1]])
self._test_get_changed_ips_for_port(expected_change, original_ips,
- new_ips, owner_router)
+ new_ips, self.owner_router)
def test__get_changed_ips_for_port_autoaddress(self):
- owner_not_router = constants.DEVICE_OWNER_DHCP
new_ips = self._prepare_ips(self.default_new_ips)
original = (('id-1', '192.168.1.1'),
('id-5', '2000:1234:5678::12FF:FE34:5678'))
original_ips = self._prepare_ips(original)
- # mock to test auto address part
- slaac_subnet = {'ipv6_address_mode': constants.IPV6_SLAAC,
- 'ipv6_ra_mode': constants.IPV6_SLAAC}
- self.mixin._get_subnet = mock.Mock(return_value=slaac_subnet)
+ self._mock_slaac_subnet_on()
- # make a copy of original_ips
- # since it is changed by _get_changed_ips_for_port
expected_change = self.mixin.Changes(add=[new_ips[1]],
- original=original_ips[:],
+ original=original_ips,
remove=[])
+ self._test_get_changed_ips_for_port(expected_change, original_ips,
+ new_ips, self.owner_non_router)
+
+ def _test_get_changed_ips_for_port_no_ip_address(self):
+ # IP address should be added if only subnet_id is provided,
+ # independently from auto_address status for subnet
+ new_ips = [{'subnet_id': 'id-3'}]
+ original_ips = []
+ expected_change = self.mixin.Changes(add=[new_ips[0]],
+ original=[],
+ remove=[])
self._test_get_changed_ips_for_port(expected_change, original_ips,
- new_ips, owner_not_router)
+ new_ips, self.owner_non_router)
+
+ def test__get_changed_ips_for_port_no_ip_address_no_slaac(self):
+ self._mock_slaac_subnet_off()
+ self._test_get_changed_ips_for_port_no_ip_address()
+
+ def test__get_changed_ips_for_port_no_ip_address_slaac(self):
+ self._mock_slaac_subnet_on()
+ self._test_get_changed_ips_for_port_no_ip_address()
+
+ def test__is_ip_required_by_subnet_for_router_port(self):
+ # Owner -> router:
+ # _get_subnet should not be called,
+ # expected True
+ self._mock_slaac_subnet_off()
+
+ result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
+ self.owner_router)
+ self.assertTrue(result)
+ self.assertFalse(self.mixin._get_subnet.called)
+
+ def test__is_ip_required_by_subnet_for_non_router_port(self):
+ # Owner -> not router:
+ # _get_subnet should be called,
+ # expected True, because subnet is not slaac
+ self._mock_slaac_subnet_off()
+
+ result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
+ self.owner_non_router)
+ self.assertTrue(result)
+ self.assertTrue(self.mixin._get_subnet.called)
+
+ def test__is_ip_required_by_subnet_for_non_router_port_and_slaac(self):
+ # Owner -> not router:
+ # _get_subnet should be called,
+ # expected False, because subnet is slaac
+ self._mock_slaac_subnet_on()
+
+ result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
+ self.owner_non_router)
+ self.assertFalse(result)
+ self.assertTrue(self.mixin._get_subnet.called)