ovs_restarted = False
while self._check_and_handle_signal():
port_info = {}
+ ancillary_port_info = {}
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
sync = True
- ancillary_port_info = (ancillary_port_info if self.ancillary_brs
- else {})
port_stats = self.get_port_stats(port_info, ancillary_port_info)
self.loop_count_and_wait(start, port_stats)
pass
self.assertTrue(all([x.called for x in reset_mocks]))
- def test_scan_ports_failure(self):
+ def _test_scan_ports_failure(self, scan_method_name):
with mock.patch.object(self.agent,
'check_ovs_status',
return_value=constants.OVS_RESTARTED),\
- mock.patch.object(self.agent, 'scan_ports',
+ mock.patch.object(self.agent, scan_method_name,
side_effect=TypeError('broken')),\
mock.patch.object(self.agent, '_agent_has_updates',
return_value=True),\
self.agent.state_rpc = mock.Mock()
self.agent.rpc_loop(polling_manager=mock.Mock())
+ def test_scan_ports_failure(self):
+ self._test_scan_ports_failure('scan_ports')
+
+ def test_scan_ancillary_ports_failure(self):
+ with mock.patch.object(self.agent, 'scan_ports'):
+ with mock.patch.object(self.agent, 'update_stale_ofport_rules'):
+ self.agent.ancillary_brs = mock.Mock()
+ self._test_scan_ports_failure('scan_ancillary_ports')
+
class TestOvsDvrNeutronAgentOFCtl(TestOvsDvrNeutronAgent,
ovs_test_base.OVSOFCtlTestBase):