import sys
+import datetime
import eventlet
eventlet.monkey_patch()
import netaddr
from oslo.config import cfg
+import Queue
from neutron.agent.common import config
from neutron.agent.linux import external_process
from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
-from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.openstack.common import processutils
from neutron.openstack.common import service
+from neutron.openstack.common import timeutils
from neutron import service as neutron_service
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
EXTERNAL_DEV_PREFIX = 'qg-'
RPC_LOOP_INTERVAL = 1
FLOATING_IP_CIDR_SUFFIX = '/32'
+# Lower value is higher priority
+PRIORITY_RPC = 0
+PRIORITY_SYNC_ROUTERS_TASK = 1
+DELETE_ROUTER = 1
class L3PluginApi(n_rpc.RpcProxy):
self._snat_action = None
+class RouterUpdate(object):
+ """Encapsulates a router update
+
+ An instance of this object carries the information necessary to prioritize
+ and process a request to update a router.
+ """
+ def __init__(self, router_id, priority,
+ action=None, router=None, timestamp=None):
+ self.priority = priority
+ self.timestamp = timestamp
+ if not timestamp:
+ self.timestamp = timeutils.utcnow()
+ self.id = router_id
+ self.action = action
+ self.router = router
+
+ def __lt__(self, other):
+ """Implements priority among updates
+
+        Lower numerical priority always gets precedence. When comparing two
+        updates of the same priority, the one with the earlier timestamp
+        takes precedence. In the unlikely event that the timestamps are also
+        equal, it falls back to a simple comparison of ids, so the precedence
+        is essentially arbitrary.
+ """
+ if self.priority != other.priority:
+ return self.priority < other.priority
+ if self.timestamp != other.timestamp:
+ return self.timestamp < other.timestamp
+ return self.id < other.id
+
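+# For example (illustrative only, r_id is a placeholder router id), an RPC
+# update sorts ahead of an older full-sync update for the same router
+# because its numerical priority is lower:
+#
+#     RouterUpdate(r_id, PRIORITY_RPC) < RouterUpdate(
+#         r_id, PRIORITY_SYNC_ROUTERS_TASK)   # evaluates to True
+#
+# Only within the same priority does the earlier timestamp win.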
+
+class ExclusiveRouterProcessor(object):
+ """Manager for access to a router for processing
+
+ This class controls access to a router in a non-blocking way. The first
+ instance to be created for a given router_id is granted exclusive access to
+ the router.
+
+    Other instances may be created for the same router_id while the first
+    instance has exclusive access. If that happens, the new instance does not
+    block and wait for access. Instead, it queues its update, along with its
+    timestamp, for the master instance to pick up.
+
+ This way, a thread will not block to wait for access to a router. Instead
+ it effectively signals to the thread that is working on the router that
+ something has changed since it started working on it. That thread will
+ simply finish its current iteration and then repeat.
+
+    This class also keeps track of the last time that router data was
+    fetched and processed. The timestamp it records must be no later than
+    when the data used in the last processing pass was fetched from the
+    database, but as close to that time as possible. The timestamp should
+    not be recorded, however, until the router has been processed using the
+    fetched data.
+ """
+ _masters = {}
+ _router_timestamps = {}
+
+ def __init__(self, router_id):
+ self._router_id = router_id
+
+ if router_id not in self._masters:
+ self._masters[router_id] = self
+ self._queue = []
+
+ self._master = self._masters[router_id]
+
+ def _i_am_master(self):
+ return self == self._master
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ if self._i_am_master():
+ del self._masters[self._router_id]
+
+ def _get_router_data_timestamp(self):
+ return self._router_timestamps.get(self._router_id,
+ datetime.datetime.min)
+
+ def fetched_and_processed(self, timestamp):
+ """Records the data timestamp after it is used to update the router"""
+ new_timestamp = max(timestamp, self._get_router_data_timestamp())
+ self._router_timestamps[self._router_id] = new_timestamp
+
+ def queue_update(self, update):
+ """Queues an update from a worker
+
+ This is the queue used to keep new updates that come in while a router
+ is being processed. These updates have already bubbled to the front of
+ the RouterProcessingQueue.
+ """
+ self._master._queue.append(update)
+
+ def updates(self):
+        """Yields updates for this router until they stop coming
+
+ Only the master instance will process the router. However, updates may
+ come in from other workers while it is in progress. This method loops
+ until they stop coming.
+ """
+ if self._i_am_master():
+ while self._queue:
+ # Remove the update from the queue even if it is old.
+ update = self._queue.pop(0)
+ # Process the update only if it is fresh.
+ if self._get_router_data_timestamp() < update.timestamp:
+ yield update
+
+
+class RouterProcessingQueue(object):
+ """Manager of the queue of routers to process."""
+ def __init__(self):
+ self._queue = Queue.PriorityQueue()
+
+ def add(self, update):
+ self._queue.put(update)
+
+ def each_update_to_next_router(self):
+        """Grabs the next router from the queue and yields its updates
+
+        Callers iterate over this generator to process the router repeatedly
+        until updates stop bubbling to the front of the queue.
+ """
+ next_update = self._queue.get()
+
+ with ExclusiveRouterProcessor(next_update.id) as rp:
+ # Queue the update whether this worker is the master or not.
+ rp.queue_update(next_update)
+
+ # Here, if the current worker is not the master, the call to
+ # rp.updates() will not yield and so this will essentially be a
+ # noop.
+ for update in rp.updates():
+ yield (rp, update)
+
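+# An illustrative sketch of the intended consumer pattern (the actual
+# consumer is L3NATAgent._process_router_update below):
+#
+#     for rp, update in queue.each_update_to_next_router():
+#         ...apply the update to the router...
+#         rp.fetched_and_processed(update.timestamp)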
+
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
"""Manager for L3NatAgent
self._clean_stale_namespaces = self.conf.use_namespaces
- self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
- self._rpc_loop)
- self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
+ self._queue = RouterProcessingQueue()
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
LOG.error(msg)
raise SystemExit(1)
- def _cleanup_namespaces(self, routers):
- """Destroy stale router namespaces on host when L3 agent restarts
-
- This routine is called when self._clean_stale_namespaces is True.
+ def _list_namespaces(self):
+ """Get a set of all router namespaces on host
-        The argument routers is the list of routers that are recorded in
-        the database as being hosted on this node.
root_ip = ip_lib.IPWrapper(self.root_helper)
host_namespaces = root_ip.get_namespaces(self.root_helper)
- router_namespaces = set(ns for ns in host_namespaces
- if ns.startswith(NS_PREFIX))
- ns_to_ignore = set(NS_PREFIX + r['id'] for r in routers)
- ns_to_destroy = router_namespaces - ns_to_ignore
+ return set(ns for ns in host_namespaces
+ if ns.startswith(NS_PREFIX))
except RuntimeError:
-            LOG.exception(_('RuntimeError in obtaining router list '
-                            'for namespace cleanup.'))
+            LOG.exception(_('RuntimeError in obtaining namespace list '
+                            'for namespace cleanup.'))
- else:
- self._destroy_stale_router_namespaces(ns_to_destroy)
+ return set()
+
+ def _cleanup_namespaces(self, router_namespaces, router_ids):
+ """Destroy stale router namespaces on host when L3 agent restarts
+
+ This routine is called when self._clean_stale_namespaces is True.
+
+        The argument router_namespaces is the set of all router namespaces
+        found on the host. The argument router_ids is the list of ids of
+        known routers.
+ """
+ ns_to_ignore = set(NS_PREFIX + id for id in router_ids)
+ ns_to_destroy = router_namespaces - ns_to_ignore
+ self._destroy_stale_router_namespaces(ns_to_destroy)
def _destroy_stale_router_namespaces(self, router_namespaces):
"""Destroys the stale router namespaces
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug(_('Got router deleted notification for %s'), router_id)
- self.removed_routers.add(router_id)
+ update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+ self._queue.add(update)
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
# This is needed for backward compatibility
if isinstance(routers[0], dict):
routers = [router['id'] for router in routers]
- self.updated_routers.update(routers)
+ for id in routers:
+ update = RouterUpdate(id, PRIORITY_RPC)
+ self._queue.add(update)
def router_removed_from_agent(self, context, payload):
LOG.debug(_('Got router removed from agent :%r'), payload)
- self.removed_routers.add(payload['router_id'])
+ router_id = payload['router_id']
+ update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+ self._queue.add(update)
def router_added_to_agent(self, context, payload):
LOG.debug(_('Got router added to agent :%r'), payload)
pool.spawn_n(self._router_removed, router_id)
pool.waitall()
- @lockutils.synchronized('l3-agent', 'neutron-')
- def _rpc_loop(self):
- # _rpc_loop and _sync_routers_task will not be
- # executed in the same time because of lock.
- # so we can clear the value of updated_routers
- # and removed_routers, but they can be updated by
- # updated_routers and removed_routers rpc call
- try:
- LOG.debug(_("Starting RPC loop for %d updated routers"),
- len(self.updated_routers))
- if self.updated_routers:
- # We're capturing and clearing the list, and will
- # process the "captured" updates in this loop,
- # and any updates that happen due to a context switch
- # will be picked up on the next pass.
- updated_routers = set(self.updated_routers)
- self.updated_routers.clear()
- router_ids = list(updated_routers)
- routers = self.plugin_rpc.get_routers(
- self.context, router_ids)
- # routers with admin_state_up=false will not be in the fetched
- fetched = set([r['id'] for r in routers])
- self.removed_routers.update(updated_routers - fetched)
-
- self._process_routers(routers)
- self._process_router_delete()
- LOG.debug(_("RPC loop successfully completed"))
- except Exception:
- LOG.exception(_("Failed synchronizing routers"))
- self.fullsync = True
+ def _process_router_update(self):
+ for rp, update in self._queue.each_update_to_next_router():
+ LOG.debug("Starting router update for %s", update.id)
+ router = update.router
+ if update.action != DELETE_ROUTER and not router:
+ try:
+ update.timestamp = timeutils.utcnow()
+ routers = self.plugin_rpc.get_routers(self.context,
+ [update.id])
+ except Exception:
+ msg = _("Failed to fetch router information for '%s'")
+ LOG.exception(msg, update.id)
+ self.fullsync = True
+ continue
+
+ if routers:
+ router = routers[0]
+
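+            # The plugin does not return routers that have been deleted or
+            # that have admin_state_up set to False, so a missing router is
+            # treated as one to remove from this agent.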
+ if not router:
+ self._router_removed(update.id)
+ continue
+
+ self._process_routers([router])
+ LOG.debug("Finished a router update for %s", update.id)
+ rp.fetched_and_processed(update.timestamp)
+
+ def _process_routers_loop(self):
+ LOG.debug("Starting _process_routers_loop")
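+        # The green pool bounds the number of routers this agent processes
+        # concurrently to the pool size (8 greenthreads).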
+ pool = eventlet.GreenPool(size=8)
+ while True:
+ pool.spawn_n(self._process_router_update)
def _process_router_delete(self):
current_removed_routers = list(self.removed_routers)
return [self.conf.router_id]
@periodic_task.periodic_task
- @lockutils.synchronized('l3-agent', 'neutron-')
def periodic_sync_routers_task(self, context):
self._sync_routers_task(context)
self.fullsync)
if not self.fullsync:
return
+
+ # Capture a picture of namespaces *before* fetching the full list from
+ # the database. This is important to correctly identify stale ones.
+ namespaces = set()
+ if self._clean_stale_namespaces:
+ namespaces = self._list_namespaces()
+ prev_router_ids = set(self.router_info)
+
try:
router_ids = self._router_ids()
self.updated_routers.clear()
self.removed_routers.clear()
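+            # Record the timestamp before the fetch so that
+            # fetched_and_processed() never stores a time later than when
+            # the router data was actually read from the database.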
+ timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(
context, router_ids)
LOG.debug(_('Processing :%r'), routers)
- self._process_routers(routers, all_routers=True)
+ for r in routers:
+ update = RouterUpdate(r['id'],
+ PRIORITY_SYNC_ROUTERS_TASK,
+ router=r,
+ timestamp=timestamp)
+ self._queue.add(update)
self.fullsync = False
LOG.debug(_("_sync_routers_task successfully completed"))
except n_rpc.RPCException:
self.fullsync = True
else:
# Resync is not necessary for the cleanup of stale namespaces
+ curr_router_ids = set([r['id'] for r in routers])
+
+ # Two kinds of stale routers: Routers for which info is cached in
+ # self.router_info and the others. First, handle the former.
+ for router_id in prev_router_ids - curr_router_ids:
+ update = RouterUpdate(router_id,
+ PRIORITY_SYNC_ROUTERS_TASK,
+ timestamp=timestamp,
+ action=DELETE_ROUTER)
+ self._queue.add(update)
+
+            # Next, a single best-effort pass to clean out namespaces for
+            # which we have no record (_clean_stale_namespaces is set to
+            # False after one pass).
if self._clean_stale_namespaces:
- self._cleanup_namespaces(routers)
+ ids_to_keep = curr_router_ids | prev_router_ids
+ self._cleanup_namespaces(namespaces, ids_to_keep)
def after_start(self):
+ eventlet.spawn_n(self._process_routers_loop)
LOG.info(_("L3 agent started"))
def _update_routing_table(self, ri, operation, route):
import contextlib
import copy
+import datetime
import mock
import netaddr
_uuid = uuidutils.generate_uuid
HOSTNAME = 'myhost'
FAKE_ID = _uuid()
+FAKE_ID_2 = _uuid()
+
+
+class TestExclusiveRouterProcessor(base.BaseTestCase):
+ def setUp(self):
+ super(TestExclusiveRouterProcessor, self).setUp()
+
+ def test_i_am_master(self):
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+ not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+
+ self.assertTrue(master._i_am_master())
+ self.assertFalse(not_master._i_am_master())
+ self.assertTrue(master_2._i_am_master())
+ self.assertFalse(not_master_2._i_am_master())
+
+ master.__exit__(None, None, None)
+ master_2.__exit__(None, None, None)
+
+ def test_master(self):
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+ not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+
+ self.assertEqual(master._master, master)
+ self.assertEqual(not_master._master, master)
+ self.assertEqual(master_2._master, master_2)
+ self.assertEqual(not_master_2._master, master_2)
+
+ master.__exit__(None, None, None)
+ master_2.__exit__(None, None, None)
+
+ def test__enter__(self):
+ self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ master.__enter__()
+ self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
+ master.__exit__(None, None, None)
+
+ def test__exit__(self):
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ master.__enter__()
+ self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
+ not_master.__enter__()
+ not_master.__exit__(None, None, None)
+ self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
+ master.__exit__(None, None, None)
+ self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
+
+ def test_data_fetched_since(self):
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ self.assertEqual(master._get_router_data_timestamp(),
+ datetime.datetime.min)
+
+ ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
+ ts2 = datetime.datetime.utcnow()
+
+ master.fetched_and_processed(ts2)
+ self.assertEqual(master._get_router_data_timestamp(), ts2)
+ master.fetched_and_processed(ts1)
+ self.assertEqual(master._get_router_data_timestamp(), ts2)
+
+ master.__exit__(None, None, None)
+
+ def test_updates(self):
+ master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+ not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+
+ master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
+ not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
+
+ for update in not_master.updates():
+ raise Exception("Only the master should process a router")
+
+        self.assertEqual(2, len([i for i in master.updates()]))
+
+        master.__exit__(None, None, None)
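+
+
+class TestRouterProcessingQueue(base.BaseTestCase):
+    """Illustrative sketch, not part of the original change.
+
+    Exercises the documented behavior that an RPC-priority update bubbles
+    ahead of a full-sync update and that each call to
+    each_update_to_next_router() pulls a single router from the queue.
+    """
+
+    def test_rpc_update_is_processed_first(self):
+        queue = l3_agent.RouterProcessingQueue()
+        queue.add(l3_agent.RouterUpdate(
+            FAKE_ID, l3_agent.PRIORITY_SYNC_ROUTERS_TASK))
+        queue.add(l3_agent.RouterUpdate(FAKE_ID_2, l3_agent.PRIORITY_RPC))
+        processed = [update.id
+                     for rp, update in queue.each_update_to_next_router()]
+        # Only the highest-priority router is pulled here; the full-sync
+        # update for the other router stays queued for a later call.
+        self.assertEqual([FAKE_ID_2], processed)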
class TestBasicRouterOperations(base.BaseTestCase):
def test__sync_routers_task_call_clean_stale_namespaces(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- self.plugin_api.get_routers.return_value = mock.ANY
+ self.plugin_api.get_routers.return_value = []
with mock.patch.object(agent, '_cleanup_namespaces') as f:
- with mock.patch.object(agent, '_process_routers') as g:
- agent._sync_routers_task(agent.context)
+ agent._sync_routers_task(agent.context)
self.assertTrue(f.called)
- g.assert_called_with(mock.ANY, all_routers=True)
def test_router_info_create(self):
id = _uuid()
def test_router_deleted(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent._queue = mock.Mock()
agent.router_deleted(None, FAKE_ID)
- # verify that will set fullsync
- self.assertIn(FAKE_ID, agent.removed_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
def test_routers_updated(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent._queue = mock.Mock()
agent.routers_updated(None, [FAKE_ID])
- # verify that will set fullsync
- self.assertIn(FAKE_ID, agent.updated_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
def test_removed_from_agent(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent._queue = mock.Mock()
agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
- # verify that will set fullsync
- self.assertIn(FAKE_ID, agent.removed_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
def test_added_to_agent(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent._queue = mock.Mock()
agent.router_added_to_agent(None, [FAKE_ID])
- # verify that will set fullsync
- self.assertIn(FAKE_ID, agent.updated_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
def test_process_router_delete(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
pm.reset_mock()
agent._destroy_router_namespace = mock.MagicMock()
- agent._cleanup_namespaces(router_list)
+ ns_list = agent._list_namespaces()
+ agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
self.assertEqual(agent._destroy_router_namespace.call_count,