]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Ensure l3 agent receives notification about added router
authorOleg Bondarev <obondarev@mirantis.com>
Fri, 7 Aug 2015 12:44:29 +0000 (15:44 +0300)
committerOleg Bondarev <obondarev@mirantis.com>
Fri, 23 Oct 2015 09:08:31 +0000 (12:08 +0300)
Currently router_added (and other) notifications are sent
to agents with an RPC cast() method which does not ensure that
the message is actually delivered to the recipient.
If the message is lost (for example, due to instability of the messaging
system in some failover scenarios) neither the server nor the agent
will be aware of that, and the router will be "lost" until the next
agent resync. A resync will only happen in case of errors on the agent
side or an agent restart.
The fix makes server use call() to notify agents about added routers
thus ensuring no routers will be lost.

This also unifies reschedule_router() method to avoid code duplication
between legacy and dvr agent schedulers.

Closes-Bug: #1482630
Related-Bug: #1404743
Change-Id: Id08764ba837d8f47a28649d081a5876797fe369e

neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py
neutron/db/l3_agentschedulers_db.py
neutron/db/l3_dvrscheduler_db.py
neutron/tests/unit/db/test_agentschedulers_db.py
neutron/tests/unit/scheduler/test_l3_agent_scheduler.py

index 2bc6c2ccc79fc025e5a95f4e4928568016219d6a..eeaa81808cb618553a190f25271c1cc1a5c1acad 100644 (file)
@@ -38,13 +38,15 @@ class L3AgentNotifyAPI(object):
         target = oslo_messaging.Target(topic=topic, version='1.0')
         self.client = n_rpc.get_client(target)
 
-    def _notification_host(self, context, method, host, **kwargs):
+    def _notification_host(self, context, method, host, use_call=False,
+                           **kwargs):
         """Notify the agent that is hosting the router."""
         LOG.debug('Notify agent at %(host)s the message '
                   '%(method)s', {'host': host,
                                  'method': method})
         cctxt = self.client.prepare(server=host)
-        cctxt.cast(context, method, **kwargs)
+        rpc_method = cctxt.call if use_call else cctxt.cast
+        rpc_method(context, method, **kwargs)
 
     def _agent_notification(self, context, method, router_ids, operation,
                             shuffle_agents):
@@ -156,8 +158,12 @@ class L3AgentNotifyAPI(object):
                                 payload={'router_id': router_id})
 
     def router_added_to_agent(self, context, router_ids, host):
