def _delete_router(self, agent, router_id):
agent._router_removed(router_id)
- def _add_fip(self, router, fip_address, fixed_address='10.0.0.2'):
+ def _add_fip(self, router, fip_address, fixed_address='10.0.0.2',
+ host=None):
fip = {'id': _uuid(),
'port_id': _uuid(),
'floating_ip_address': fip_address,
- 'fixed_ip_address': fixed_address}
+ 'fixed_ip_address': fixed_address,
+ 'host': host}
router.router[l3_constants.FLOATINGIP_KEY].append(fip)
def _namespace_exists(self, namespace):
assert_ovs_bridge_empty(self.agent.conf.ovs_integration_bridge)
assert_ovs_bridge_empty(self.agent.conf.external_network_bridge)
+ def floating_ips_configured(self, router):
+ floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
+ external_port = router.get_ex_gw_port()
+ return len(floating_ips) and all(ip_lib.device_exists_with_ip_mac(
+ self.agent.get_external_device_name(external_port['id']),
+ '%s/32' % fip['floating_ip_address'],
+ external_port['mac_address'],
+ namespace=router.ns_name) for fip in floating_ips)
+
class L3AgentTestCase(L3AgentTestFramework):
def test_observer_notifications_legacy_router(self):
# platform) is updated to 1.2.10 (or above).
# For more details: https://review.openstack.org/#/c/151284/
self._assert_gateway(router)
- self.assertTrue(self._floating_ips_configured(router))
+ self.assertTrue(self.floating_ips_configured(router))
self._assert_snat_chains(router)
self._assert_floating_ip_chains(router)
self._assert_extra_routes(router)
expected_gateway = external_port['subnet']['gateway_ip']
self.assertEqual(expected_gateway, existing_gateway)
- def _floating_ips_configured(self, router):
- floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
- external_port = router.get_ex_gw_port()
- return len(floating_ips) and all(ip_lib.device_exists_with_ip_mac(
- self.agent.get_external_device_name(external_port['id']),
- '%s/32' % fip['floating_ip_address'],
- external_port['mac_address'],
- namespace=router.ns_name) for fip in floating_ips)
-
def _assert_ha_device(self, router):
self.assertTrue(self.device_exists_with_ip_mac(
router.router[l3_constants.HA_INTERFACE_KEY],
restarted_agent = neutron_l3_agent.L3NATAgentWithStateReport(
self.agent.host, self.agent.conf)
self._create_router(restarted_agent, router1.router)
- utils.wait_until_true(lambda: self._floating_ips_configured(router1))
+ utils.wait_until_true(lambda: self.floating_ips_configured(router1))
class L3HATestFramework(L3AgentTestFramework):
device_name = fip_ns.get_rtr_ext_device_name(router.router_id)
self.assertTrue(ip_lib.device_exists(
device_name, namespace=router.ns_name))
+
+ def test_dvr_router_rem_fips_on_restarted_agent(self):
+ self.agent.conf.agent_mode = 'dvr_snat'
+ router_info = self.generate_dvr_router_info()
+ router1 = self._create_router(self.agent, router_info)
+ self._add_fip(router1, '192.168.111.12', self.agent.conf.host)
+ fip_ns = router1.fip_ns.get_name()
+ restarted_agent = neutron_l3_agent.L3NATAgentWithStateReport(
+ self.agent.host, self.agent.conf)
+ router1.router[l3_constants.FLOATINGIP_KEY] = []
+ self._create_router(restarted_agent, router1.router)
+ self._assert_dvr_snat_gateway(router1)
+ self.assertFalse(self._namespace_exists(fip_ns))