]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Resync L3, DHCP and OVS/LB agents upon revival
authorEugene Nikanorov <enikanorov@mirantis.com>
Mon, 12 Oct 2015 09:59:01 +0000 (13:59 +0400)
committerEugene Nikanorov <enikanorov@mirantis.com>
Wed, 18 Nov 2015 10:42:35 +0000 (14:42 +0400)
In big and busy clusters there could be a condition when
rabbitmq clustering mechanism synchronizes queues and during
this period agents connected to that instance of rabbitmq
can't communicate with the server and server considers them
dead moving resources away. After agent become active again,
it needs to cleanup state entries and synchronize its state
with neutron-server.
The solution is to make agents aware of their state from
neutron-server point of view. This is done by changing state
reports from cast to call that would return agent's status.
When agent was dead and becomes alive, it would receive special
AGENT_REVIVED status indicating that it should refresh its
local data which it would not do otherwise.

Closes-Bug: #1505166
Change-Id: Id28248f4f75821fbacf46e2c44e40f27f59172a9

neutron/agent/dhcp/agent.py
neutron/agent/l3/agent.py
neutron/common/constants.py
neutron/db/agents_db.py
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/agent/dhcp/test_agent.py
neutron/tests/unit/agent/l3/test_agent.py
neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py

index dead17f489160aaf8b86293e41258546780b21ce..5fa5a78494849de575ccb7aa76467db831c494c2 100644 (file)
@@ -551,7 +551,6 @@ class DhcpAgentWithStateReport(DhcpAgent):
             'start_flag': True,
             'agent_type': constants.AGENT_TYPE_DHCP}
         report_interval = self.conf.AGENT.report_interval
-        self.use_call = True
         if report_interval:
             self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                 self._report_state)
@@ -562,8 +561,12 @@ class DhcpAgentWithStateReport(DhcpAgent):
             self.agent_state.get('configurations').update(
                 self.cache.get_state())
             ctx = context.get_admin_context_without_session()