+        # need to use call here as we want to be sure agent received
+        # notification and router will not be "lost". However using call()
+        # itself is not a guarantee, calling code should handle exceptions and
+        # retry
         self._notification_host(context, 'router_added_to_agent', host,
-                                payload=router_ids)
+                                use_call=True, payload=router_ids)
 
     def routers_updated_on_host(self, context, router_ids, host):
         self._notification_host(context, 'routers_updated', host,
index 4ccde0bdaf5f35d4fe0fdaf84029055b7df151af..9051b8ce38a12fa82bbfa04a896e93707e24f1e7 100644 (file)
@@ -55,6 +55,10 @@ L3_AGENTS_SCHEDULER_OPTS = [
 
 cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
 
+# default messaging timeout is 60 sec, so 2 here is chosen to not block API
+# call for more than 2 minutes
+AGENT_NOTIFY_MAX_ATTEMPTS = 2
+
 
 class RouterL3AgentBinding(model_base.BASEV2):
     """Represents binding between neutron routers and L3 agents."""
@@ -270,7 +274,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
             query.delete()
 
     def reschedule_router(self, context, router_id, candidates=None):
-        """Reschedule router to a new l3 agent
+        """Reschedule router to (a) new l3 agent(s)
 
         Remove the router from the agent(s) currently hosting it and
         schedule it again
@@ -281,19 +285,45 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
             for agent in cur_agents:
                 self._unbind_router(context, router_id, agent['id'])
 
-            new_agent = self.schedule_router(context, router_id,
-                                             candidates=candidates)
-            if not new_agent:
+            self.schedule_router(context, router_id, candidates=candidates)
+            new_agents = self.list_l3_agents_hosting_router(
+                context, router_id)['agents']
+            if not new_agents:
                 raise l3agentscheduler.RouterReschedulingFailed(
                     router_id=router_id)
 
+        self._notify_agents_router_rescheduled(context, router_id,
+                                               cur_agents, new_agents)
+
+    def _notify_agents_router_rescheduled(self, context, router_id,
+                                          old_agents, new_agents):
         l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
-        if l3_notifier:
-            for agent in cur_agents:
-                l3_notifier.router_removed_from_agent(
-                    context, router_id, agent['host'])
-            l3_notifier.router_added_to_agent(
-                context, [router_id], new_agent.host)
+        if not l3_notifier:
+            return
+
+        old_hosts = [agent['host'] for agent in old_agents]
+        new_hosts = [agent['host'] for agent in new_agents]
+        for host in set(old_hosts) - set(new_hosts):
+            l3_notifier.router_removed_from_agent(
+                context, router_id, host)
+
+        for agent in new_agents:
+            # Need to make sure agents are notified or unschedule otherwise
+            for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS):
+                try:
+                    l3_notifier.router_added_to_agent(
+                        context, [router_id], agent['host'])
+                    break
+                except oslo_messaging.MessagingException:
+                    LOG.warning(_LW('Failed to notify L3 agent on host '
+                                    '%(host)s about added router. Attempt '
+                                    '%(attempt)d out of %(max_attempts)d'),
+                                {'host': agent['host'], 'attempt': attempt + 1,
+                                 'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS})
+            else:
+                self._unbind_router(context, router_id, agent['id'])
+                raise l3agentscheduler.RouterReschedulingFailed(
+                    router_id=router_id)
 
     def list_routers_on_l3_agent(self, context, agent_id):
         query = context.session.query(RouterL3AgentBinding.router_id)
index 425f47ec5c4d92be8243419b5d30464cc943990f..8c53096eac46eb30a7717657475ca4800ebbab0f 100644 (file)
@@ -382,42 +382,12 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
                 context, router_id, chosen_agent)
             return chosen_agent
 
-    def reschedule_router(self, context, router_id, candidates=None):
-        """Reschedule router to new l3 agents
-
-        Remove the router from l3 agents currently hosting it and
-        schedule it again
-        """
+    def _unbind_router(self, context, router_id, agent_id):
         router = self.get_router(context, router_id)
-        is_distributed = router.get('distributed', False)
-        if not is_distributed:
-            return super(L3_DVRsch_db_mixin, self).reschedule_router(
-                context, router_id, candidates)
-
-        old_agents = self.list_l3_agents_hosting_router(
-            context, router_id)['agents']
-        with context.session.begin(subtransactions=True):
-            for agent in old_agents:
-                self._unbind_router(context, router_id, agent['id'])
-                self.unbind_snat_servicenode(context, router_id)
-
-            self.schedule_router(context, router_id, candidates=candidates)
-            new_agents = self.list_l3_agents_hosting_router(
-                context, router_id)['agents']
-            if not new_agents:
-                raise l3agentscheduler.RouterReschedulingFailed(
-                    router_id=router_id)
-
-        l3_notifier = self.agent_notifiers.get(n_const.AGENT_TYPE_L3)
-        if l3_notifier:
-            old_hosts = [agent['host'] for agent in old_agents]
-            new_hosts = [agent['host'] for agent in new_agents]
-            for host in set(old_hosts) - set(new_hosts):
-                l3_notifier.router_removed_from_agent(
-                    context, router_id, host)
-            for host in new_hosts:
-                l3_notifier.router_added_to_agent(
-                    context, [router_id], host)
+        super(L3_DVRsch_db_mixin, self)._unbind_router(context, router_id,
+                                                       agent_id)
+        if router.get('distributed', False):
+            self.unbind_snat(context, router_id, agent_id)
 
     def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
                                                router_ids):
index 2a56241ab34ef61c162c892e9f6e3a3b0c9b08e9..0c67e084948b273c672687fc80e7174d55349a22 100644 (file)
@@ -229,6 +229,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
             service_plugins = {'l3_plugin_name': self.l3_plugin}
         else:
             service_plugins = None
+        mock.patch('neutron.common.rpc.get_client').start()
         super(OvsAgentSchedulerTestCaseBase, self).setUp(
             self.plugin_str, service_plugins=service_plugins)
         ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
@@ -773,6 +774,49 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
             self.assertIn(dvr_agent['host'],
                           [a['host'] for a in agents['agents']])
 
+    def test_router_reschedule_succeeded_after_failed_notification(self):
+        l3_plugin = (manager.NeutronManager.get_service_plugins()
+                     [service_constants.L3_ROUTER_NAT])
+        l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+        l3_rpc_cb = l3_rpc.L3RpcCallback()
+        self._register_agent_states()
+        with self.router() as router:
+            # schedule the router to host A
+            l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
+            with mock.patch.object(
+                    l3_notifier, 'router_added_to_agent') as notification_mock:
+                notification_mock.side_effect = [
+                    oslo_messaging.MessagingTimeout, None]
+                self._take_down_agent_and_run_reschedule(L3_HOSTA)
+                self.assertEqual(
+                    2, l3_notifier.router_added_to_agent.call_count)
+                # make sure router was rescheduled even when first attempt
+                # failed to notify l3 agent
+                l3_agents = self._list_l3_agents_hosting_router(
+                    router['router']['id'])['agents']
+                self.assertEqual(1, len(l3_agents))
+                self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
+
+    def test_router_reschedule_failed_notification_all_attempts(self):
+        l3_plugin = (manager.NeutronManager.get_service_plugins()
+                     [service_constants.L3_ROUTER_NAT])
+        l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+        l3_rpc_cb = l3_rpc.L3RpcCallback()
+        self._register_agent_states()
+        with self.router() as router:
+            # schedule the router to host A
+            l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
+            with mock.patch.object(
+                    l3_notifier, 'router_added_to_agent') as notification_mock:
+                notification_mock.side_effect = oslo_messaging.MessagingTimeout
+                self._take_down_agent_and_run_reschedule(L3_HOSTA)
+                self.assertEqual(
+                    l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS,
+                    l3_notifier.router_added_to_agent.call_count)
+                l3_agents = self._list_l3_agents_hosting_router(
+                    router['router']['id'])['agents']
+                self.assertEqual(0, len(l3_agents))
+
     def test_router_auto_schedule_with_invalid_router(self):
         with self.router() as router:
             l3_rpc_cb = l3_rpc.L3RpcCallback()
@@ -1494,7 +1538,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
             l3_notifier.client,
             'prepare',
             return_value=l3_notifier.client) as mock_prepare,\
-                mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
+                mock.patch.object(l3_notifier.client, 'call') as mock_call,\
                 self.router() as router1:
             self._register_agent_states()
             hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@@ -1503,7 +1547,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
                                          router1['router']['id'])
             routers = [router1['router']['id']]
             mock_prepare.assert_called_with(server='hosta')
-            mock_cast.assert_called_with(
+            mock_call.assert_called_with(
                 mock.ANY, 'router_added_to_agent', payload=routers)
             notifications = fake_notifier.NOTIFICATIONS
             expected_event_type = 'l3_agent.router.add'
@@ -1518,6 +1562,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
             'prepare',
             return_value=l3_notifier.client) as mock_prepare,\
                 mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
+                mock.patch.object(l3_notifier.client, 'call'),\
                 self.router() as router1:
             self._register_agent_states()
             hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
index ccf0012bded795807956274318bce7651f4e5aed..f0d7f1a6b8af98417a890d3b506e2a9db4324067 100644 (file)
@@ -1646,6 +1646,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase,
         super(L3HATestCaseMixin, self).setUp()
 
         self.adminContext = n_context.get_admin_context()
+        mock.patch('neutron.common.rpc.get_client').start()
         self.plugin = L3HAPlugin()
 
         self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')