from neutron.agent.l3 import router_processing_queue as queue
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
-from neutron.agent.linux import ra
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as l3_constants
from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
-from neutron.common import utils as common_utils
from neutron import context as n_context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
return dvr_router.DvrRouter(*args, **kwargs)
if router.get('ha'):
+ kwargs['state_change_callback'] = self.enqueue_state_change
return ha_router.HaRouter(*args, **kwargs)
return legacy_router.LegacyRouter(*args, **kwargs)
def _router_added(self, router_id, router):
ri = self._create_router(router_id, router)
- ri.radvd = ra.DaemonMonitor(router['id'],
- ri.ns_name,
- self.process_monitor,
- ri.get_internal_device_name)
self.event_observers.notify(
adv_svc.AdvancedService.before_router_added, ri)
self.router_info[router_id] = ri
- ri.create()
- self.process_router_add(ri)
- if ri.is_ha:
- ri.initialize(self.process_monitor, self.enqueue_state_change)
+ ri.initialize(self.process_monitor)
+
+ # TODO(Carl) This is a hook in to fwaas. It should be cleaned up.
+ self.process_router_add(ri)
def _router_removed(self, router_id):
ri = self.router_info.get(router_id)
self.event_observers.notify(
adv_svc.AdvancedService.before_router_removed, ri)
- if ri.is_ha:
- ri.terminate(self.process_monitor)
-
- ri.router['gw_port'] = None
- ri.router[l3_constants.INTERFACE_KEY] = []
- ri.router[l3_constants.FLOATINGIP_KEY] = []
- self.process_router(ri)
+ ri.delete(self)
del self.router_info[router_id]
- ri.delete()
+
self.event_observers.notify(
adv_svc.AdvancedService.after_router_removed, ri)
self.plugin_rpc.update_floatingip_statuses(
self.context, ri.router_id, fip_statuses)
- @common_utils.exception_logger()
- def process_router(self, ri):
- # TODO(mrsmith) - we shouldn't need to check here
- if 'distributed' not in ri.router:
- ri.router['distributed'] = False
- ex_gw_port = ri.get_ex_gw_port()
- if ri.router.get('distributed') and ex_gw_port:
- ri.fip_ns = self.get_fip_ns(ex_gw_port['network_id'])
- ri.fip_ns.scan_fip_ports(ri)
- ri._process_internal_ports()
- ri.process_external(self)
- # Process static routes for router
- ri.routes_updated()
-
- # If process_router was called during a create or update
- if ri.is_ha and ri.ha_port:
- ri.enable_keepalived()
-
- # Update ex_gw_port and enable_snat on the router info cache
- ri.ex_gw_port = ex_gw_port
- ri.snat_ports = ri.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
- ri.enable_snat = ri.router.get('enable_snat')
-
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug('Got router deleted notification for %s', router_id)
self._process_updated_router(router)
def _process_added_router(self, router):
- # TODO(pcm): Next refactoring will rework this logic
self._router_added(router['id'], router)
ri = self.router_info[router['id']]
ri.router = router
- self.process_router(ri)
+ ri.process(self)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_added, ri)
def _process_updated_router(self, router):
- # TODO(pcm): Next refactoring will rework this logic
ri = self.router_info[router['id']]
ri.router = router
self.event_observers.notify(
adv_svc.AdvancedService.before_router_updated, ri)
- self.process_router(ri)
+ ri.process(self)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_updated, ri)
# kicks the FW Agent to add rules for the IR namespace if
# configured
self.agent.process_router_add(self)
+
+ def process(self, agent):
+ ex_gw_port = self.get_ex_gw_port()
+ if ex_gw_port:
+ self.fip_ns = agent.get_fip_ns(ex_gw_port['network_id'])
+ self.fip_ns.scan_fip_ports(self)
+
+ super(DvrRouter, self).process(agent)
class HaRouter(router.RouterInfo):
- def __init__(self, *args, **kwargs):
+ def __init__(self, state_change_callback, *args, **kwargs):
super(HaRouter, self).__init__(*args, **kwargs)
self.ha_port = None
self.keepalived_manager = None
+ self.state_change_callback = state_change_callback
@property
def is_ha(self):
LOG.error(_LE('Error while writing HA state for %s'),
self.router_id)
- def initialize(self, process_monitor, state_change_callback):
+ def initialize(self, process_monitor):
+ super(HaRouter, self).initialize(process_monitor)
ha_port = self.router.get(n_consts.HA_INTERFACE_KEY)
if not ha_port:
LOG.error(_LE('Unable to process HA router %s without HA port'),
self.ha_port = ha_port
self._init_keepalived_manager(process_monitor)
self.ha_network_added()
- self.update_initial_state(state_change_callback)
+ self.update_initial_state(self.state_change_callback)
self.spawn_state_change_monitor(process_monitor)
- def terminate(self, process_monitor):
- self.destroy_state_change_monitor(process_monitor)
- self.ha_network_removed()
- self.disable_keepalived()
-
def _init_keepalived_manager(self, process_monitor):
self.keepalived_manager = keepalived.KeepalivedManager(
self.router['id'],
super(HaRouter, self).external_gateway_removed(ex_gw_port,
interface_name)
+
+ def delete(self, agent):
+ self.destroy_state_change_monitor(self.process_monitor)
+ self.ha_network_removed()
+ self.disable_keepalived()
+ super(HaRouter, self).delete(agent)
+
+ def process(self, agent):
+ super(HaRouter, self).process(agent)
+
+ if self.ha_port:
+ self.enable_keepalived()
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
+from neutron.agent.linux import ra
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import utils as common_utils
# radvd is a neutron.agent.linux.ra.DaemonMonitor
self.radvd = None
+ def initialize(self, process_monitor):
+ """Initialize the router on the system.
+
+ This differs from __init__ in that this method actually affects the
+ system creating namespaces, starting processes, etc. The other merely
+ initializes the python object. This separates in-memory object
+ initialization from methods that actually go do stuff to the system.
+
+ :param process_monitor: The agent's process monitor instance.
+ """
+ self.process_monitor = process_monitor
+ self.radvd = ra.DaemonMonitor(self.router_id,
+ self.ns_name,
+ process_monitor,
+ self.get_internal_device_name)
+
+ if self.router_namespace:
+ self.router_namespace.create()
+
@property
def router(self):
return self._router
fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR
return fip_statuses
- def create(self):
- if self.router_namespace:
- self.router_namespace.create()
-
- def delete(self):
+ def delete(self, agent):
+ self.router['gw_port'] = None
+ self.router[l3_constants.INTERFACE_KEY] = []
+ self.router[l3_constants.FLOATINGIP_KEY] = []
+ self.process(agent)
self.radvd.disable()
if self.router_namespace:
self.router_namespace.delete()
fip_statuses = self.put_fips_in_error_state()
agent.update_fip_statuses(self, existing_floating_ips, fip_statuses)
+
+ @common_utils.exception_logger()
+ def process(self, agent):
+ """Process updates to this router
+
+ This method is the point where the agent requests that updates be
+ applied to this router.
+
+ :param agent: Passes the agent in order to send RPC messages.
+ """
+ self._process_internal_ports()
+ self.process_external(agent)
+ # Process static routes for router
+ self.routes_updated()
+
+ # Update ex_gw_port and enable_snat on the router info cache
+ self.ex_gw_port = self.get_ex_gw_port()
+ self.snat_ports = self.router.get(
+ l3_constants.SNAT_ROUTER_INTF_KEY, [])
+ self.enable_snat = self.router.get('enable_snat')
clean_fips(router)
self._add_fip(router, client_address, fixed_address=server_address)
- self.agent.process_router(router)
+ router.process(self.agent)
router_ns = ip_lib.IPWrapper(namespace=router.ns_name)
netcat = helpers.NetcatTester(router_ns, router_ns,
assert_num_of_conntrack_rules(1)
clean_fips(router)
- self.agent.process_router(router)
+ router.process(self.agent)
assert_num_of_conntrack_rules(0)
with testtools.ExpectedException(RuntimeError):
router.router['gw_port']['subnet']['gateway_ip'] = '19.4.4.5'
router.router['gw_port']['fixed_ips'][0]['ip_address'] = '19.4.4.10'
- self.agent.process_router(router)
+ router.process(self.agent)
# Get the updated configuration and assert that both FIPs are in,
# and that the GW IP address was updated.
# NOTE The use_namespaces config will soon be deprecated
self.agent_conf.use_namespaces = True
self.router_id = _uuid()
- return ha_router.HaRouter(self.router_id,
+ return ha_router.HaRouter(mock.sentinel.enqueue_state,
+ self.router_id,
router,
self.agent_conf,
mock.sentinel.driver,
ri.ns_name,
agent.process_monitor,
ri.get_internal_device_name)
- agent.process_router(ri)
+ ri.process(agent)
class TestBasicRouterOperations(BasicRouterOperationsFramework):
'fixed_ip_address': '7.7.7.7',
'port_id': _uuid(),
'host': HOSTNAME}]}
- agent.process_router(ri)
+ ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8'
router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips']
- agent.process_router(ri)
+ ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
['fixed_ips'][0]['ip_address']))
ri.router['gw_port']['fixed_ips'][0]['ip_address'] = str(old_ip + 1)
- agent.process_router(ri)
+ ri.process(agent)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.reset_mock()
self.assertEqual(ri.external_gateway_added.call_count, 0)
# remove just the floating ips
del router[l3_constants.FLOATINGIP_KEY]
- agent.process_router(ri)
+ ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
# now no ports so state is torn down
del router[l3_constants.INTERFACE_KEY]
del router['gw_port']
- agent.process_router(ri)
+ ri.process(agent)
self.assertEqual(self.send_arp.call_count, 1)
distributed = ri.router.get('distributed', False)
self.assertEqual(ri.process_floating_ip_addresses.called,
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
- agent.process_router(ri)
+ ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Reprocess without NAT
router['enable_snat'] = False
# Reassign the router object to RouterInfo
ri.router = router
- agent.process_router(ri)
+ ri.process(agent)
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in orig_nat_rules
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process without NAT
- agent.process_router(ri)
+ ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Reprocess with NAT
router['enable_snat'] = True
# Reassign the router object to RouterInfo
ri.router = router
- agent.process_router(ri)
+ ri.process(agent)
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
- agent.process_router(ri)
+ ri.process(agent)
# Add an interface and reprocess
router_append_interface(router)
# Reassign the router object to RouterInfo
ri.router = router
- agent.process_router(ri)
- # send_arp is called both times process_router is called
+ ri.process(agent)
+ # send_arp is called both times process is called
self.assertEqual(self.send_arp.call_count, 2)
def test_process_ipv6_only_gw(self):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
- agent.process_router(ri)
+ ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv6 interface and reprocess
router_append_interface(router, count=1, ip_version=6, ra_mode=ra_mode,
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
- agent.process_router(ri)
+ ri.process(agent)
# Add an IPv4 and IPv6 interface and reprocess
router_append_interface(router, count=1, ip_version=4)
router_append_interface(router, count=1, ip_version=6)
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
- agent.process_router(ri)
+ ri.process(agent)
# Add an interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
# Reassign the router object to RouterInfo
ri.router = router
- agent.process_router(ri)
- # send_arp is called both times process_router is called
+ ri.process(agent)
+ # send_arp is called both times process is called
self.assertEqual(self.send_arp.call_count, 2)
def test_process_router_ipv6_interface_removed(self):
# raise RuntimeError to simulate that an unexpected exception
# occurs
internal_network_added.side_effect = RuntimeError
- self.assertRaises(RuntimeError, agent.process_router, ri)
+ self.assertRaises(RuntimeError, ri.process, agent)
self.assertNotIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
# periodic_sync_routers_task finds out that _rpc_loop failed to
# process the router last time, it will retry in the next run.
- agent.process_router(ri)
+ ri.process(agent)
# We were able to add the port to ri.internal_ports
self.assertIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# add an internal port
- agent.process_router(ri)
+ ri.process(agent)
with mock.patch.object(
ri,
internal_net_removed.side_effect = RuntimeError
ri.internal_ports[0]['admin_state_up'] = False
# The above port is set to down state, remove it.
- self.assertRaises(RuntimeError, agent.process_router, ri)
+ self.assertRaises(RuntimeError, ri.process, agent)
self.assertIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
# periodic_sync_routers_task finds out that _rpc_loop failed to
# process the router last time, it will retry in the next run.
- agent.process_router(ri)
+ ri.process(agent)
# We were able to remove the port from ri.internal_ports
self.assertNotIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
router,
**self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
- agent.process_router(ri)
+ ri.process(agent)
# Assess the call for putting the floating IP up was performed
mock_update_fip_status.assert_called_once_with(
mock.ANY, ri.router_id,
# Process the router again, this time without floating IPs
router[l3_constants.FLOATINGIP_KEY] = []
ri.router = router
- agent.process_router(ri)
+ ri.process(agent)
# Assess the call for putting the floating IP up was performed
mock_update_fip_status.assert_called_once_with(
mock.ANY, ri.router_id,
ri.process_floating_ip_addresses = mock.Mock(
side_effect=RuntimeError)
ri.external_gateway_added = mock.Mock()
- agent.process_router(ri)
+ ri.process(agent)
# Assess the call for putting the floating IP into Error
# was performed
mock_update_fip_status.assert_called_once_with(
external_gateway_removed,
external_gateway_added):
- agent.process_router(ri)
+ ri.process(agent)
self.assertEqual(external_gateway_added.call_count, 1)
self.assertFalse(external_gateway_removed.called)
self.mock_ip.get_devices.return_value = stale_devlist
- agent.process_router(ri)
+ ri.process(agent)
self.mock_driver.unplug.assert_called_with(
stale_devnames[0],