self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
- self.updated_routers = set()
- self.removed_routers = set()
self.sync_progress = False
# Get the list of service plugins from Neutron Server
while True:
pool.spawn_n(self._process_router_update)
- def _process_router_delete(self):
- current_removed_routers = list(self.removed_routers)
- for router_id in current_removed_routers:
- self._router_removed(router_id)
- self.removed_routers.remove(router_id)
-
def _router_ids(self):
if not self.conf.use_namespaces:
return [self.conf.router_id]
try:
router_ids = self._router_ids()
- self.updated_routers.clear()
- self.removed_routers.clear()
timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(
context, router_ids)
agent.router_added_to_agent(None, [FAKE_ID])
agent._queue.add.assert_called_once()
- def test_process_router_delete(self):
- agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- ex_gw_port = {'id': _uuid(),
- 'network_id': _uuid(),
- 'fixed_ips': [{'ip_address': '19.4.4.4',
- 'subnet_id': _uuid()}],
- 'subnet': {'cidr': '19.4.4.0/24',
- 'gateway_ip': '19.4.4.1'}}
- router = {
- 'id': _uuid(),
- 'enable_snat': True,
- 'routes': [],
- 'gw_port': ex_gw_port}
- router['distributed'] = False
- agent._router_added(router['id'], router)
- agent.router_deleted(None, router['id'])
- agent._process_router_delete()
- self.assertFalse(list(agent.removed_routers))
-
def test_destroy_fip_namespace(self):
class FakeDev(object):
def __init__(self, name):