return port_stats
def cleanup_stale_flows(self):
- if self.iter_num == 0:
- bridges = [self.int_br]
- if self.enable_tunneling:
- bridges.append(self.tun_br)
- for bridge in bridges:
- LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
- bridge.cleanup_flows()
+ bridges = [self.int_br]
+ if self.enable_tunneling:
+ bridges.append(self.tun_br)
+ for bridge in bridges:
+ LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
+ bridge.cleanup_flows()
def rpc_loop(self, polling_manager=None):
if not polling_manager:
tunnel_sync = True
ovs_restarted = False
consecutive_resyncs = 0
+ need_clean_stale_flow = True
while self._check_and_handle_signal():
port_info = {}
ancillary_port_info = {}
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info,
ovs_restarted)
- self.cleanup_stale_flows()
+ if not sync and need_clean_stale_flow:
+ self.cleanup_stale_flows()
+ need_clean_stale_flow = False
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
self._test_ovs_status(constants.OVS_NORMAL,
constants.OVS_RESTARTED)
+ def test_rpc_loop_fail_to_process_network_ports_keep_flows(self):
+ with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
+ mock.patch.object(async_process.AsyncProcess, "start"),\
+ mock.patch.object(async_process.AsyncProcess, "stop"),\
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ 'process_network_ports') as process_network_ports,\
+ mock.patch.object(self.mod_agent.OVSNeutronAgent,
+ 'check_ovs_status') as check_ovs_status,\
+ mock.patch.object(time, 'sleep'),\
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ 'update_stale_ofport_rules') as update_stale, \
+ mock.patch.object(self.mod_agent.OVSNeutronAgent,
+ 'cleanup_stale_flows') as cleanup,\
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ '_check_and_handle_signal') as check_and_handle_signal:
+ process_network_ports.return_value = True
+ check_ovs_status.return_value = constants.OVS_NORMAL
+ check_and_handle_signal.side_effect = [True, False]
+ self.agent.daemon_loop()
+ self.assertTrue(update_stale.called)
+ self.assertFalse(cleanup.called)
+
def test_set_rpc_timeout(self):
self.agent._handle_sigterm(None, None)
for rpc_client in (self.agent.plugin_rpc.client,
class TestOvsNeutronAgentOFCtl(TestOvsNeutronAgent,
ovs_test_base.OVSOFCtlTestBase):
- def test_cleanup_stale_flows_iter_0(self):
+ def test_cleanup_stale_flows(self):
with mock.patch.object(self.agent.int_br, 'agent_uuid_stamp',
new=1234),\
mock.patch.object(self.agent.int_br,
'cookie=0x2345, duration=50.125s, table=2, priority=0',
'cookie=0x4d2, duration=52.112s, table=3, actions=drop',
]
+ self.agent.iter_num = 3
self.agent.cleanup_stale_flows()
expected = [
mock.call(cookie='0x4321/-1', table='2'),
class TestOvsNeutronAgentRyu(TestOvsNeutronAgent,
ovs_test_base.OVSRyuTestBase):
- def test_cleanup_stale_flows_iter_0(self):
+ def test_cleanup_stale_flows(self):
uint64_max = (1 << 64) - 1
with mock.patch.object(self.agent.int_br, 'agent_uuid_stamp',
new=1234),\
mock.Mock(cookie=9029, table_id=2),
mock.Mock(cookie=1234, table_id=3),
]
+ self.agent.iter_num = 3
self.agent.cleanup_stale_flows()
expected = [mock.call(cookie=17185,
cookie_mask=uint64_max),