#
import eventlet
-from eventlet import semaphore
import netaddr
from oslo.config import cfg
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
+from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
NS_PREFIX = 'qrouter-'
INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
+RPC_LOOP_INTERVAL = 1
class L3PluginApi(proxy.RpcProxy):
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.host = host
- def get_routers(self, context, fullsync=True, router_id=None):
+ def get_routers(self, context, fullsync=True, router_ids=None):
"""Make a remote process call to retrieve the sync data for routers."""
- router_ids = [router_id] if router_id else None
return self.call(context,
self.make_msg('sync_routers', host=self.host,
fullsync=fullsync,
class L3NATAgent(manager.Manager):
+ """Manager for L3NatAgent
+
+ API version history:
+ 1.0 initial Version
+ 1.1 changed the type of the routers parameter
+ to the routers_updated method.
+ It was previously a list of routers in dict format.
+ It is now a list of router IDs only.
+ Per rpc versioning rules, it is backwards compatible.
+ """
+ RPC_API_VERSION = '1.1'
OPTS = [
cfg.StrOpt('external_network_bridge', default='br-ex',
self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
self.fullsync = True
- self.sync_sem = semaphore.Semaphore(1)
+ self.updated_routers = set()
+ self.removed_routers = set()
+ self.sync_progress = False
if self.conf.use_namespaces:
self._destroy_router_namespaces(self.conf.router_id)
+
+ self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
+ self._rpc_loop)
+ self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
super(L3NATAgent, self).__init__(host=self.conf.host)
def _destroy_router_namespaces(self, only_router_id=None):
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
def process_router(self, ri):
+ ri.iptables_manager.defer_apply_on()
ex_gw_port = self._get_ex_gw_port(ri)
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
existing_port_ids = set([p['id'] for p in ri.internal_ports])
ri.ex_gw_port = ex_gw_port
ri.enable_snat = ri.router.get('enable_snat')
self.routes_updated(ri)
+ ri.iptables_manager.defer_apply_off()
def _handle_router_snat_rules(self, ri, ex_gw_port, internal_cidrs,
interface_name, action):
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
- with self.sync_sem:
- if router_id in self.router_info:
- try:
- self._router_removed(router_id)
- except Exception:
- msg = _("Failed dealing with router "
- "'%s' deletion RPC message")
- LOG.debug(msg, router_id)
- self.fullsync = True
+ LOG.debug(_('Got router deleted notification for %s'), router_id)
+ self.removed_routers.add(router_id)
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
- if not routers:
- return
- with self.sync_sem:
- try:
- self._process_routers(routers)
- except Exception:
- msg = _("Failed dealing with routers update RPC message")
- LOG.debug(msg)
- self.fullsync = True
+ LOG.debug(_('Got routers updated notification :%s'), routers)
+ if routers:
+ # This is needed for backward compatiblity
+ if isinstance(routers[0], dict):
+ routers = [router['id'] for router in routers]
+ self.updated_routers.update(routers)
def router_removed_from_agent(self, context, payload):
- self.router_deleted(context, payload['router_id'])
+ LOG.debug(_('Got router removed from agent :%r'), payload)
+ self.removed_routers.add(payload['router_id'])
def router_added_to_agent(self, context, payload):
+ LOG.debug(_('Got router added to agent :%r'), payload)
self.routers_updated(context, payload)
def _process_routers(self, routers, all_routers=False):
+ pool = eventlet.GreenPool()
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("The external network bridge '%s' does not exist"),
self._router_added(r['id'], r)
ri = self.router_info[r['id']]
ri.router = r
- self.process_router(ri)
+ pool.spawn_n(self.process_router, ri)
# identify and remove routers that no longer exist
for router_id in prev_router_ids - cur_router_ids:
- self._router_removed(router_id)
+ pool.spawn_n(self._router_removed, router_id)
+ pool.waitall()
+
+ @lockutils.synchronized('l3-agent', 'neutron-')
+ def _rpc_loop(self):
+ # _rpc_loop and _sync_routers_task will not be
+ # executed in the same time because of lock.
+ # so we can clear the value of updated_routers
+ # and removed_routers
+ try:
+ if self.updated_routers:
+ router_ids = list(self.updated_routers)
+ self.updated_routers.clear()
+ routers = self.plugin_rpc.get_routers(
+ self.context, router_ids)
+ self._process_routers(routers)
+ self._process_router_delete()
+ except Exception:
+ LOG.exception(_("Failed synchronizing routers"))
+ self.fullsync = True
+
+ def _process_router_delete(self):
+ current_removed_routers = list(self.removed_routers)
+ for router_id in current_removed_routers:
+ self._router_removed(context, router_id)
+ self.removed_routers.remove(router_id)
+
+ def _router_ids(self):
+ if not self.conf.use_namespaces:
+ return [self.conf.router_id]
@periodic_task.periodic_task
+ @lockutils.synchronized('l3-agent', 'neutron-')
def _sync_routers_task(self, context):
- # we need to sync with router deletion RPC message
- with self.sync_sem:
- if self.fullsync:
- try:
- if not self.conf.use_namespaces:
- router_id = self.conf.router_id
- else:
- router_id = None
- routers = self.plugin_rpc.get_routers(
- context, router_id)
- self._process_routers(routers, all_routers=True)
- self.fullsync = False
- except Exception:
- LOG.exception(_("Failed synchronizing routers"))
- self.fullsync = True
+ if not self.fullsync:
+ return
+ try:
+ router_ids = self._router_ids()
+ self.updated_routers.clear()
+ self.removed_routers.clear()
+ routers = self.plugin_rpc.get_routers(
+ context, router_ids)
+
+ LOG.debug(_('Processing :%r'), routers)
+ self._process_routers(routers, all_routers=True)
+ self.fullsync = False
+ except Exception:
+ LOG.exception(_("Failed synchronizing routers"))
+ self.fullsync = True
def after_start(self):
LOG.info(_("L3 agent started"))
payload=payload),
topic='%s.%s' % (topics.L3_AGENT, host))
- def _agent_notification(self, context, method, routers,
+ def _agent_notification(self, context, method, router_ids,
operation, data):
"""Notify changed routers to hosting l3 agents."""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.NeutronManager.get_plugin()
- for router in routers:
+ for router_id in router_ids:
l3_agents = plugin.get_l3_agents_hosting_routers(
- adminContext, [router['id']],
+ adminContext, [router_id],
admin_state_up=True,
active=True)
for l3_agent in l3_agents:
'method': method})
self.cast(
context, self.make_msg(method,
- routers=[router]),
- topic='%s.%s' % (l3_agent.topic, l3_agent.host))
+ routers=[router_id]),
+ topic='%s.%s' % (l3_agent.topic, l3_agent.host),
+ version='1.1')
- def _notification(self, context, method, routers, operation, data):
+ def _notification(self, context, method, router_ids, operation, data):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_plugin()
if utils.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
adminContext = (context.is_admin and
context or context.elevated())
- plugin.schedule_routers(adminContext, routers)
+ plugin.schedule_routers(adminContext, router_ids)
self._agent_notification(
- context, method, routers, operation, data)
+ context, method, router_ids, operation, data)
else:
self.fanout_cast(
context, self.make_msg(method,
- routers=routers),
+ routers=router_ids),
topic=topics.L3_AGENT)
def _notification_fanout(self, context, method, router_id):
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)
- def routers_updated(self, context, routers, operation=None, data=None):
- if routers:
- self._notification(context, 'routers_updated', routers,
+ def routers_updated(self, context, router_ids, operation=None, data=None):
+ if router_ids:
+ self._notification(context, 'routers_updated', router_ids,
operation, data)
def router_removed_from_agent(self, context, router_id, host):
self._notification_host(context, 'router_removed_from_agent',
{'router_id': router_id}, host)
- def router_added_to_agent(self, context, routers, host):
+ def router_added_to_agent(self, context, router_ids, host):
self._notification_host(context, 'router_added_to_agent',
- routers, host)
+ router_ids, host)
L3AgentNotify = L3AgentNotifyAPI()
router_id=router_id, agent_id=id)
if self.l3_agent_notifier:
- routers = self.get_sync_data(context, [router_id])
self.l3_agent_notifier.router_added_to_agent(
- context, routers, agent_db.host)
+ context, [router_id], agent_db.host)
def remove_router_from_l3_agent(self, context, id, router_id):
"""Remove the router from l3 agent.
# Ensure we actually have something to update
if r.keys():
router_db.update(r)
- routers = self.get_sync_data(context.elevated(),
- [router_db['id']])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, [router_db['id']])
return self._make_router_dict(router_db)
def _create_router_gw_port(self, context, router, network_id):
raise q_exc.BadRequest(resource='router', msg=msg)
if 'port_id' in interface_info:
- if 'subnet_id' in interface_info:
- msg = _("Cannot specify both subnet-id and port-id")
- raise q_exc.BadRequest(resource='router', msg=msg)
-
- port = self._get_port(context, interface_info['port_id'])
- if port['device_id']:
- raise q_exc.PortInUse(net_id=port['network_id'],
- port_id=port['id'],
- device_id=port['device_id'])
- fixed_ips = [ip for ip in port['fixed_ips']]
- if len(fixed_ips) != 1:
- msg = _('Router port must have exactly one fixed IP')
- raise q_exc.BadRequest(resource='router', msg=msg)
- subnet_id = fixed_ips[0]['subnet_id']
- subnet = self._get_subnet(context, subnet_id)
- self._check_for_dup_router_subnet(context, router_id,
- port['network_id'],
- subnet['id'],
- subnet['cidr'])
- port.update({'device_id': router_id,
- 'device_owner': DEVICE_OWNER_ROUTER_INTF})
+ # make sure port update is committed
+ with context.session.begin(subtransactions=True):
+ if 'subnet_id' in interface_info:
+ msg = _("Cannot specify both subnet-id and port-id")
+ raise q_exc.BadRequest(resource='router', msg=msg)
+
+ port = self._get_port(context, interface_info['port_id'])
+ if port['device_id']:
+ raise q_exc.PortInUse(net_id=port['network_id'],
+ port_id=port['id'],
+ device_id=port['device_id'])
+ fixed_ips = [ip for ip in port['fixed_ips']]
+ if len(fixed_ips) != 1:
+ msg = _('Router port must have exactly one fixed IP')
+ raise q_exc.BadRequest(resource='router', msg=msg)
+ subnet_id = fixed_ips[0]['subnet_id']
+ subnet = self._get_subnet(context, subnet_id)
+ self._check_for_dup_router_subnet(context, router_id,
+ port['network_id'],
+ subnet['id'],
+ subnet['cidr'])
+ port.update({'device_id': router_id,
+ 'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
subnet = self._get_subnet(context, subnet_id)
'device_owner': DEVICE_OWNER_ROUTER_INTF,
'name': ''}})
- routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(
- context, routers, 'add_router_interface',
- {'network_id': port['network_id'],
- 'subnet_id': subnet_id})
+ context, [router_id], 'add_router_interface')
info = {'id': router_id,
'tenant_id': subnet['tenant_id'],
'port_id': port['id'],
subnet = self._get_subnet(context, subnet_id)
self._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
- _network_id = port_db['network_id']
self.delete_port(context, port_db['id'], l3_port_check=False)
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
port_id = p['id']
- _network_id = p['network_id']
self.delete_port(context, p['id'], l3_port_check=False)
found = True
break
if not found:
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
subnet_id=subnet_id)
- routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(
- context, routers, 'remove_router_interface',
- {'network_id': _network_id,
- 'subnet_id': subnet_id})
+ context, [router_id], 'remove_router_interface')
info = {'id': router_id,
'tenant_id': subnet['tenant_id'],
'port_id': port_id,
router_id = floatingip_db['router_id']
if router_id:
- routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
- 'create_floatingip')
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, [router_id],
+ 'create_floatingip')
return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip):
if router_id and router_id != before_router_id:
router_ids.append(router_id)
if router_ids:
- routers = self.get_sync_data(context.elevated(), router_ids)
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(context, router_ids,
'update_floatingip')
return self._make_floatingip_dict(floatingip_db)
floatingip['floating_port_id'],
l3_port_check=False)
if router_id:
- routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
- 'delete_floatingip')
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, [router_id],
+ 'delete_floatingip')
def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id)
raise Exception(_('Multiple floating IPs found for port %s')
% port_id)
if router_id:
- routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, [router_id])
def _network_is_external(self, context, net_id):
try:
context.session.add(binding)
return True
- def schedule(self, plugin, context, sync_router):
+ def schedule(self, plugin, context, router_id):
"""Schedule the router to an active L3 agent if there
is no enable L3 agent hosting it.
"""
# timing problem. Non-active l3 agent can return to
# active any time
l3_agents = plugin.get_l3_agents_hosting_routers(
- context, [sync_router['id']], admin_state_up=True)
+ context, [router_id], admin_state_up=True)
if l3_agents:
LOG.debug(_('Router %(router_id)s has already been hosted'
' by L3 agent %(agent_id)s'),
- {'router_id': sync_router['id'],
+ {'router_id': router_id,
'agent_id': l3_agents[0]['id']})
return
+ sync_router = plugin.get_router(context, router_id)
active_l3_agents = plugin.get_l3_agents(context, active=True)
if not active_l3_agents:
LOG.warn(_('No active L3 agents'))
L3_HOSTA)
self._add_router_to_l3_agent(hosta_id,
router1['router']['id'])
- routers = plugin.get_sync_data(self.adminContext,
- [router1['router']['id']])
+ routers = [router1['router']['id']]
mock_l3.assert_called_with(
mock.ANY,
plugin.l3_agent_notifier.make_msg(
_uuid = uuidutils.generate_uuid
HOSTNAME = 'myhost'
+FAKE_ID = _uuid()
class TestBasicRouterOperations(base.BaseTestCase):
agent._process_routers(routers)
self.assertNotIn(routers[0]['id'], agent.router_info)
- def testSingleLoopRouterRemoval(self):
+ def test_router_deleted(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- self.plugin_api.get_external_network_id.return_value = None
- routers = [
- {'id': _uuid(),
- 'admin_state_up': True,
- 'routes': [],
- 'external_gateway_info': {}}]
- agent._process_routers(routers)
+ agent.router_deleted(None, FAKE_ID)
+ # verify that will set fullsync
+ self.assertTrue(FAKE_ID in agent.removed_routers)
- agent.router_deleted(None, routers[0]['id'])
- # verify that remove is called
- self.assertEqual(self.mock_ip.get_devices.call_count, 1)
+ def test_routers_updated(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.routers_updated(None, [FAKE_ID])
+ # verify that will set fullsync
+ self.assertTrue(FAKE_ID in agent.updated_routers)
- self.device_exists.assert_has_calls(
- [mock.call(self.conf.external_network_bridge)])
+ def test_removed_from_agent(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
+ # verify that will set fullsync
+ self.assertTrue(FAKE_ID in agent.removed_routers)
+
+ def test_added_to_agent(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.router_added_to_agent(None, [FAKE_ID])
+ # verify that will set fullsync
+ self.assertTrue(FAKE_ID in agent.updated_routers)
def testDestroyNamespace(self):