From: Oleg Bondarev
Date: Tue, 13 Oct 2015 09:45:59 +0000 (+0300)
Subject: L3 agent: paginate sync routers task
X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=0e97feb0f30bc0ef6f4fe041cb41b7aa81042263;p=openstack-build%2Fneutron-build.git

L3 agent: paginate sync routers task

When there are thousands of routers attached to thousands of networks,
a sync_routers request may take a long time and hit the timeout on the
agent side, causing the agent to initiate another resync. This can turn
into an endless loop that overloads the server while the agent never
manages to sync its state.

This patch makes the l3 agent first ask the server which routers are
scheduled to it and then fetch those routers in chunks. The initial
chunk size is 256 but is decreased dynamically if timeouts occur while
waiting for the server's response. This reduces the load on the server
side and speeds up resync on the agent side, since processing starts
right after the first chunk is received.

Closes-Bug: #1516260
Change-Id: Id675910c2a0b862bfb9e6f4fdaf3cd9fe337e52f
---

diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py
index ad31ed943..703337903 100644
--- a/neutron/agent/l3/agent.py
+++ b/neutron/agent/l3/agent.py
@@ -64,6 +64,11 @@ NS_PREFIX = namespaces.NS_PREFIX
 INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
 EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
 
+# Number of routers to fetch from server at a time on resync.
+# Needed to reduce load on server side and to speed up resync on agent side.
+SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
+SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
+
 
 class L3PluginApi(object):
     """Agent side of the l3 agent RPC API.
@@ -83,6 +88,7 @@ class L3PluginApi(object):
         1.7 - DVR support: new L3 plugin methods added.
               - delete_agent_gateway_port
         1.8 - Added address scope information
+        1.9 - Added get_router_ids
     """
 
     def __init__(self, topic, host):
@@ -96,6 +102,11 @@ class L3PluginApi(object):
         return cctxt.call(context, 'sync_routers', host=self.host,
                           router_ids=router_ids)
 
+    def get_router_ids(self, context):
+        """Make a remote process call to retrieve scheduled routers ids."""
+        cctxt = self.client.prepare(version='1.9')
+        return cctxt.call(context, 'get_router_ids', host=self.host)
+
     def get_external_network_id(self, context):
         """Make a remote process call to retrieve the external network id.
@@ -188,6 +199,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
         self.context = n_context.get_admin_context_without_session()
         self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
         self.fullsync = True
+        self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
 
         # Get the list of service plugins from Neutron Server
         # This is the first place where we contact neutron-server on startup
@@ -532,45 +544,68 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
     def fetch_and_sync_all_routers(self, context, ns_manager):
         prev_router_ids = set(self.router_info)
+        curr_router_ids = set()
         timestamp = timeutils.utcnow()
         try:
-            if self.conf.router_id:
-                routers = self.plugin_rpc.get_routers(context,
-                                                      [self.conf.router_id])
+            router_ids = ([self.conf.router_id] if self.conf.router_id else
+                          self.plugin_rpc.get_router_ids(context))
+            # fetch routers by chunks to reduce the load on server and to
+            # start router processing earlier
+            for i in range(0, len(router_ids), self.sync_routers_chunk_size):
+                routers = self.plugin_rpc.get_routers(
+                    context, router_ids[i:i + self.sync_routers_chunk_size])
+                LOG.debug('Processing :%r', routers)
+                for r in routers:
+                    curr_router_ids.add(r['id'])
+                    ns_manager.keep_router(r['id'])
+                    if r.get('distributed'):
+                        # need to keep fip namespaces as well
+                        ext_net_id = (r['external_gateway_info'] or {}).get(
+                            'network_id')
+                        if ext_net_id:
+                            ns_manager.keep_ext_net(ext_net_id)
+                    update = queue.RouterUpdate(
+                        r['id'],
+                        queue.PRIORITY_SYNC_ROUTERS_TASK,
+                        router=r,
+                        timestamp=timestamp)
+                    self._queue.add(update)
+        except oslo_messaging.MessagingTimeout:
+            if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
+                self.sync_routers_chunk_size = max(
+                    self.sync_routers_chunk_size / 2,
+                    SYNC_ROUTERS_MIN_CHUNK_SIZE)
+                LOG.error(_LE('Server failed to return info for routers in '
+                              'required time, decreasing chunk size to: %s'),
+                          self.sync_routers_chunk_size)
             else:
-                routers = self.plugin_rpc.get_routers(context)
+                LOG.error(_LE('Server failed to return info for routers in '
+                              'required time even with min chunk size: %s. '
+                              'It might be under very high load or '
+                              'just inoperable'),
+                          self.sync_routers_chunk_size)
+            raise
         except oslo_messaging.MessagingException:
             LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
             raise n_exc.AbortSyncRouters()
-        else:
-            LOG.debug('Processing :%r', routers)
-            for r in routers:
-                ns_manager.keep_router(r['id'])
-                if r.get('distributed'):
-                    # need to keep fip namespaces as well
-                    ext_net_id = (r['external_gateway_info'] or {}).get(
-                        'network_id')
-                    if ext_net_id:
-                        ns_manager.keep_ext_net(ext_net_id)
-                update = queue.RouterUpdate(r['id'],
-                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
-                                            router=r,
-                                            timestamp=timestamp)
-                self._queue.add(update)
-            self.fullsync = False
-            LOG.debug("periodic_sync_routers_task successfully completed")
-
-            curr_router_ids = set([r['id'] for r in routers])
-
-            # Delete routers that have disappeared since the last sync
-            for router_id in prev_router_ids - curr_router_ids:
-                ns_manager.keep_router(router_id)
-                update = queue.RouterUpdate(router_id,
-                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
-                                            timestamp=timestamp,
-                                            action=queue.DELETE_ROUTER)
-                self._queue.add(update)
+
+        self.fullsync = False
+        LOG.debug("periodic_sync_routers_task successfully completed")
+        # adjust chunk size after successful sync
+        if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
+            self.sync_routers_chunk_size = min(
+                self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
+                SYNC_ROUTERS_MAX_CHUNK_SIZE)
+
+        # Delete routers that have disappeared since the last sync
+        for router_id in prev_router_ids - curr_router_ids:
+            ns_manager.keep_router(router_id)
+            update = queue.RouterUpdate(router_id,
+                                        queue.PRIORITY_SYNC_ROUTERS_TASK,
+                                        timestamp=timestamp,
+                                        action=queue.DELETE_ROUTER)
+            self._queue.add(update)
 
     def after_start(self):
         # Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
diff --git a/neutron/api/rpc/handlers/l3_rpc.py b/neutron/api/rpc/handlers/l3_rpc.py
index e0ec3e1c6..b16eea820 100644
--- a/neutron/api/rpc/handlers/l3_rpc.py
+++ b/neutron/api/rpc/handlers/l3_rpc.py
@@ -46,7 +46,8 @@ class L3RpcCallback(object):
     # 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
     # 1.7 Added method delete_agent_gateway_port for DVR Routers
     # 1.8 Added address scope information
-    target = oslo_messaging.Target(version='1.8')
+    # 1.9 Added get_router_ids
+    target = oslo_messaging.Target(version='1.9')
 
     @property
     def plugin(self):
@@ -61,6 +62,10 @@ class L3RpcCallback(object):
                 plugin_constants.L3_ROUTER_NAT]
         return self._l3plugin
 
+    def get_router_ids(self, context, host):
+        """Returns IDs of routers scheduled to l3 agent on <host>"""
+        return self.l3plugin.list_router_ids_on_host(context, host)
+
     @db_api.retry_db_errors
     def sync_routers(self, context, **kwargs):
         """Sync routers according to filters to a specific agent.
diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py
index 5ee651f79..0c6ec3978 100644
--- a/neutron/db/l3_agentschedulers_db.py
+++ b/neutron/db/l3_agentschedulers_db.py
@@ -370,8 +370,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
         return self.get_sync_data(context, router_ids=router_ids,
                                   active=True)
 
-    def list_active_sync_routers_on_active_l3_agent(
-            self, context, host, router_ids):
+    def list_router_ids_on_host(self, context, host, router_ids=None):
         agent = self._get_agent_by_type_and_host(
             context, constants.AGENT_TYPE_L3, host)
         if not agentschedulers_db.services_available(agent.admin_state_up):
@@ -383,8 +382,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
         if router_ids:
             query = query.filter(
                 RouterL3AgentBinding.router_id.in_(router_ids))
-        router_ids = [item[0] for item in query]
+
+        return [item[0] for item in query]
+
+    def list_active_sync_routers_on_active_l3_agent(
+            self, context, host, router_ids):
+        router_ids = self.list_router_ids_on_host(context, host, router_ids)
         if router_ids:
+            agent = self._get_agent_by_type_and_host(
+                context, constants.AGENT_TYPE_L3, host)
             return self._get_active_l3_agent_routers_sync_data(context, host,
                                                                agent,
                                                                router_ids)
diff --git a/neutron/tests/functional/agent/l3/test_legacy_router.py b/neutron/tests/functional/agent/l3/test_legacy_router.py
index 2f075970d..153d58865 100644
--- a/neutron/tests/functional/agent/l3/test_legacy_router.py
+++ b/neutron/tests/functional/agent/l3/test_legacy_router.py
@@ -90,6 +90,10 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
             deleted_routers_info.append(ri)
             ns_names_to_retrieve.add(ri.ns_name)
 
+        mocked_get_router_ids = self.mock_plugin_api.get_router_ids
+        mocked_get_router_ids.return_value = [r['id'] for r in
+                                              routers_to_keep +
+                                              routers_deleted_during_resync]
         mocked_get_routers = self.mock_plugin_api.get_routers
         mocked_get_routers.return_value = (routers_to_keep +
                                            routers_deleted_during_resync)
diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py
index 23d93c4f2..c12d4c1c4 100644
--- a/neutron/tests/unit/agent/l3/test_agent.py
+++ b/neutron/tests/unit/agent/l3/test_agent.py
@@ -200,6 +200,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
 
     def test_periodic_sync_routers_task_raise_exception(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        self.plugin_api.get_router_ids.return_value = ['fake_id']
         self.plugin_api.get_routers.side_effect = ValueError
         self.assertRaises(ValueError,
                           agent.periodic_sync_routers_task,
@@ -247,6 +248,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         stale_router_ids = [_uuid(), _uuid()]
         active_routers = [{'id': _uuid()}, {'id': _uuid()}]
+        self.plugin_api.get_router_ids.return_value = [r['id'] for r
+                                                       in active_routers]
         self.plugin_api.get_routers.return_value = active_routers
         namespace_list = [namespaces.NS_PREFIX + r_id
                           for r_id in stale_router_ids]
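
For readers who want to experiment with the chunking and backoff behaviour
outside of a running Neutron deployment, below is a minimal standalone Python
sketch of the approach the patch implements. It is not the agent code itself:
FakePluginApi, ChunkedSyncer and MessagingTimeout are invented stand-ins for
the real L3PluginApi RPC client, the fetch_and_sync_all_routers logic and
oslo_messaging.MessagingTimeout.

# Standalone sketch only; the names below are illustrative, not Neutron APIs.

SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32


class MessagingTimeout(Exception):
    """Stand-in for oslo_messaging.MessagingTimeout."""


class FakePluginApi(object):
    """Pretend server side: knows all router ids and returns routers by id."""

    def __init__(self, n_routers):
        self._ids = ['router-%d' % i for i in range(n_routers)]

    def get_router_ids(self, context):
        return list(self._ids)

    def get_routers(self, context, router_ids):
        # A real RPC call could raise MessagingTimeout under heavy load.
        return [{'id': rid} for rid in router_ids]


class ChunkedSyncer(object):
    """Fetches routers in chunks, shrinking the chunk size on timeouts."""

    def __init__(self, plugin_rpc):
        self.plugin_rpc = plugin_rpc
        self.chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE

    def fetch_all_routers(self, context):
        router_ids = self.plugin_rpc.get_router_ids(context)
        routers = []
        # Fetch by chunks so each request stays cheap for the server and
        # the caller can start processing right after the first chunk.
        for i in range(0, len(router_ids), self.chunk_size):
            chunk = router_ids[i:i + self.chunk_size]
            try:
                routers.extend(self.plugin_rpc.get_routers(context, chunk))
            except MessagingTimeout:
                # Halve the chunk size (not below the minimum) and re-raise
                # so the caller retries the resync with smaller requests.
                self.chunk_size = max(self.chunk_size // 2,
                                      SYNC_ROUTERS_MIN_CHUNK_SIZE)
                raise
        # After a fully successful sync, creep the chunk size back up.
        self.chunk_size = min(self.chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
                              SYNC_ROUTERS_MAX_CHUNK_SIZE)
        return routers


if __name__ == '__main__':
    syncer = ChunkedSyncer(FakePluginApi(n_routers=1000))
    print(len(syncer.fetch_all_routers(context=None)))  # prints 1000

As in the patch, the chunk size is halved (never below
SYNC_ROUTERS_MIN_CHUNK_SIZE) when the server fails to answer in time and is
grown additively after a fully successful sync, so a temporarily overloaded
server is retried with smaller, cheaper requests.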