self._assert_internal_devices(router)
self._assert_external_device(router)
self._assert_gateway(router)
- self._assert_floating_ips(router)
+ self.assertTrue(self._floating_ips_configured(router))
self._assert_snat_chains(router)
self._assert_floating_ip_chains(router)
self._assert_metadata_chains(router)
expected_gateway = external_port['subnet']['gateway_ip']
self.assertEqual(expected_gateway, existing_gateway)
- def _assert_floating_ips(self, router):
+ def _floating_ips_configured(self, router):
floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
- self.assertTrue(len(floating_ips))
external_port = self.agent._get_ex_gw_port(router)
- for fip in floating_ips:
- self.assertTrue(ip_lib.device_exists_with_ip_mac(
- self.agent.get_external_device_name(external_port['id']),
- '%s/32' % fip['floating_ip_address'],
- external_port['mac_address'],
- router.ns_name, self.root_helper))
+ return len(floating_ips) and all(ip_lib.device_exists_with_ip_mac(
+ self.agent.get_external_device_name(external_port['id']),
+ '%s/32' % fip['floating_ip_address'],
+ external_port['mac_address'],
+ router.ns_name, self.root_helper) for fip in floating_ips)
def _assert_ha_device(self, router):
self.assertTrue(self.device_exists_with_ip_mac(
device = ip_lib.IPDevice(interface, self.root_helper, router.ns_name)
self.assertEqual([], device.addr.list())
+ def test_ha_router_conf_on_restarted_agent(self):
+ router_info = self.generate_router_info(enable_ha=True)
+ router1 = self._create_router(self.agent, router_info)
+ self._add_fip(router1, '192.168.111.12')
+ restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
+ self.agent.conf)
+ self._create_router(restarted_agent, router1.router)
+ helpers.wait_until_true(lambda: self._floating_ips_configured(router1))
+
class L3HATestFramework(L3AgentTestFramework):
def setUp(self):