'start_flag': True,
'agent_type': constants.AGENT_TYPE_DHCP}
report_interval = self.conf.AGENT.report_interval
- self.use_call = True
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.agent_state.get('configurations').update(
self.cache.get_state())
ctx = context.get_admin_context_without_session()
- self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
- self.use_call = False
+ agent_status = self.state_rpc.report_state(
+ ctx, self.agent_state, True)
+ if agent_status == constants.AGENT_REVIVED:
+ LOG.info(_LI("Agent has just been revived. "
+ "Scheduling full sync"))
+ self.schedule_resync("Agent has just been revived")
except AttributeError:
# This means the server does not support report_state
LOG.warn(_LW("Neutron server does not support state report."
class L3NATAgentWithStateReport(L3NATAgent):
def __init__(self, host, conf=None):
- self.use_call = True
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
self.agent_state = {
configurations['interfaces'] = num_interfaces
configurations['floating_ips'] = num_floating_ips
try:
- self.state_rpc.report_state(self.context, self.agent_state,
- self.use_call)
+ agent_status = self.state_rpc.report_state(self.context,
+ self.agent_state,
+ True)
+ if agent_status == l3_constants.AGENT_REVIVED:
+ LOG.info(_LI('Agent has just been revived. '
+ 'Doing a full sync.'))
+ self.fullsync = True
self.agent_state.pop('start_flag', None)
- self.use_call = False
except AttributeError:
# This means the server does not support report_state
LOG.warn(_LW("Neutron server does not support state report."
# Time format
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
+
+# Agent states as detected by server, used to reply on agent's state report
+# agent has just been registered
+AGENT_NEW = 'new'
+# agent is alive
+AGENT_ALIVE = 'alive'
+# agent has just returned to alive after being dead
+AGENT_REVIVED = 'revived'
'delta': delta})
def _create_or_update_agent(self, context, agent_state):
+ """Registers new agent in the database or updates existing.
+
+ Returns agent status from server point of view: alive, new or revived.
+ It could be used by agent to do some sync with the server if needed.
+ """
+ status = constants.AGENT_ALIVE
with context.session.begin(subtransactions=True):
res_keys = ['agent_type', 'binary', 'host', 'topic']
res = dict((k, agent_state[k]) for k in res_keys)
try:
agent_db = self._get_agent_by_type_and_host(
context, agent_state['agent_type'], agent_state['host'])
+ if not agent_db.is_active:
+ status = constants.AGENT_REVIVED
res['heartbeat_timestamp'] = current_time
if agent_state.get('start_flag'):
res['started_at'] = current_time
greenthread.sleep(0)
context.session.add(agent_db)
self._log_heartbeat(agent_state, agent_db, configurations_dict)
+ status = constants.AGENT_NEW
greenthread.sleep(0)
+ return status
def create_or_update_agent(self, context, agent):
"""Create or update agent according to report."""
self.plugin = plugin
def report_state(self, context, **kwargs):
- """Report state from agent to server."""
+ """Report state from agent to server.
+
+ Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE
+ """
time = kwargs['time']
time = timeutils.parse_strtime(time)
agent_state = kwargs['agent_state']['agent_state']
return
if not self.plugin:
self.plugin = manager.NeutronManager.get_plugin()
- self.plugin.create_or_update_agent(context, agent_state)
+ return self.plugin.create_or_update_agent(context, agent_state)
def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
"""Checks if the server and the agent times are in sync.
# stores received port_updates for processing by the main loop
self.updated_devices = set()
+ # flag to do a sync after revival
+ self.fullsync = False
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
try:
devices = len(self.br_mgr.get_tap_devices())
self.agent_state.get('configurations')['devices'] = devices
- self.state_rpc.report_state(self.context,
- self.agent_state)
+ agent_status = self.state_rpc.report_state(self.context,
+ self.agent_state,
+ True)
+ if agent_status == constants.AGENT_REVIVED:
+ LOG.info(_LI('Agent has just been revived. '
+ 'Doing a full sync.'))
+ self.fullsync = True
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
while True:
start = time.time()
- device_info = self.scan_devices(previous=device_info, sync=sync)
+ if self.fullsync:
+ sync = True
+ self.fullsync = False
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
- sync = False
+
+ device_info = self.scan_devices(previous=device_info, sync=sync)
+ sync = False
if (self._device_info_has_changes(device_info)
or self.sg_agent.firewall_refresh_needed()):
super(OVSNeutronAgent, self).__init__()
self.conf = conf or cfg.CONF
+ self.fullsync = True
# init bridge classes with configured datapath type.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
functools.partial(bridge_classes[b],
self.veth_mtu = veth_mtu
self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
p_const.MAX_VLAN_TAG))
- self.use_call = True
self.tunnel_types = tunnel_types or []
self.l2_pop = l2_population
# TODO(ethuleau): Change ARP responder so it's not dependent on the
self.dvr_agent.in_distributed_mode())
try:
- self.state_rpc.report_state(self.context,
- self.agent_state,
- self.use_call)
+ agent_status = self.state_rpc.report_state(self.context,
+ self.agent_state,
+ True)
+ if agent_status == n_const.AGENT_REVIVED:
+ LOG.info(_LI('Agent has just been revived. '
+ 'Doing a full sync.'))
+ self.fullsync = True
self.use_call = False
self.agent_state.pop('start_flag', None)
except Exception:
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
-
sync = True
ports = set()
updated_ports_copy = set()
ovs_restarted = False
consecutive_resyncs = 0
while self._check_and_handle_signal():
+ if self.fullsync:
+ LOG.info(_LI("rpc_loop doing a full sync."))
+ sync = True
+ self.fullsync = False
port_info = {}
ancillary_port_info = {}
start = time.time()
dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
+ def test_report_state_revival_logic(self):
+ dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
+ with mock.patch.object(dhcp.state_rpc,
+ 'report_state') as report_state,\
+ mock.patch.object(dhcp, "run"):
+ report_state.return_value = const.AGENT_ALIVE
+ dhcp._report_state()
+ self.assertEqual(dhcp.needs_resync_reasons, {})
+
+ report_state.return_value = const.AGENT_REVIVED
+ dhcp._report_state()
+ self.assertEqual(dhcp.needs_resync_reasons[None],
+ ['Agent has just been revived'])
+
def test_periodic_resync_helper(self):
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
conf=self.conf)
self.assertTrue(agent.agent_state['start_flag'])
- use_call_arg = agent.use_call
agent.after_start()
report_state.assert_called_once_with(agent.context,
agent.agent_state,
- use_call_arg)
+ True)
self.assertIsNone(agent.agent_state.get('start_flag'))
+ def test_report_state_revival_logic(self):
+ with mock.patch.object(agent_rpc.PluginReportStateAPI,
+ 'report_state') as report_state:
+ agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME,
+ conf=self.conf)
+ report_state.return_value = l3_constants.AGENT_REVIVED
+ agent._report_state()
+ self.assertTrue(agent.fullsync)
+
+ agent.fullsync = False
+ report_state.return_value = l3_constants.AGENT_ALIVE
+ agent._report_state()
+ self.assertFalse(agent.fullsync)
+
def test_periodic_sync_routers_task_call_clean_stale_namespaces(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.plugin_api.get_routers.return_value = []
self.agent.stop()
self.assertFalse(mock_set_rpc.called)
+ def test_report_state_revived(self):
+ with mock.patch.object(self.agent.state_rpc,
+ "report_state") as report_st:
+ report_st.return_value = constants.AGENT_REVIVED
+ self.agent._report_state()
+ self.assertTrue(self.agent.fullsync)
+
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):
return_value=[]):
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
**kwargs)
- # set back to true because initial report state will succeed due
- # to mocked out RPC calls
- self.agent.use_call = True
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
self.agent.sg_agent = mock.Mock()
report_st.assert_called_with(self.agent.context,
self.agent.agent_state, True)
self.assertNotIn("start_flag", self.agent.agent_state)
- self.assertFalse(self.agent.use_call)
self.assertEqual(
self.agent.agent_state["configurations"]["devices"],
self.agent.int_br_device_count
)
self.agent._report_state()
report_st.assert_called_with(self.agent.context,
- self.agent.agent_state, False)
+ self.agent.agent_state, True)
def test_report_state_fail(self):
with mock.patch.object(self.agent.state_rpc,
report_st.assert_called_with(self.agent.context,
self.agent.agent_state, True)
+ def test_report_state_revived(self):
+ with mock.patch.object(self.agent.state_rpc,
+ "report_state") as report_st:
+ report_st.return_value = n_const.AGENT_REVIVED
+ self.agent._report_state()
+ self.assertTrue(self.agent.fullsync)
+
def test_port_update(self):
port = {"id": TEST_PORT_ID1,
"network_id": TEST_NETWORK_ID1,
return_value=[]):
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
**kwargs)
- # set back to true because initial report state will succeed due
- # to mocked out RPC calls
- self.agent.use_call = True
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
self.agent.sg_agent = mock.Mock()