target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
- def _notification_host(self, context, method, host, **kwargs):
+ def _notification_host(self, context, method, host, use_call=False,
+ **kwargs):
"""Notify the agent that is hosting the router."""
LOG.debug('Notify agent at %(host)s the message '
'%(method)s', {'host': host,
'method': method})
cctxt = self.client.prepare(server=host)
- cctxt.cast(context, method, **kwargs)
+ rpc_method = cctxt.call if use_call else cctxt.cast
+ rpc_method(context, method, **kwargs)
def _agent_notification(self, context, method, router_ids, operation,
shuffle_agents):
payload={'router_id': router_id})
def router_added_to_agent(self, context, router_ids, host):
+ # need to use call here as we want to be sure agent received
+ # notification and router will not be "lost". However using call()
+ # itself is not a guarantee, calling code should handle exceptions and
+ # retry
self._notification_host(context, 'router_added_to_agent', host,
- payload=router_ids)
+ use_call=True, payload=router_ids)
def routers_updated_on_host(self, context, router_ids, host):
self._notification_host(context, 'routers_updated', host,
cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
+# default messaging timeout is 60 sec, so 2 here is chosen to not block API
+# call for more than 2 minutes
+AGENT_NOTIFY_MAX_ATTEMPTS = 2
+
class RouterL3AgentBinding(model_base.BASEV2):
"""Represents binding between neutron routers and L3 agents."""
query.delete()
def reschedule_router(self, context, router_id, candidates=None):
- """Reschedule router to a new l3 agent
+ """Reschedule router to (a) new l3 agent(s)
Remove the router from the agent(s) currently hosting it and
schedule it again
for agent in cur_agents:
self._unbind_router(context, router_id, agent['id'])
- new_agent = self.schedule_router(context, router_id,
- candidates=candidates)
- if not new_agent:
+ self.schedule_router(context, router_id, candidates=candidates)
+ new_agents = self.list_l3_agents_hosting_router(
+ context, router_id)['agents']
+ if not new_agents:
raise l3agentscheduler.RouterReschedulingFailed(
router_id=router_id)
+ self._notify_agents_router_rescheduled(context, router_id,
+ cur_agents, new_agents)
+
+ def _notify_agents_router_rescheduled(self, context, router_id,
+ old_agents, new_agents):
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
- if l3_notifier:
- for agent in cur_agents:
- l3_notifier.router_removed_from_agent(
- context, router_id, agent['host'])
- l3_notifier.router_added_to_agent(
- context, [router_id], new_agent.host)
+ if not l3_notifier:
+ return
+
+ old_hosts = [agent['host'] for agent in old_agents]
+ new_hosts = [agent['host'] for agent in new_agents]
+ for host in set(old_hosts) - set(new_hosts):
+ l3_notifier.router_removed_from_agent(
+ context, router_id, host)
+
+ for agent in new_agents:
+ # Need to make sure agents are notified or unschedule otherwise
+ for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS):
+ try:
+ l3_notifier.router_added_to_agent(
+ context, [router_id], agent['host'])
+ break
+ except oslo_messaging.MessagingException:
+ LOG.warning(_LW('Failed to notify L3 agent on host '
+ '%(host)s about added router. Attempt '
+ '%(attempt)d out of %(max_attempts)d'),
+ {'host': agent['host'], 'attempt': attempt + 1,
+ 'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS})
+ else:
+ self._unbind_router(context, router_id, agent['id'])
+ raise l3agentscheduler.RouterReschedulingFailed(
+ router_id=router_id)
def list_routers_on_l3_agent(self, context, agent_id):
query = context.session.query(RouterL3AgentBinding.router_id)
context, router_id, chosen_agent)
return chosen_agent
- def reschedule_router(self, context, router_id, candidates=None):
- """Reschedule router to new l3 agents
-
- Remove the router from l3 agents currently hosting it and
- schedule it again
- """
+ def _unbind_router(self, context, router_id, agent_id):
router = self.get_router(context, router_id)
- is_distributed = router.get('distributed', False)
- if not is_distributed:
- return super(L3_DVRsch_db_mixin, self).reschedule_router(
- context, router_id, candidates)
-
- old_agents = self.list_l3_agents_hosting_router(
- context, router_id)['agents']
- with context.session.begin(subtransactions=True):
- for agent in old_agents:
- self._unbind_router(context, router_id, agent['id'])
- self.unbind_snat_servicenode(context, router_id)
-
- self.schedule_router(context, router_id, candidates=candidates)
- new_agents = self.list_l3_agents_hosting_router(
- context, router_id)['agents']
- if not new_agents:
- raise l3agentscheduler.RouterReschedulingFailed(
- router_id=router_id)
-
- l3_notifier = self.agent_notifiers.get(n_const.AGENT_TYPE_L3)
- if l3_notifier:
- old_hosts = [agent['host'] for agent in old_agents]
- new_hosts = [agent['host'] for agent in new_agents]
- for host in set(old_hosts) - set(new_hosts):
- l3_notifier.router_removed_from_agent(
- context, router_id, host)
- for host in new_hosts:
- l3_notifier.router_added_to_agent(
- context, [router_id], host)
+ super(L3_DVRsch_db_mixin, self)._unbind_router(context, router_id,
+ agent_id)
+ if router.get('distributed', False):
+ self.unbind_snat(context, router_id, agent_id)
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
router_ids):
service_plugins = {'l3_plugin_name': self.l3_plugin}
else:
service_plugins = None
+ mock.patch('neutron.common.rpc.get_client').start()
super(OvsAgentSchedulerTestCaseBase, self).setUp(
self.plugin_str, service_plugins=service_plugins)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.assertIn(dvr_agent['host'],
[a['host'] for a in agents['agents']])
+ def test_router_reschedule_succeeded_after_failed_notification(self):
+ l3_plugin = (manager.NeutronManager.get_service_plugins()
+ [service_constants.L3_ROUTER_NAT])
+ l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+ l3_rpc_cb = l3_rpc.L3RpcCallback()
+ self._register_agent_states()
+ with self.router() as router:
+ # schedule the router to host A
+ l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
+ with mock.patch.object(
+ l3_notifier, 'router_added_to_agent') as notification_mock:
+ notification_mock.side_effect = [
+ oslo_messaging.MessagingTimeout, None]
+ self._take_down_agent_and_run_reschedule(L3_HOSTA)
+ self.assertEqual(
+ 2, l3_notifier.router_added_to_agent.call_count)
+ # make sure router was rescheduled even when first attempt
+ # failed to notify l3 agent
+ l3_agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])['agents']
+ self.assertEqual(1, len(l3_agents))
+ self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
+
+ def test_router_reschedule_failed_notification_all_attempts(self):
+ l3_plugin = (manager.NeutronManager.get_service_plugins()
+ [service_constants.L3_ROUTER_NAT])
+ l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+ l3_rpc_cb = l3_rpc.L3RpcCallback()
+ self._register_agent_states()
+ with self.router() as router:
+ # schedule the router to host A
+ l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
+ with mock.patch.object(
+ l3_notifier, 'router_added_to_agent') as notification_mock:
+ notification_mock.side_effect = oslo_messaging.MessagingTimeout
+ self._take_down_agent_and_run_reschedule(L3_HOSTA)
+ self.assertEqual(
+ l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS,
+ l3_notifier.router_added_to_agent.call_count)
+ l3_agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])['agents']
+ self.assertEqual(0, len(l3_agents))
+
def test_router_auto_schedule_with_invalid_router(self):
with self.router() as router:
l3_rpc_cb = l3_rpc.L3RpcCallback()
l3_notifier.client,
'prepare',
return_value=l3_notifier.client) as mock_prepare,\
- mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
+ mock.patch.object(l3_notifier.client, 'call') as mock_call,\
self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
router1['router']['id'])
routers = [router1['router']['id']]
mock_prepare.assert_called_with(server='hosta')
- mock_cast.assert_called_with(
+ mock_call.assert_called_with(
mock.ANY, 'router_added_to_agent', payload=routers)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'l3_agent.router.add'
'prepare',
return_value=l3_notifier.client) as mock_prepare,\
mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
+ mock.patch.object(l3_notifier.client, 'call'),\
self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
super(L3HATestCaseMixin, self).setUp()
self.adminContext = n_context.get_admin_context()
+ mock.patch('neutron.common.rpc.get_client').start()
self.plugin = L3HAPlugin()
self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')