self.assertTrue(self._namespace_exists(router1.ns_name))
self.assertTrue(self._namespace_exists(fip_ns))
self._assert_snat_namespace_does_not_exist(router1)
+
+ def _assert_snat_namespace_exists(self, router):
+ namespace = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
+ router.router_id)
+ self.assertTrue(self._namespace_exists(namespace))
+
+ def _get_dvr_snat_namespace_device_status(
+ self, router, internal_dev_name=None):
+ """Function returns the internal and external device status."""
+ snat_ns = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
+ router.router_id)
+ external_port = router.get_ex_gw_port()
+ external_device_name = router.get_external_device_name(
+ external_port['id'])
+ qg_device_created_successfully = ip_lib.device_exists(
+ external_device_name, namespace=snat_ns)
+ sg_device_created_successfully = ip_lib.device_exists(
+ internal_dev_name, namespace=snat_ns)
+ return qg_device_created_successfully, sg_device_created_successfully
+
+ def test_dvr_router_snat_namespace_with_interface_remove(self):
+ """Test to validate the snat namespace with interface remove.
+
+ This test validates the snat namespace for all the external
+ and internal devices. It also validates if the internal
+ device corresponding to the router interface is removed
+ when the router interface is deleted.
+ """
+ self.agent.conf.agent_mode = 'dvr_snat'
+ router_info = self.generate_dvr_router_info()
+ snat_internal_port = router_info[l3_constants.SNAT_ROUTER_INTF_KEY]
+ router1 = self.manage_router(self.agent, router_info)
+ csnat_internal_port = (
+ router1.router[l3_constants.SNAT_ROUTER_INTF_KEY])
+ # Now save the internal device name to verify later
+ internal_device_name = router1._get_snat_int_device_name(
+ csnat_internal_port[0]['id'])
+ self._assert_snat_namespace_exists(router1)
+ qg_device, sg_device = self._get_dvr_snat_namespace_device_status(
+ router1, internal_dev_name=internal_device_name)
+ self.assertTrue(qg_device)
+ self.assertTrue(sg_device)
+ self.assertEqual(router1.snat_ports, snat_internal_port)
+ # Now let us not pass INTERFACE_KEY, to emulate
+ # the interface has been removed.
+ router1.router[l3_constants.INTERFACE_KEY] = []
+ # Now let us not pass the SNAT_ROUTER_INTF_KEY, to emulate
+ # that the server did not send it, since the interface has been
+ # removed.
+ router1.router[l3_constants.SNAT_ROUTER_INTF_KEY] = []
+ self.agent._process_updated_router(router1.router)
+ router_updated = self.agent.router_info[router_info['id']]
+ self._assert_snat_namespace_exists(router_updated)
+ qg_device, sg_device = self._get_dvr_snat_namespace_device_status(
+ router_updated, internal_dev_name=internal_device_name)
+ self.assertFalse(sg_device)
+ self.assertTrue(qg_device)