review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
L3 agent: paginate sync routers task
author Oleg Bondarev <obondarev@mirantis.com>
Tue, 13 Oct 2015 09:45:59 +0000 (12:45 +0300)
committer Oleg Bondarev <obondarev@mirantis.com>
Tue, 12 Jan 2016 09:56:45 +0000 (12:56 +0300)
In case there are thousands of routers attached to thousands of
networks, a sync_routers request might take a long time and hit the
timeout on the agent side, so the agent initiates another resync. This
may lead to an endless loop, overloading the server and leaving the
agent unable to sync state.

This patch makes the L3 agent first check how many routers are
scheduled to it and then fetch those routers in chunks.
The initial chunk size is 256 but may be decreased dynamically if
timeouts occur while waiting for a response from the server.

This approach reduces the load on the server side and speeds up
resync on the agent side, since processing starts right after the
first chunk is received.

Closes-Bug: #1516260
Change-Id: Id675910c2a0b862bfb9e6f4fdaf3cd9fe337e52f
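
The chunked fetch with an adaptive chunk size described above can be
summarized by the following minimal sketch. It uses hypothetical names
(RouterSyncer, fetch via plugin_rpc, a builtin TimeoutError standing in
for oslo_messaging.MessagingTimeout) and is not the agent's actual code:

    # Minimal sketch of chunked router sync with adaptive chunk size.
    # Hypothetical helper, not the actual L3 agent implementation.

    MAX_CHUNK = 256
    MIN_CHUNK = 32


    class RouterSyncer(object):
        def __init__(self, plugin_rpc):
            # plugin_rpc is assumed to expose get_router_ids() and get_routers()
            self.plugin_rpc = plugin_rpc
            self.chunk_size = MAX_CHUNK

        def sync(self, context):
            router_ids = self.plugin_rpc.get_router_ids(context)
            try:
                # fetch and process routers chunk by chunk
                for i in range(0, len(router_ids), self.chunk_size):
                    chunk = router_ids[i:i + self.chunk_size]
                    for router in self.plugin_rpc.get_routers(context, chunk):
                        self._process(router)
            except TimeoutError:  # stands in for oslo_messaging.MessagingTimeout
                # server too slow: halve the chunk size (not below the minimum)
                # and let the caller schedule another resync
                self.chunk_size = max(self.chunk_size // 2, MIN_CHUNK)
                raise
            # successful pass: grow the chunk size back towards the maximum
            self.chunk_size = min(self.chunk_size + MIN_CHUNK, MAX_CHUNK)

        def _process(self, router):
            pass  # queue the router update, keep its namespace, etc.

The chunk size thus shrinks multiplicatively on timeouts and grows
additively after successful syncs, which is what the agent.py hunk below
implements with SYNC_ROUTERS_MAX_CHUNK_SIZE and SYNC_ROUTERS_MIN_CHUNK_SIZE.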

neutron/agent/l3/agent.py
neutron/api/rpc/handlers/l3_rpc.py
neutron/db/l3_agentschedulers_db.py
neutron/tests/functional/agent/l3/test_legacy_router.py
neutron/tests/unit/agent/l3/test_agent.py

index ad31ed94372b48e0e35bf75d26ccd584e3e1fe52..703337903409eeec96475fe68f83e1ab0ad4785d 100644 (file)
@@ -64,6 +64,11 @@ NS_PREFIX = namespaces.NS_PREFIX
 INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
 EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
 
+# Number of routers to fetch from server at a time on resync.
+# Needed to reduce load on server side and to speed up resync on agent side.
+SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
+SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
+
 
 class L3PluginApi(object):
     """Agent side of the l3 agent RPC API.
@@ -83,6 +88,7 @@ class L3PluginApi(object):
         1.7 - DVR support: new L3 plugin methods added.
               - delete_agent_gateway_port
         1.8 - Added address scope information
+        1.9 - Added get_router_ids
     """
 
     def __init__(self, topic, host):
@@ -96,6 +102,11 @@ class L3PluginApi(object):
         return cctxt.call(context, 'sync_routers', host=self.host,
                           router_ids=router_ids)
 
+    def get_router_ids(self, context):
+        """Make a remote process call to retrieve scheduled routers ids."""
+        cctxt = self.client.prepare(version='1.9')
+        return cctxt.call(context, 'get_router_ids', host=self.host)
+
     def get_external_network_id(self, context):
         """Make a remote process call to retrieve the external network id.
 
@@ -188,6 +199,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
         self.context = n_context.get_admin_context_without_session()
         self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
         self.fullsync = True
+        self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
 
         # Get the list of service plugins from Neutron Server
         # This is the first place where we contact neutron-server on startup
@@ -532,45 +544,68 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
 
     def fetch_and_sync_all_routers(self, context, ns_manager):
         prev_router_ids = set(self.router_info)
+        curr_router_ids = set()
         timestamp = timeutils.utcnow()
 
         try:
-            if self.conf.router_id:
-                routers = self.plugin_rpc.get_routers(context,
-                                                      [self.conf.router_id])
+            router_ids = ([self.conf.router_id] if self.conf.router_id else
+                          self.plugin_rpc.get_router_ids(context))
+            # fetch routers by chunks to reduce the load on server and to
+            # start router processing earlier
+            for i in range(0, len(router_ids), self.sync_routers_chunk_size):
+                routers = self.plugin_rpc.get_routers(
+                    context, router_ids[i:i + self.sync_routers_chunk_size])
+                LOG.debug('Processing :%r', routers)
+                for r in routers:
+                    curr_router_ids.add(r['id'])
+                    ns_manager.keep_router(r['id'])
+                    if r.get('distributed'):
+                        # need to keep fip namespaces as well
+                        ext_net_id = (r['external_gateway_info'] or {}).get(
+                            'network_id')
+                        if ext_net_id:
+                            ns_manager.keep_ext_net(ext_net_id)
+                    update = queue.RouterUpdate(
+                        r['id'],
+                        queue.PRIORITY_SYNC_ROUTERS_TASK,
+                        router=r,
+                        timestamp=timestamp)
+                    self._queue.add(update)
+        except oslo_messaging.MessagingTimeout:
+            if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
+                self.sync_routers_chunk_size = max(
+                    self.sync_routers_chunk_size / 2,
+                    SYNC_ROUTERS_MIN_CHUNK_SIZE)
+                LOG.error(_LE('Server failed to return info for routers in '
+                              'required time, decreasing chunk size to: %s'),
+                          self.sync_routers_chunk_size)
             else:
-                routers = self.plugin_rpc.get_routers(context)
+                LOG.error(_LE('Server failed to return info for routers in '
+                              'required time even with min chunk size: %s. '
+                              'It might be under very high load or '
+                              'just inoperable'),
+                          self.sync_routers_chunk_size)
+            raise
         except oslo_messaging.MessagingException:
             LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
             raise n_exc.AbortSyncRouters()
-        else:
-            LOG.debug('Processing :%r', routers)
-            for r in routers:
-                ns_manager.keep_router(r['id'])
-                if r.get('distributed'):
-                    # need to keep fip namespaces as well
-                    ext_net_id = (r['external_gateway_info'] or {}).get(
-                        'network_id')
-                    if ext_net_id:
-                        ns_manager.keep_ext_net(ext_net_id)
-                update = queue.RouterUpdate(r['id'],
-                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
-                                            router=r,
-                                            timestamp=timestamp)
-                self._queue.add(update)
-            self.fullsync = False
-            LOG.debug("periodic_sync_routers_task successfully completed")
-
-            curr_router_ids = set([r['id'] for r in routers])
-
-            # Delete routers that have disappeared since the last sync
-            for router_id in prev_router_ids - curr_router_ids:
-                ns_manager.keep_router(router_id)
-                update = queue.RouterUpdate(router_id,
-                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
-                                            timestamp=timestamp,
-                                            action=queue.DELETE_ROUTER)
-                self._queue.add(update)
+
+        self.fullsync = False
+        LOG.debug("periodic_sync_routers_task successfully completed")
+        # adjust chunk size after successful sync
+        if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
+            self.sync_routers_chunk_size = min(
+                self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
+                SYNC_ROUTERS_MAX_CHUNK_SIZE)
+
+        # Delete routers that have disappeared since the last sync
+        for router_id in prev_router_ids - curr_router_ids:
+            ns_manager.keep_router(router_id)
+            update = queue.RouterUpdate(router_id,
+                                        queue.PRIORITY_SYNC_ROUTERS_TASK,
+                                        timestamp=timestamp,
+                                        action=queue.DELETE_ROUTER)
+            self._queue.add(update)
 
     def after_start(self):
         # Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
index e0ec3e1c61e4fad74f4192532afbb4385c5f1109..b16eea820c0388734aaf219a63e12dae5c1847f6 100644 (file)
@@ -46,7 +46,8 @@ class L3RpcCallback(object):
     # 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
     # 1.7 Added method delete_agent_gateway_port for DVR Routers
     # 1.8 Added address scope information
-    target = oslo_messaging.Target(version='1.8')
+    # 1.9 Added get_router_ids
+    target = oslo_messaging.Target(version='1.9')
 
     @property
     def plugin(self):
@@ -61,6 +62,10 @@ class L3RpcCallback(object):
                 plugin_constants.L3_ROUTER_NAT]
         return self._l3plugin
 
+    def get_router_ids(self, context, host):
+        """Returns IDs of routers scheduled to l3 agent on <host>"""
+        return self.l3plugin.list_router_ids_on_host(context, host)
+
     @db_api.retry_db_errors
     def sync_routers(self, context, **kwargs):
         """Sync routers according to filters to a specific agent.
index 5ee651f794e16953bf72e8eefd324b0216adea9d..0c6ec3978c06cb3e460d477311bb7bdba20ca4c0 100644 (file)
@@ -370,8 +370,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
 
         return self.get_sync_data(context, router_ids=router_ids, active=True)
 
-    def list_active_sync_routers_on_active_l3_agent(
-            self, context, host, router_ids):
+    def list_router_ids_on_host(self, context, host, router_ids=None):
         agent = self._get_agent_by_type_and_host(
             context, constants.AGENT_TYPE_L3, host)
         if not agentschedulers_db.services_available(agent.admin_state_up):
@@ -383,8 +382,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
         if router_ids:
             query = query.filter(
                 RouterL3AgentBinding.router_id.in_(router_ids))
-        router_ids = [item[0] for item in query]
+
+        return [item[0] for item in query]
+
+    def list_active_sync_routers_on_active_l3_agent(
+            self, context, host, router_ids):
+        router_ids = self.list_router_ids_on_host(context, host, router_ids)
         if router_ids:
+            agent = self._get_agent_by_type_and_host(
+                context, constants.AGENT_TYPE_L3, host)
             return self._get_active_l3_agent_routers_sync_data(context, host,
                                                                agent,
                                                                router_ids)
index 2f075970d4e76301af597a4aaeffee750393007c..153d58865840fc5ac862d59f5a8e4362b893b6d9 100644 (file)
@@ -90,6 +90,10 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
             deleted_routers_info.append(ri)
             ns_names_to_retrieve.add(ri.ns_name)
 
+        mocked_get_router_ids = self.mock_plugin_api.get_router_ids
+        mocked_get_router_ids.return_value = [r['id'] for r in
+                                              routers_to_keep +
+                                              routers_deleted_during_resync]
         mocked_get_routers = self.mock_plugin_api.get_routers
         mocked_get_routers.return_value = (routers_to_keep +
                                            routers_deleted_during_resync)
index 23d93c4f24ceb3d9685c0c24c3afa0f1876b47e0..c12d4c1c40f05200d472dd0b6be20f3113046c75 100644 (file)
@@ -200,6 +200,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
 
     def test_periodic_sync_routers_task_raise_exception(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        self.plugin_api.get_router_ids.return_value = ['fake_id']
         self.plugin_api.get_routers.side_effect = ValueError
         self.assertRaises(ValueError,
                           agent.periodic_sync_routers_task,
@@ -247,6 +248,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         stale_router_ids = [_uuid(), _uuid()]
         active_routers = [{'id': _uuid()}, {'id': _uuid()}]
+        self.plugin_api.get_router_ids.return_value = [r['id'] for r
+                                                       in active_routers]
         self.plugin_api.get_routers.return_value = active_routers
         namespace_list = [namespaces.NS_PREFIX + r_id
                           for r_id in stale_router_ids]