From: Oleg Bondarev Date: Fri, 7 Aug 2015 12:44:29 +0000 (+0300) Subject: Ensure l3 agent receives notification about added router X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=30b121dfa4d8542665b2209d21ece805d8957aa5;p=openstack-build%2Fneutron-build.git Ensure l3 agent receives notification about added router Currently router_added (and other) notifications are sent to agents with the RPC cast() method, which does not ensure that the message is actually delivered to the recipient. If the message is lost (for example, due to instability of the messaging system in some failover scenarios), neither the server nor the agent will be aware of that and the router will be "lost" until the next agent resync. A resync only happens in case of errors on the agent side or on agent restart. The fix makes the server use call() to notify agents about added routers, thus ensuring no routers will be lost. This also unifies the reschedule_router() method to avoid code duplication between the legacy and DVR agent schedulers. Closes-Bug: #1482630 Related-Bug: #1404743 Change-Id: Id08764ba837d8f47a28649d081a5876797fe369e --- diff --git a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py index 2bc6c2ccc..eeaa81808 100644 --- a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -38,13 +38,15 @@ class L3AgentNotifyAPI(object): target = oslo_messaging.Target(topic=topic, version='1.0') self.client = n_rpc.get_client(target) - def _notification_host(self, context, method, host, **kwargs): + def _notification_host(self, context, method, host, use_call=False, + **kwargs): """Notify the agent that is hosting the router.""" LOG.debug('Notify agent at %(host)s the message ' '%(method)s', {'host': host, 'method': method}) cctxt = self.client.prepare(server=host) - cctxt.cast(context, method, **kwargs) + rpc_method = cctxt.call if use_call else cctxt.cast + rpc_method(context, method, **kwargs) def 
_agent_notification(self, context, method, router_ids, operation, shuffle_agents): @@ -156,8 +158,12 @@ class L3AgentNotifyAPI(object): payload={'router_id': router_id}) def router_added_to_agent(self, context, router_ids, host): + # need to use call here as we want to be sure agent received + # notification and router will not be "lost". However using call() + # itself is not a guarantee, calling code should handle exceptions and + # retry self._notification_host(context, 'router_added_to_agent', host, - payload=router_ids) + use_call=True, payload=router_ids) def routers_updated_on_host(self, context, router_ids, host): self._notification_host(context, 'routers_updated', host, diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py index 4ccde0bda..9051b8ce3 100644 --- a/neutron/db/l3_agentschedulers_db.py +++ b/neutron/db/l3_agentschedulers_db.py @@ -55,6 +55,10 @@ L3_AGENTS_SCHEDULER_OPTS = [ cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS) +# default messaging timeout is 60 sec, so 2 here is chosen to not block API +# call for more than 2 minutes +AGENT_NOTIFY_MAX_ATTEMPTS = 2 + class RouterL3AgentBinding(model_base.BASEV2): """Represents binding between neutron routers and L3 agents.""" @@ -270,7 +274,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, query.delete() def reschedule_router(self, context, router_id, candidates=None): - """Reschedule router to a new l3 agent + """Reschedule router to (a) new l3 agent(s) Remove the router from the agent(s) currently hosting it and schedule it again @@ -281,19 +285,45 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, for agent in cur_agents: self._unbind_router(context, router_id, agent['id']) - new_agent = self.schedule_router(context, router_id, - candidates=candidates) - if not new_agent: + self.schedule_router(context, router_id, candidates=candidates) + new_agents = self.list_l3_agents_hosting_router( + context, 
router_id)['agents'] + if not new_agents: raise l3agentscheduler.RouterReschedulingFailed( router_id=router_id) + self._notify_agents_router_rescheduled(context, router_id, + cur_agents, new_agents) + + def _notify_agents_router_rescheduled(self, context, router_id, + old_agents, new_agents): l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3) - if l3_notifier: - for agent in cur_agents: - l3_notifier.router_removed_from_agent( - context, router_id, agent['host']) - l3_notifier.router_added_to_agent( - context, [router_id], new_agent.host) + if not l3_notifier: + return + + old_hosts = [agent['host'] for agent in old_agents] + new_hosts = [agent['host'] for agent in new_agents] + for host in set(old_hosts) - set(new_hosts): + l3_notifier.router_removed_from_agent( + context, router_id, host) + + for agent in new_agents: + # Need to make sure agents are notified or unschedule otherwise + for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS): + try: + l3_notifier.router_added_to_agent( + context, [router_id], agent['host']) + break + except oslo_messaging.MessagingException: + LOG.warning(_LW('Failed to notify L3 agent on host ' + '%(host)s about added router. 
Attempt ' + '%(attempt)d out of %(max_attempts)d'), + {'host': agent['host'], 'attempt': attempt + 1, + 'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS}) + else: + self._unbind_router(context, router_id, agent['id']) + raise l3agentscheduler.RouterReschedulingFailed( + router_id=router_id) def list_routers_on_l3_agent(self, context, agent_id): query = context.session.query(RouterL3AgentBinding.router_id) diff --git a/neutron/db/l3_dvrscheduler_db.py b/neutron/db/l3_dvrscheduler_db.py index 425f47ec5..8c53096ea 100644 --- a/neutron/db/l3_dvrscheduler_db.py +++ b/neutron/db/l3_dvrscheduler_db.py @@ -382,42 +382,12 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin): context, router_id, chosen_agent) return chosen_agent - def reschedule_router(self, context, router_id, candidates=None): - """Reschedule router to new l3 agents - - Remove the router from l3 agents currently hosting it and - schedule it again - """ + def _unbind_router(self, context, router_id, agent_id): router = self.get_router(context, router_id) - is_distributed = router.get('distributed', False) - if not is_distributed: - return super(L3_DVRsch_db_mixin, self).reschedule_router( - context, router_id, candidates) - - old_agents = self.list_l3_agents_hosting_router( - context, router_id)['agents'] - with context.session.begin(subtransactions=True): - for agent in old_agents: - self._unbind_router(context, router_id, agent['id']) - self.unbind_snat_servicenode(context, router_id) - - self.schedule_router(context, router_id, candidates=candidates) - new_agents = self.list_l3_agents_hosting_router( - context, router_id)['agents'] - if not new_agents: - raise l3agentscheduler.RouterReschedulingFailed( - router_id=router_id) - - l3_notifier = self.agent_notifiers.get(n_const.AGENT_TYPE_L3) - if l3_notifier: - old_hosts = [agent['host'] for agent in old_agents] - new_hosts = [agent['host'] for agent in new_agents] - for host in set(old_hosts) - set(new_hosts): - 
l3_notifier.router_removed_from_agent( - context, router_id, host) - for host in new_hosts: - l3_notifier.router_added_to_agent( - context, [router_id], host) + super(L3_DVRsch_db_mixin, self)._unbind_router(context, router_id, + agent_id) + if router.get('distributed', False): + self.unbind_snat(context, router_id, agent_id) def _get_active_l3_agent_routers_sync_data(self, context, host, agent, router_ids): diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 2a56241ab..0c67e0849 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -229,6 +229,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, service_plugins = {'l3_plugin_name': self.l3_plugin} else: service_plugins = None + mock.patch('neutron.common.rpc.get_client').start() super(OvsAgentSchedulerTestCaseBase, self).setUp( self.plugin_str, service_plugins=service_plugins) ext_mgr = extensions.PluginAwareExtensionManager.get_instance() @@ -773,6 +774,49 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self.assertIn(dvr_agent['host'], [a['host'] for a in agents['agents']]) + def test_router_reschedule_succeeded_after_failed_notification(self): + l3_plugin = (manager.NeutronManager.get_service_plugins() + [service_constants.L3_ROUTER_NAT]) + l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] + l3_rpc_cb = l3_rpc.L3RpcCallback() + self._register_agent_states() + with self.router() as router: + # schedule the router to host A + l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA) + with mock.patch.object( + l3_notifier, 'router_added_to_agent') as notification_mock: + notification_mock.side_effect = [ + oslo_messaging.MessagingTimeout, None] + self._take_down_agent_and_run_reschedule(L3_HOSTA) + self.assertEqual( + 2, l3_notifier.router_added_to_agent.call_count) + # make sure router was rescheduled even when first attempt + # 
failed to notify l3 agent + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id'])['agents'] + self.assertEqual(1, len(l3_agents)) + self.assertEqual(L3_HOSTB, l3_agents[0]['host']) + + def test_router_reschedule_failed_notification_all_attempts(self): + l3_plugin = (manager.NeutronManager.get_service_plugins() + [service_constants.L3_ROUTER_NAT]) + l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] + l3_rpc_cb = l3_rpc.L3RpcCallback() + self._register_agent_states() + with self.router() as router: + # schedule the router to host A + l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA) + with mock.patch.object( + l3_notifier, 'router_added_to_agent') as notification_mock: + notification_mock.side_effect = oslo_messaging.MessagingTimeout + self._take_down_agent_and_run_reschedule(L3_HOSTA) + self.assertEqual( + l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS, + l3_notifier.router_added_to_agent.call_count) + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id'])['agents'] + self.assertEqual(0, len(l3_agents)) + def test_router_auto_schedule_with_invalid_router(self): with self.router() as router: l3_rpc_cb = l3_rpc.L3RpcCallback() @@ -1494,7 +1538,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin, l3_notifier.client, 'prepare', return_value=l3_notifier.client) as mock_prepare,\ - mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\ + mock.patch.object(l3_notifier.client, 'call') as mock_call,\ self.router() as router1: self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, @@ -1503,7 +1547,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin, router1['router']['id']) routers = [router1['router']['id']] mock_prepare.assert_called_with(server='hosta') - mock_cast.assert_called_with( + mock_call.assert_called_with( mock.ANY, 'router_added_to_agent', payload=routers) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 
'l3_agent.router.add' @@ -1518,6 +1562,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin, 'prepare', return_value=l3_notifier.client) as mock_prepare,\ mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\ + mock.patch.object(l3_notifier.client, 'call'),\ self.router() as router1: self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, diff --git a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py index ccf0012bd..f0d7f1a6b 100644 --- a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py +++ b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py @@ -1646,6 +1646,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase, super(L3HATestCaseMixin, self).setUp() self.adminContext = n_context.get_admin_context() + mock.patch('neutron.common.rpc.get_client').start() self.plugin = L3HAPlugin() self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')