INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
+# Number of routers to fetch from the server at a time on resync.
+# Fetching in chunks reduces the load on the server and lets the agent
+# start processing routers sooner.
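+# The chunk size adapts at runtime: it is halved (down to the minimum)
+# on server timeouts and grown back toward the maximum after successful
+# syncs.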
+SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
+SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
+
class L3PluginApi(object):
"""Agent side of the l3 agent RPC API.
1.7 - DVR support: new L3 plugin methods added.
- delete_agent_gateway_port
1.8 - Added address scope information
+ 1.9 - Added get_router_ids
"""
def __init__(self, topic, host):
return cctxt.call(context, 'sync_routers', host=self.host,
router_ids=router_ids)
+ def get_router_ids(self, context):
+        """Make a remote process call to retrieve scheduled router ids."""
+ cctxt = self.client.prepare(version='1.9')
+ return cctxt.call(context, 'get_router_ids', host=self.host)
+
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
self.context = n_context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
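+        # Start with the largest chunk size; it is reduced if the server
+        # cannot answer sync_routers calls within the RPC timeout.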
+ self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
# Get the list of service plugins from Neutron Server
# This is the first place where we contact neutron-server on startup
def fetch_and_sync_all_routers(self, context, ns_manager):
prev_router_ids = set(self.router_info)
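+        # Router ids reported by the server during this sync; routers in
+        # prev_router_ids but not collected here are deleted further below.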
+ curr_router_ids = set()
timestamp = timeutils.utcnow()
try:
- if self.conf.router_id:
- routers = self.plugin_rpc.get_routers(context,
- [self.conf.router_id])
+ router_ids = ([self.conf.router_id] if self.conf.router_id else
+ self.plugin_rpc.get_router_ids(context))
+            # Fetch routers in chunks to reduce the load on the server and
+            # to start router processing earlier.
+ for i in range(0, len(router_ids), self.sync_routers_chunk_size):
+ routers = self.plugin_rpc.get_routers(
+ context, router_ids[i:i + self.sync_routers_chunk_size])
+                LOG.debug('Processing routers: %r', routers)
+ for r in routers:
+ curr_router_ids.add(r['id'])
+ ns_manager.keep_router(r['id'])
+ if r.get('distributed'):
+ # need to keep fip namespaces as well
+ ext_net_id = (r['external_gateway_info'] or {}).get(
+ 'network_id')
+ if ext_net_id:
+ ns_manager.keep_ext_net(ext_net_id)
+ update = queue.RouterUpdate(
+ r['id'],
+ queue.PRIORITY_SYNC_ROUTERS_TASK,
+ router=r,
+ timestamp=timestamp)
+ self._queue.add(update)
+ except oslo_messaging.MessagingTimeout:
+ if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
+ self.sync_routers_chunk_size = max(
+                    self.sync_routers_chunk_size // 2,
+ SYNC_ROUTERS_MIN_CHUNK_SIZE)
+                LOG.error(_LE('Server failed to return router info within '
+                              'the required time; decreasing chunk size '
+                              'to: %s'),
+                          self.sync_routers_chunk_size)
else:
- routers = self.plugin_rpc.get_routers(context)
+                LOG.error(_LE('Server failed to return router info within '
+                              'the required time even with the minimum '
+                              'chunk size (%s); it may be under very high '
+                              'load or otherwise unresponsive'),
+                          self.sync_routers_chunk_size)
+                raise
+            # Abort so the next periodic sync retries with the reduced
+            # chunk size; falling through would wrongly treat this partial
+            # sync as complete.
+            raise n_exc.AbortSyncRouters()
except oslo_messaging.MessagingException:
LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
raise n_exc.AbortSyncRouters()
- else:
- LOG.debug('Processing :%r', routers)
- for r in routers:
- ns_manager.keep_router(r['id'])
- if r.get('distributed'):
- # need to keep fip namespaces as well
- ext_net_id = (r['external_gateway_info'] or {}).get(
- 'network_id')
- if ext_net_id:
- ns_manager.keep_ext_net(ext_net_id)
- update = queue.RouterUpdate(r['id'],
- queue.PRIORITY_SYNC_ROUTERS_TASK,
- router=r,
- timestamp=timestamp)
- self._queue.add(update)
- self.fullsync = False
- LOG.debug("periodic_sync_routers_task successfully completed")
-
- curr_router_ids = set([r['id'] for r in routers])
-
- # Delete routers that have disappeared since the last sync
- for router_id in prev_router_ids - curr_router_ids:
- ns_manager.keep_router(router_id)
- update = queue.RouterUpdate(router_id,
- queue.PRIORITY_SYNC_ROUTERS_TASK,
- timestamp=timestamp,
- action=queue.DELETE_ROUTER)
- self._queue.add(update)
+
+ self.fullsync = False
+ LOG.debug("periodic_sync_routers_task successfully completed")
+        # Grow the chunk size back toward the maximum after a successful sync.
+ if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
+ self.sync_routers_chunk_size = min(
+ self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
+ SYNC_ROUTERS_MAX_CHUNK_SIZE)
+
+ # Delete routers that have disappeared since the last sync
+ for router_id in prev_router_ids - curr_router_ids:
+ ns_manager.keep_router(router_id)
+ update = queue.RouterUpdate(router_id,
+ queue.PRIORITY_SYNC_ROUTERS_TASK,
+ timestamp=timestamp,
+ action=queue.DELETE_ROUTER)
+ self._queue.add(update)
def after_start(self):
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
return self.get_sync_data(context, router_ids=router_ids, active=True)
- def list_active_sync_routers_on_active_l3_agent(
- self, context, host, router_ids):
+ def list_router_ids_on_host(self, context, host, router_ids=None):
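+        """Return the ids of routers scheduled to the L3 agent on host."""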
agent = self._get_agent_by_type_and_host(
context, constants.AGENT_TYPE_L3, host)
if not agentschedulers_db.services_available(agent.admin_state_up):
if router_ids:
query = query.filter(
RouterL3AgentBinding.router_id.in_(router_ids))
- router_ids = [item[0] for item in query]
+
+ return [item[0] for item in query]
+
+ def list_active_sync_routers_on_active_l3_agent(
+ self, context, host, router_ids):
+ router_ids = self.list_router_ids_on_host(context, host, router_ids)
if router_ids:
+ agent = self._get_agent_by_type_and_host(
+ context, constants.AGENT_TYPE_L3, host)
return self._get_active_l3_agent_routers_sync_data(context, host,
agent,
router_ids)