pass
self.assertTrue(all([x.called for x in reset_mocks]))
+ def test_scan_ports_failure(self):
+ with mock.patch.object(self.agent,
+ 'check_ovs_status',
+ return_value=constants.OVS_RESTARTED),\
+ mock.patch.object(self.agent, 'scan_ports',
+ side_effect=TypeError('broken')),\
+ mock.patch.object(self.agent, '_agent_has_updates',
+ return_value=True),\
+ mock.patch.object(self.agent, '_check_and_handle_signal',
+ side_effect=[True, False]):
+ # block RPC calls and bridge calls
+ self.agent.setup_physical_bridges = mock.Mock()
+ self.agent.setup_integration_br = mock.Mock()
+ self.agent.reset_tunnel_br = mock.Mock()
+ self.agent.state_rpc = mock.Mock()
+ self.agent.rpc_loop(polling_manager=mock.Mock())
+
class TestOvsDvrNeutronAgentOFCtl(TestOvsDvrNeutronAgent,
ovs_test_base.OVSOFCtlTestBase):