port_info.get('removed') or
port_info.get('updated'))
- def check_ovs_restart(self):
+ def check_ovs_status(self):
+ """Classify the state of OVS from the canary flow table.
+
+ Dumps the flows of constants.CANARY_TABLE on the integration bridge
+ and returns constants.OVS_RESTARTED when the dump is an empty
+ string, constants.OVS_DEAD when the dump is None, and
+ constants.OVS_NORMAL for any other result.
+ """
# Check for the canary flow
canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE)
- return not canary_flow
+ if canary_flow == '':
+ # No flows left in the canary table: treated as an OVS restart.
+ LOG.warning(_LW("OVS is restarted. OVSNeutronAgent will reset "
+ "bridges and recover ports."))
+ return constants.OVS_RESTARTED
+ elif canary_flow is None:
+ # NOTE(review): presumably None means the flow dump itself
+ # failed - confirm against dump_flows_for_table.
+ LOG.warning(_LW("OVS is dead. OVSNeutronAgent will keep running "
+ "and checking OVS status periodically."))
+ return constants.OVS_DEAD
+ else:
+ # OVS is in normal status
+ return constants.OVS_NORMAL
+
+ def loop_count_and_wait(self, start_time, port_stats):
+ """Log the finished rpc_loop iteration, pace the loop and count it.
+
+ :param start_time: time.time() value taken when the iteration began.
+ :param port_stats: port statistics of the iteration; only logged
+ here.
+ """
+ # sleep till end of polling interval
+ elapsed = time.time() - start_time
+ LOG.debug("Agent rpc_loop - iteration:%(iter_num)d "
+ "completed. Processed ports statistics: "
+ "%(port_stats)s. Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'port_stats': port_stats,
+ 'elapsed': elapsed})
+ if elapsed < self.polling_interval:
+ # Sleep only for the part of the polling interval that is left.
+ time.sleep(self.polling_interval - elapsed)
+ else:
+ # The iteration already used up the whole interval: do not sleep,
+ # only record the overrun.
+ LOG.debug("Loop iteration exceeded interval "
+ "(%(polling_interval)s vs. %(elapsed)s)!",
+ {'polling_interval': self.polling_interval,
+ 'elapsed': elapsed})
+ # NOTE(review): indentation is not visible in this patch; presumably
+ # the counter is incremented on every call, not only in the else
+ # branch - confirm.
+ self.iter_num = self.iter_num + 1
def rpc_loop(self, polling_manager=None):
if not polling_manager:
updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
- ovs_restarted = False
+ ovs_status = constants.OVS_NORMAL
while self.run_daemon_loop:
start = time.time()
port_stats = {'regular': {'added': 0,
ancillary_ports.clear()
sync = False
polling_manager.force_polling()
- ovs_restarted = self.check_ovs_restart()
- if ovs_restarted:
+ ovs_status = self.check_ovs_status()
+ if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
self.setup_physical_bridges(self.bridge_mappings)
if self.enable_tunneling:
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows_on_integ_tun_br()
+ elif ovs_status == constants.OVS_DEAD:
+ # Do not perform any operations while OVS is dead, to avoid
+ # unexpected failures or crashes. Sleep, then continue the loop
+ # so that the OVS status is checked again on the next iteration.
+ self.loop_count_and_wait(start, port_stats)
+ continue
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_LI("Agent tunnel out of sync with plugin!"))
except Exception:
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
+ ovs_restarted = (ovs_status == constants.OVS_RESTARTED)
if self._agent_has_updates(polling_manager) or ovs_restarted:
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
self.updated_ports |= updated_ports_copy
sync = True
- # sleep till end of polling interval
- elapsed = (time.time() - start)
- LOG.debug("Agent rpc_loop - iteration:%(iter_num)d "
- "completed. Processed ports statistics: "
- "%(port_stats)s. Elapsed:%(elapsed).3f",
- {'iter_num': self.iter_num,
- 'port_stats': port_stats,
- 'elapsed': elapsed})
- if (elapsed < self.polling_interval):
- time.sleep(self.polling_interval - elapsed)
- else:
- LOG.debug("Loop iteration exceeded interval "
- "(%(polling_interval)s vs. %(elapsed)s)!",
- {'polling_interval': self.polling_interval,
- 'elapsed': elapsed})
- self.iter_num = self.iter_num + 1
+ self.loop_count_and_wait(start, port_stats)
def daemon_loop(self):
with polling.get_polling_manager(
import contextlib
import sys
+import time
import mock
import netaddr
mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
- def test_ovs_restart(self):
+ def test_ovs_status(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),
'removed': set([])}
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'process_network_ports'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
- 'check_ovs_restart'),
+ 'check_ovs_status'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'setup_integration_br'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
- 'setup_physical_bridges')
+ 'setup_physical_bridges'),
+ mock.patch.object(time, 'sleep')
) as (spawn_fn, log_exception, scan_ports, process_network_ports,
- check_ovs_restart, setup_int_br, setup_phys_br):
+ check_ovs_status, setup_int_br, setup_phys_br, time_sleep):
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
- check_ovs_restart.side_effect = [False, True]
+ check_ovs_status.side_effect = [constants.OVS_NORMAL,
+ constants.OVS_DEAD,
+ constants.OVS_RESTARTED]
- # This will exit after the second loop
+ # This will exit after the third loop
try:
self.agent.daemon_loop()
except Exception: