port, subnets, new_port = self._add_interface_by_subnet(
context, router, interface_info['subnet_id'], device_owner)
+ subnet = subnets[0]
+
if new_port:
if router.extra_attributes.distributed and router.gw_port:
try:
)
context.session.add(router_port)
+ # NOTE: For IPv6 additional subnets added to the same
+ # network we need to update the CSNAT port with respective
+ # IPv6 subnet
+ elif subnet and port:
+ fixed_ip = {'subnet_id': subnet['id']}
+ if subnet['ip_version'] == 6:
+ # Add new prefix to an existing ipv6 csnat port with the
+ # same network id if one exists
+ cs_port = self._find_router_port_by_network_and_device_owner(
+ router, subnet['network_id'],
+ l3_const.DEVICE_OWNER_ROUTER_SNAT)
+ if cs_port:
+ fixed_ips = list(cs_port['port']['fixed_ips'])
+ fixed_ips.append(fixed_ip)
+ updated_port = self._core_plugin.update_port(
+ context.elevated(),
+ cs_port['port_id'], {'port': {'fixed_ips': fixed_ips}})
+ LOG.debug("CSNAT port updated for IPv6 subnet: "
+ "%s", updated_port)
router_interface_info = self._make_router_interface_info(
- router_id, port['tenant_id'], port['id'], subnets[-1]['id'],
- [subnet['id'] for subnet in subnets])
+ router_id, port['tenant_id'], port['id'], subnet['id'],
+ [subnet['id']])
self.notify_router_interface_action(
context, router_interface_info, 'add')
return router_interface_info
- def _port_has_ipv6_address(self, port):
+ def _port_has_ipv6_address(self, port, csnat_port_check=True):
"""Overridden to return False if DVR SNAT port."""
- if port['device_owner'] == l3_const.DEVICE_OWNER_ROUTER_SNAT:
- return False
+ if csnat_port_check:
+ if port['device_owner'] == l3_const.DEVICE_OWNER_ROUTER_SNAT:
+ return False
return super(L3_NAT_with_dvr_db_mixin,
self)._port_has_ipv6_address(port)
+ def _find_router_port_by_network_and_device_owner(
+ self, router, net_id, device_owner):
+ for port in router.attached_ports:
+ p = port['port']
+ if (p['network_id'] == net_id and
+ p['device_owner'] == device_owner and
+ self._port_has_ipv6_address(p, csnat_port_check=False)):
+ return port
+
+ def _check_for_multiprefix_csnat_port_and_update(
+ self, context, router, subnet, port):
+ """Checks if the csnat port contains multiple ipv6 prefixes.
+
+ If the csnat port contains multiple ipv6 prefixes for the given
+ network when a router interface is deleted, make sure we don't
+ delete the port when a single subnet is deleted and just update
+ it with the right fixed_ip.
+ This function returns true if it is a multiprefix port.
+ """
+ subnet_id = subnet['id']
+ if router.gw_port and subnet_id:
+ # If router has a gateway port, check if it has IPV6 subnet
+ cs_port = (
+ self._find_router_port_by_network_and_device_owner(
+ router, subnet['network_id'],
+ l3_const.DEVICE_OWNER_ROUTER_SNAT))
+ if cs_port:
+ fixed_ips = (
+ [fixedip for fixedip in
+ cs_port['port']['fixed_ips']
+ if fixedip['subnet_id'] != subnet_id])
+ if fixed_ips:
+ # multiple prefix port - delete prefix from port
+ self._core_plugin.update_port(
+ context.elevated(),
+ cs_port['port_id'], {'port': {'fixed_ips': fixed_ips}})
+ return True
+ return False
+
def _check_dvr_router_remove_required_and_notify_agent(
- self, context, router, port, subnets):
+ self, context, router, port, subnet):
if router.extra_attributes.distributed:
- if router.gw_port and subnets[0]['id']:
+ is_multiple_prefix_csport = (
+ self._check_for_multiprefix_csnat_port_and_update(
+ context, router, subnet, port))
+ if not is_multiple_prefix_csport:
+ # Single prefix port - go ahead and delete the port
self.delete_csnat_router_interface_ports(
- context.elevated(), router, subnet_id=subnets[0]['id'])
+ context.elevated(), router, subnet_id=subnet['id'])
plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
l3_agents = plugin.get_l3_agents_hosting_routers(context,
plugin.remove_router_from_l3_agent(
context, l3_agent['id'], router['id'])
router_interface_info = self._make_router_interface_info(
- router['id'], port['tenant_id'], port['id'], subnets[0]['id'],
- [subnet['id'] for subnet in subnets])
+ router['id'], port['tenant_id'], port['id'], subnet['id'],
+ [subnet['id']])
self.notify_router_interface_action(
context, router_interface_info, 'remove')
return router_interface_info
else:
port, subnets = self._remove_interface_by_subnet(
context, router_id, subnet_id, device_owner)
+
+ subnet = subnets[0]
router_interface_info = (
self._check_dvr_router_remove_required_and_notify_agent(
- context, router, port, subnets))
+ context, router, port, subnet))
return router_interface_info
def _get_snat_sync_interfaces(self, context, router_ids):
def test_delete_floating_ip_agent_notification_non_dvr(self):
self._test_delete_floating_ip_agent_notification(dvr=False)
+ def test_router_with_ipv4_and_multiple_ipv6_on_same_network(self):
+ kwargs = {'arg_list': (external_net.EXTERNAL,),
+ external_net.EXTERNAL: True}
+ ext_net = self._make_network(self.fmt, '', True, **kwargs)
+ self._make_subnet(
+ self.fmt, ext_net, '10.0.0.1', '10.0.0.0/24',
+ ip_version=4, enable_dhcp=True)
+ self._make_subnet(
+ self.fmt, ext_net, '2001:db8::1', '2001:db8::/64',
+ ip_version=6, enable_dhcp=True)
+ router1 = self._create_router()
+ self.l3_plugin._update_router_gw_info(
+ self.context, router1['id'],
+ {'network_id': ext_net['network']['id']})
+ snat_router_intfs = self.l3_plugin._get_snat_sync_interfaces(
+ self.context, [router1['id']])
+ self.assertEqual(0, len(snat_router_intfs[router1['id']]))
+ private_net1 = self._make_network(self.fmt, 'net1', True)
+ private_ipv6_subnet1 = self._make_subnet(self.fmt,
+ private_net1, 'fd00::1',
+ cidr='fd00::1/64', ip_version=6,
+ ipv6_ra_mode='slaac',
+ ipv6_address_mode='slaac')
+ private_ipv6_subnet2 = self._make_subnet(self.fmt,
+ private_net1, 'fd01::1',
+ cidr='fd01::1/64', ip_version=6,
+ ipv6_ra_mode='slaac',
+ ipv6_address_mode='slaac')
+ # Add the first IPv6 subnet to the router
+ self.l3_plugin.add_router_interface(
+ self.context, router1['id'],
+ {'subnet_id': private_ipv6_subnet1['subnet']['id']})
+ # Check for the internal snat port interfaces
+ snat_router_intfs = self.l3_plugin._get_snat_sync_interfaces(
+ self.context, [router1['id']])
+ self.assertEqual(1, len(snat_router_intfs[router1['id']]))
+ # Add the second IPv6 subnet to the router
+ self.l3_plugin.add_router_interface(
+ self.context, router1['id'],
+ {'subnet_id': private_ipv6_subnet2['subnet']['id']})
+ # Check for the internal snat port interfaces
+ snat_router_intfs = self.l3_plugin._get_snat_sync_interfaces(
+ self.context, [router1['id']])
+ snat_intf_list = snat_router_intfs[router1['id']]
+ fixed_ips = snat_intf_list[0]['fixed_ips']
+ self.assertEqual(1, len(snat_router_intfs[router1['id']]))
+ self.assertEqual(2, len(fixed_ips))
+ # Now delete the router interface and it should update the
+ # SNAT port with the right fixed_ips instead of deleting it.
+ self.l3_plugin.remove_router_interface(
+ self.context, router1['id'],
+ {'subnet_id': private_ipv6_subnet2['subnet']['id']})
+ # Check for the internal snat port interfaces
+ snat_router_intfs = self.l3_plugin._get_snat_sync_interfaces(
+ self.context, [router1['id']])
+ snat_intf_list = snat_router_intfs[router1['id']]
+ fixed_ips = snat_intf_list[0]['fixed_ips']
+ self.assertEqual(1, len(snat_router_intfs[router1['id']]))
+ self.assertEqual(1, len(fixed_ips))
+
def test_update_vm_port_host_router_update(self):
# register l3 agent in dvr mode in addition to existing dvr_snat agent
HOST = 'host1'