from neutron.tests import base
LOCAL_IP = '192.168.0.33'
+DEVICE_1 = 'tapabcdef01-12'
class FakeIpLinkCommand(object):
self.get_mac = self.get_mac_p.start()
self.get_mac.return_value = '00:00:00:00:00:01'
+ def test_treat_devices_removed_with_existed_device(self):
+ agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
+ 0,
+ None)
+ devices = [DEVICE_1]
+ with contextlib.nested(
+ mock.patch.object(agent.plugin_rpc, "update_device_down"),
+ mock.patch.object(agent, "remove_devices_filter")
+ ) as (fn_udd, fn_rdf):
+ fn_udd.return_value = {'device': DEVICE_1,
+ 'exists': True}
+ with mock.patch.object(linuxbridge_neutron_agent.LOG,
+ 'info') as log:
+ resync = agent.treat_devices_removed(devices)
+ self.assertEqual(2, log.call_count)
+ self.assertFalse(resync)
+ self.assertTrue(fn_udd.called)
+ self.assertTrue(fn_rdf.called)
+
+ def test_treat_devices_removed_with_not_existed_device(self):
+ agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
+ 0,
+ None)
+ devices = [DEVICE_1]
+ with contextlib.nested(
+ mock.patch.object(agent.plugin_rpc, "update_device_down"),
+ mock.patch.object(agent, "remove_devices_filter")
+ ) as (fn_udd, fn_rdf):
+ fn_udd.return_value = {'device': DEVICE_1,
+ 'exists': False}
+ with mock.patch.object(linuxbridge_neutron_agent.LOG,
+ 'debug') as log:
+ resync = agent.treat_devices_removed(devices)
+ self.assertEqual(1, log.call_count)
+ self.assertFalse(resync)
+ self.assertTrue(fn_udd.called)
+ self.assertTrue(fn_rdf.called)
+
+ def test_treat_devices_removed_failed(self):
+ agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
+ 0,
+ None)
+ devices = [DEVICE_1]
+ with contextlib.nested(
+ mock.patch.object(agent.plugin_rpc, "update_device_down"),
+ mock.patch.object(agent, "remove_devices_filter")
+ ) as (fn_udd, fn_rdf):
+ fn_udd.side_effect = Exception()
+ with mock.patch.object(linuxbridge_neutron_agent.LOG,
+ 'debug') as log:
+ resync = agent.treat_devices_removed(devices)
+ self.assertEqual(2, log.call_count)
+ self.assertTrue(resync)
+ self.assertTrue(fn_udd.called)
+ self.assertTrue(fn_rdf.called)
+
def test_update_devices_failed(self):
agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
0,