LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"),
{'added': port_added, 'removed': port_removed})
- try:
- self.call(context,
- self.make_msg('update_ports',
- topic=topics.AGENT,
- agent_id=agent_id,
- datapath_id=datapath_id,
- port_added=port_added,
- port_removed=port_removed))
- except Exception:
- LOG.warn(_("update_ports() failed."))
- return
+ self.call(context,
+ self.make_msg('update_ports',
+ topic=topics.AGENT,
+ agent_id=agent_id,
+ datapath_id=datapath_id,
+ port_added=port_added,
+ port_removed=port_removed))
class NECAgentRpcCallback(object):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval
self.cur_ports = []
+ self.need_sync = True
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
if port_removed:
self.sg_agent.remove_devices_filter(port_removed)
- def daemon_loop(self):
- """Main processing loop for NEC Plugin Agent."""
- while True:
+ def loop_handler(self):
+ try:
+ # self.cur_ports will be kept until loop_handler succeeds.
+ cur_ports = [] if self.need_sync else self.cur_ports
new_ports = []
port_added = []
for vif_port in self.int_br.get_vif_ports():
port_id = vif_port.vif_id
new_ports.append(port_id)
- if port_id not in self.cur_ports:
+ if port_id not in cur_ports:
port_info = self._vif_port_to_port_info(vif_port)
port_added.append(port_info)
port_removed = []
- for port_id in self.cur_ports:
+ for port_id in cur_ports:
if port_id not in new_ports:
port_removed.append(port_id)
LOG.debug(_("No port changed."))
self.cur_ports = new_ports
+ self.need_sync = False
+ except Exception:
+ LOG.exception(_("Error in agent event loop"))
+ self.need_sync = True
+
+ def daemon_loop(self):
+ """Main processing loop for NEC Plugin Agent."""
+ while True:
+ self.loop_handler()
time.sleep(self.polling_interval)
class TestNecAgent(TestNecAgentBase):
+ def _setup_mock(self):
+ vif_ports = [ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
+ self.agent.int_br),
+ ovs_lib.VifPort('port2', '2', 'id-2', 'mac-2',
+ self.agent.int_br)]
+ self.get_vif_ports = mock.patch.object(
+ ovs_lib.OVSBridge, 'get_vif_ports',
+ return_value=vif_ports).start()
+ self.update_ports = mock.patch.object(
+ nec_neutron_agent.NECPluginApi, 'update_ports').start()
+ self.prepare_devices_filter = mock.patch.object(
+ self.agent.sg_agent, 'prepare_devices_filter').start()
+ self.remove_devices_filter = mock.patch.object(
+ self.agent.sg_agent, 'remove_devices_filter').start()
+
+ def _test_single_loop(self, with_exc=False, need_sync=False):
+ self.agent.cur_ports = ['id-0', 'id-1']
+ self.agent.need_sync = need_sync
+
+ self.agent.loop_handler()
+ if with_exc:
+ self.assertEqual(self.agent.cur_ports, ['id-0', 'id-1'])
+ self.assertTrue(self.agent.need_sync)
+ else:
+ self.assertEqual(self.agent.cur_ports, ['id-1', 'id-2'])
+ self.assertFalse(self.agent.need_sync)
+
+ def test_single_loop_normal(self):
+ self._setup_mock()
+ self._test_single_loop()
+ agent_id = 'nec-q-agent.dummy-host'
+ self.update_ports.assert_called_once_with(
+ mock.ANY, agent_id, OVS_DPID_0X,
+ [{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
+ ['id-0'])
+ self.prepare_devices_filter.assert_called_once_with(['id-2'])
+ self.remove_devices_filter.assert_called_once_with(['id-0'])
+
+ def test_single_loop_need_sync(self):
+ self._setup_mock()
+ self._test_single_loop(need_sync=True)
+ agent_id = 'nec-q-agent.dummy-host'
+ self.update_ports.assert_called_once_with(
+ mock.ANY, agent_id, OVS_DPID_0X,
+ [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
+ {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
+ [])
+ self.prepare_devices_filter.assert_called_once_with(['id-1', 'id-2'])
+ self.assertFalse(self.remove_devices_filter.call_count)
+
+ def test_single_loop_with_sg_exception_remove(self):
+ self._setup_mock()
+ self.update_ports.side_effect = Exception()
+ self._test_single_loop(with_exc=True)
+
+ def test_single_loop_with_sg_exception_prepare(self):
+ self._setup_mock()
+ self.prepare_devices_filter.side_effect = Exception()
+ self._test_single_loop(with_exc=True)
+
+ def test_single_loop_with_update_ports_exception(self):
+ self._setup_mock()
+ self.remove_devices_filter.side_effect = Exception()
+ self._test_single_loop(with_exc=True)
+
def test_daemon_loop(self):
def state_check(index):
def test_plugin_api(self):
self._test_plugin_api()
- def test_plugin_api_fail(self):
- self._test_plugin_api(expected_failure=True)
-
class TestNecAgentMain(base.BaseTestCase):
def test_main(self):