-            self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
-            self.use_call = False
+            agent_status = self.state_rpc.report_state(
+                ctx, self.agent_state, True)
+            if agent_status == constants.AGENT_REVIVED:
+                LOG.info(_LI("Agent has just been revived. "
+                             "Scheduling full sync"))
+                self.schedule_resync("Agent has just been revived")
         except AttributeError:
             # This means the server does not support report_state
             LOG.warn(_LW("Neutron server does not support state report."
index 8191c5a814b176a68d2b15cf43ac721f8d1faa3b..95d7e2a05a5ec227a916ec4b7bac2f1837fbe3c7 100644 (file)
@@ -610,7 +610,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
 class L3NATAgentWithStateReport(L3NATAgent):
 
     def __init__(self, host, conf=None):
-        self.use_call = True
         super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
         self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
         self.agent_state = {
@@ -657,10 +656,14 @@ class L3NATAgentWithStateReport(L3NATAgent):
         configurations['interfaces'] = num_interfaces
         configurations['floating_ips'] = num_floating_ips
         try:
-            self.state_rpc.report_state(self.context, self.agent_state,
-                                        self.use_call)
+            agent_status = self.state_rpc.report_state(self.context,
+                                                       self.agent_state,
+                                                       True)
+            if agent_status == l3_constants.AGENT_REVIVED:
+                LOG.info(_LI('Agent has just been revived. '
+                             'Doing a full sync.'))
+                self.fullsync = True
             self.agent_state.pop('start_flag', None)
-            self.use_call = False
         except AttributeError:
             # This means the server does not support report_state
             LOG.warn(_LW("Neutron server does not support state report."
index a5f34ee9d2d864e18fedd54f1993c21e38e7ed06..5ac94c8b4c1aadf2b67c6f58fa025bc07c068025 100644 (file)
@@ -200,3 +200,11 @@ ROUTER_MARK_MASK = "0xffff"
 
 # Time format
 ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
+
+# Agent states as detected by server, used to reply on agent's state report
+# agent has just been registered
+AGENT_NEW = 'new'
+# agent is alive
+AGENT_ALIVE = 'alive'
+# agent has just returned to alive after being dead
+AGENT_REVIVED = 'revived'
index 2a9aeb734530e44bdec5c36f5308922229f7d71e..69ecb13180e1cad8a0a6687f0848ade419e60bb8 100644 (file)
@@ -299,6 +299,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
                       'delta': delta})
 
     def _create_or_update_agent(self, context, agent_state):
+        """Registers new agent in the database or updates existing.
+
+        Returns agent status from server point of view: alive, new or revived.
+        It could be used by agent to do some sync with the server if needed.
+        """
+        status = constants.AGENT_ALIVE
         with context.session.begin(subtransactions=True):
             res_keys = ['agent_type', 'binary', 'host', 'topic']
             res = dict((k, agent_state[k]) for k in res_keys)
@@ -311,6 +317,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
             try:
                 agent_db = self._get_agent_by_type_and_host(
                     context, agent_state['agent_type'], agent_state['host'])
+                if not agent_db.is_active:
+                    status = constants.AGENT_REVIVED
                 res['heartbeat_timestamp'] = current_time
                 if agent_state.get('start_flag'):
                     res['started_at'] = current_time
@@ -327,7 +335,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
                 greenthread.sleep(0)
                 context.session.add(agent_db)
                 self._log_heartbeat(agent_state, agent_db, configurations_dict)
+                status = constants.AGENT_NEW
             greenthread.sleep(0)
+        return status
 
     def create_or_update_agent(self, context, agent):
         """Create or update agent according to report."""
@@ -367,7 +377,10 @@ class AgentExtRpcCallback(object):
         self.plugin = plugin
 
     def report_state(self, context, **kwargs):
-        """Report state from agent to server."""
+        """Report state from agent to server.
+
+        Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE
+        """
         time = kwargs['time']
         time = timeutils.parse_strtime(time)
         agent_state = kwargs['agent_state']['agent_state']
@@ -382,7 +395,7 @@ class AgentExtRpcCallback(object):
             return
         if not self.plugin:
             self.plugin = manager.NeutronManager.get_plugin()
-        self.plugin.create_or_update_agent(context, agent_state)
+        return self.plugin.create_or_update_agent(context, agent_state)
 
     def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
         """Checks if the server and the agent times are in sync.
index 89f819be2af3cc8aa6d2ea96810092eca020f3c7..6a58a28cc07349cbc510f46ea573ec0f8f28ab35 100644 (file)
@@ -876,6 +876,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
 
         # stores received port_updates for processing by the main loop
         self.updated_devices = set()
+        # flag to do a sync after revival
+        self.fullsync = False
         self.context = context.get_admin_context_without_session()
         self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
         self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
@@ -897,8 +899,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
         try:
             devices = len(self.br_mgr.get_tap_devices())
             self.agent_state.get('configurations')['devices'] = devices
-            self.state_rpc.report_state(self.context,
-                                        self.agent_state)
+            agent_status = self.state_rpc.report_state(self.context,
+                                                       self.agent_state,
+                                                       True)
+            if agent_status == constants.AGENT_REVIVED:
+                LOG.info(_LI('Agent has just been revived. '
+                             'Doing a full sync.'))
+                self.fullsync = True
             self.agent_state.pop('start_flag', None)
         except Exception:
             LOG.exception(_LE("Failed reporting state!"))
@@ -1101,11 +1108,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
         while True:
             start = time.time()
 
-            device_info = self.scan_devices(previous=device_info, sync=sync)
+            if self.fullsync:
+                sync = True
+                self.fullsync = False
 
             if sync:
                 LOG.info(_LI("Agent out of sync with plugin!"))
-                sync = False
+
+            device_info = self.scan_devices(previous=device_info, sync=sync)
+            sync = False
 
             if (self._device_info_has_changes(device_info)
                 or self.sg_agent.firewall_refresh_needed()):
index 55846dae07fb2dda40bc3fc9a78ae9b571153cfa..5e00c4839fdb26352c6ff9fe61abfc07528c0ab1 100644 (file)
@@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         super(OVSNeutronAgent, self).__init__()
         self.conf = conf or cfg.CONF
 
+        self.fullsync = True
         # init bridge classes with configured datapath type.
         self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
             functools.partial(bridge_classes[b],
@@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.veth_mtu = veth_mtu
         self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
                                                      p_const.MAX_VLAN_TAG))
-        self.use_call = True
         self.tunnel_types = tunnel_types or []
         self.l2_pop = l2_population
         # TODO(ethuleau): Change ARP responder so it's not dependent on the
@@ -325,9 +325,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             self.dvr_agent.in_distributed_mode())
 
         try:
-            self.state_rpc.report_state(self.context,
-                                        self.agent_state,
-                                        self.use_call)
+            agent_status = self.state_rpc.report_state(self.context,
+                                                       self.agent_state,
+                                                       True)
+            if agent_status == n_const.AGENT_REVIVED:
+                LOG.info(_LI('Agent has just been revived. '
+                             'Doing a full sync.'))
+                self.fullsync = True
             self.use_call = False
             self.agent_state.pop('start_flag', None)
         except Exception:
@@ -1646,7 +1650,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         if not polling_manager:
             polling_manager = polling.get_polling_manager(
                 minimize_polling=False)
-
         sync = True
         ports = set()
         updated_ports_copy = set()
@@ -1655,6 +1658,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         ovs_restarted = False
         consecutive_resyncs = 0
         while self._check_and_handle_signal():
+            if self.fullsync:
+                LOG.info(_LI("rpc_loop doing a full sync."))
+                sync = True
+                self.fullsync = False
             port_info = {}
             ancillary_port_info = {}
             start = time.time()
index fb7355f149ed253304811d5b6a13f44fe63468a7..51bab94af6a2834f5922f6b7257a79cc81f04d3e 100644 (file)
@@ -421,6 +421,20 @@ class TestDhcpAgent(base.BaseTestCase):
             dhcp.periodic_resync()
             spawn.assert_called_once_with(dhcp._periodic_resync_helper)
 
+    def test_report_state_revival_logic(self):
+        dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
+        with mock.patch.object(dhcp.state_rpc,
+                               'report_state') as report_state,\
+            mock.patch.object(dhcp, "run"):
+            report_state.return_value = const.AGENT_ALIVE
+            dhcp._report_state()
+            self.assertEqual(dhcp.needs_resync_reasons, {})
+
+            report_state.return_value = const.AGENT_REVIVED
+            dhcp._report_state()
+            self.assertEqual(dhcp.needs_resync_reasons[None],
+                             ['Agent has just been revived'])
+
     def test_periodic_resync_helper(self):
         with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
             dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
index a3649d9ee6c393b9c2b223cb02507456dd4fb13e..ed94606db1c01975e7b719e914661ef0f3af2b1e 100644 (file)
@@ -197,13 +197,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
                                                        conf=self.conf)
 
             self.assertTrue(agent.agent_state['start_flag'])
-            use_call_arg = agent.use_call
             agent.after_start()
             report_state.assert_called_once_with(agent.context,
                                                  agent.agent_state,
-                                                 use_call_arg)
+                                                 True)
             self.assertIsNone(agent.agent_state.get('start_flag'))
 
+    def test_report_state_revival_logic(self):
+        with mock.patch.object(agent_rpc.PluginReportStateAPI,
+                               'report_state') as report_state:
+            agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME,
+                                                       conf=self.conf)
+            report_state.return_value = l3_constants.AGENT_REVIVED
+            agent._report_state()
+            self.assertTrue(agent.fullsync)
+
+            agent.fullsync = False
+            report_state.return_value = l3_constants.AGENT_ALIVE
+            agent._report_state()
+            self.assertFalse(agent.fullsync)
+
     def test_periodic_sync_routers_task_call_clean_stale_namespaces(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         self.plugin_api.get_routers.return_value = []
index d5eea954ee00ed1437ce037606dea084aa4973ac..b75a53337bd20416a6b0a03ddab8c5834fde9996 100644 (file)
@@ -365,6 +365,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
             self.agent.stop()
             self.assertFalse(mock_set_rpc.called)
 
+    def test_report_state_revived(self):
+        with mock.patch.object(self.agent.state_rpc,
+                               "report_state") as report_st:
+            report_st.return_value = constants.AGENT_REVIVED
+            self.agent._report_state()
+            self.assertTrue(self.agent.fullsync)
+
 
 class TestLinuxBridgeManager(base.BaseTestCase):
     def setUp(self):
index 654b429430e2968322a615a8a8e413f07be8e0cf..f7161e49049d22e5251a23823d9fde90ae7e1db3 100644 (file)
@@ -145,9 +145,6 @@ class TestOvsNeutronAgent(object):
                     return_value=[]):
             self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
                                                         **kwargs)
-            # set back to true because initial report state will succeed due
-            # to mocked out RPC calls
-            self.agent.use_call = True
             self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
         self.agent.sg_agent = mock.Mock()
 
@@ -694,14 +691,13 @@ class TestOvsNeutronAgent(object):
             report_st.assert_called_with(self.agent.context,
                                          self.agent.agent_state, True)
             self.assertNotIn("start_flag", self.agent.agent_state)
-            self.assertFalse(self.agent.use_call)
             self.assertEqual(
                 self.agent.agent_state["configurations"]["devices"],
                 self.agent.int_br_device_count
             )
             self.agent._report_state()
             report_st.assert_called_with(self.agent.context,
-                                         self.agent.agent_state, False)
+                                         self.agent.agent_state, True)
 
     def test_report_state_fail(self):
         with mock.patch.object(self.agent.state_rpc,
@@ -714,6 +710,13 @@ class TestOvsNeutronAgent(object):
             report_st.assert_called_with(self.agent.context,
                                          self.agent.agent_state, True)
 
+    def test_report_state_revived(self):
+        with mock.patch.object(self.agent.state_rpc,
+                               "report_state") as report_st:
+            report_st.return_value = n_const.AGENT_REVIVED
+            self.agent._report_state()
+            self.assertTrue(self.agent.fullsync)
+
     def test_port_update(self):
         port = {"id": TEST_PORT_ID1,
                 "network_id": TEST_NETWORK_ID1,
@@ -1733,9 +1736,6 @@ class TestOvsDvrNeutronAgent(object):
                     return_value=[]):
             self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
                                                         **kwargs)
-            # set back to true because initial report state will succeed due
-            # to mocked out RPC calls
-            self.agent.use_call = True
             self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
         self.agent.sg_agent = mock.Mock()