From: Carl Baldwin Date: Tue, 10 Mar 2015 23:12:51 +0000 (+0000) Subject: Move final remnants of router processing to router classes X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=8a93a0665b42d2d2f86bbd8d340398629b076cd7;p=openstack-build%2Fneutron-build.git Move final remnants of router processing to router classes Change-Id: I467bb680666ec9bc82e55cfe534d74db29009cce Partially-Implements: bp/restructure-l3-agent --- diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index 6797ef270..4160821f5 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -32,7 +32,6 @@ from neutron.agent.l3 import namespaces from neutron.agent.l3 import router_processing_queue as queue from neutron.agent.linux import external_process from neutron.agent.linux import ip_lib -from neutron.agent.linux import ra from neutron.agent.metadata import driver as metadata_driver from neutron.agent import rpc as agent_rpc from neutron.common import constants as l3_constants @@ -40,7 +39,6 @@ from neutron.common import exceptions as n_exc from neutron.common import ipv6_utils from neutron.common import rpc as n_rpc from neutron.common import topics -from neutron.common import utils as common_utils from neutron import context as n_context from neutron.i18n import _LE, _LI, _LW from neutron import manager @@ -288,25 +286,22 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, return dvr_router.DvrRouter(*args, **kwargs) if router.get('ha'): + kwargs['state_change_callback'] = self.enqueue_state_change return ha_router.HaRouter(*args, **kwargs) return legacy_router.LegacyRouter(*args, **kwargs) def _router_added(self, router_id, router): ri = self._create_router(router_id, router) - ri.radvd = ra.DaemonMonitor(router['id'], - ri.ns_name, - self.process_monitor, - ri.get_internal_device_name) self.event_observers.notify( adv_svc.AdvancedService.before_router_added, ri) self.router_info[router_id] = ri - ri.create() - self.process_router_add(ri) - if ri.is_ha: - ri.initialize(self.process_monitor, self.enqueue_state_change) + ri.initialize(self.process_monitor) + + # TODO(Carl) This is a hook in to fwaas. It should be cleaned up. + self.process_router_add(ri) def _router_removed(self, router_id): ri = self.router_info.get(router_id) @@ -318,15 +313,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, self.event_observers.notify( adv_svc.AdvancedService.before_router_removed, ri) - if ri.is_ha: - ri.terminate(self.process_monitor) - - ri.router['gw_port'] = None - ri.router[l3_constants.INTERFACE_KEY] = [] - ri.router[l3_constants.FLOATINGIP_KEY] = [] - self.process_router(ri) + ri.delete(self) del self.router_info[router_id] - ri.delete() + self.event_observers.notify( adv_svc.AdvancedService.after_router_removed, ri) @@ -340,29 +329,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, self.plugin_rpc.update_floatingip_statuses( self.context, ri.router_id, fip_statuses) - @common_utils.exception_logger() - def process_router(self, ri): - # TODO(mrsmith) - we shouldn't need to check here - if 'distributed' not in ri.router: - ri.router['distributed'] = False - ex_gw_port = ri.get_ex_gw_port() - if ri.router.get('distributed') and ex_gw_port: - ri.fip_ns = self.get_fip_ns(ex_gw_port['network_id']) - ri.fip_ns.scan_fip_ports(ri) - ri._process_internal_ports() - ri.process_external(self) - # Process static routes for router - ri.routes_updated() - - # If process_router was called during a create or update - if ri.is_ha and ri.ha_port: - ri.enable_keepalived() - - # Update ex_gw_port and enable_snat on the router info cache - ri.ex_gw_port = ex_gw_port - ri.snat_ports = ri.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, []) - ri.enable_snat = ri.router.get('enable_snat') - def router_deleted(self, context, router_id): """Deal with router deletion RPC message.""" LOG.debug('Got router deleted notification for %s', router_id) @@ -427,21 +393,19 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, self._process_updated_router(router) def _process_added_router(self, router): - # TODO(pcm): Next refactoring will rework this logic self._router_added(router['id'], router) ri = self.router_info[router['id']] ri.router = router - self.process_router(ri) + ri.process(self) self.event_observers.notify( adv_svc.AdvancedService.after_router_added, ri) def _process_updated_router(self, router): - # TODO(pcm): Next refactoring will rework this logic ri = self.router_info[router['id']] ri.router = router self.event_observers.notify( adv_svc.AdvancedService.before_router_updated, ri) - self.process_router(ri) + ri.process(self) self.event_observers.notify( adv_svc.AdvancedService.after_router_updated, ri) diff --git a/neutron/agent/l3/dvr_router.py b/neutron/agent/l3/dvr_router.py index 449a93ae7..b13abc416 100644 --- a/neutron/agent/l3/dvr_router.py +++ b/neutron/agent/l3/dvr_router.py @@ -504,3 +504,11 @@ class DvrRouter(router.RouterInfo): # kicks the FW Agent to add rules for the IR namespace if # configured self.agent.process_router_add(self) + + def process(self, agent): + ex_gw_port = self.get_ex_gw_port() + if ex_gw_port: + self.fip_ns = agent.get_fip_ns(ex_gw_port['network_id']) + self.fip_ns.scan_fip_ports(self) + + super(DvrRouter, self).process(agent) diff --git a/neutron/agent/l3/ha_router.py b/neutron/agent/l3/ha_router.py index f6ecc06ec..94a36e33b 100644 --- a/neutron/agent/l3/ha_router.py +++ b/neutron/agent/l3/ha_router.py @@ -32,11 +32,12 @@ IP_MONITOR_PROCESS_SERVICE = 'ip_monitor' class HaRouter(router.RouterInfo): - def __init__(self, *args, **kwargs): + def __init__(self, state_change_callback, *args, **kwargs): super(HaRouter, self).__init__(*args, **kwargs) self.ha_port = None self.keepalived_manager = None + self.state_change_callback = state_change_callback @property def is_ha(self): @@ -73,7 +74,8 @@ class HaRouter(router.RouterInfo): LOG.error(_LE('Error while writing HA state for %s'), self.router_id) - def initialize(self, process_monitor, state_change_callback): + def initialize(self, process_monitor): + super(HaRouter, self).initialize(process_monitor) ha_port = self.router.get(n_consts.HA_INTERFACE_KEY) if not ha_port: LOG.error(_LE('Unable to process HA router %s without HA port'), @@ -84,14 +86,9 @@ class HaRouter(router.RouterInfo): self.ha_port = ha_port self._init_keepalived_manager(process_monitor) self.ha_network_added() - self.update_initial_state(state_change_callback) + self.update_initial_state(self.state_change_callback) self.spawn_state_change_monitor(process_monitor) - def terminate(self, process_monitor): - self.destroy_state_change_monitor(process_monitor) - self.ha_network_removed() - self.disable_keepalived() - def _init_keepalived_manager(self, process_monitor): self.keepalived_manager = keepalived.KeepalivedManager( self.router['id'], @@ -338,3 +335,15 @@ class HaRouter(router.RouterInfo): super(HaRouter, self).external_gateway_removed(ex_gw_port, interface_name) + + def delete(self, agent): + self.destroy_state_change_monitor(self.process_monitor) + self.ha_network_removed() + self.disable_keepalived() + super(HaRouter, self).delete(agent) + + def process(self, agent): + super(HaRouter, self).process(agent) + + if self.ha_port: + self.enable_keepalived() diff --git a/neutron/agent/l3/router_info.py b/neutron/agent/l3/router_info.py index 340b6b4a0..fcc36802b 100644 --- a/neutron/agent/l3/router_info.py +++ b/neutron/agent/l3/router_info.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from neutron.agent.l3 import namespaces from neutron.agent.linux import ip_lib from neutron.agent.linux import iptables_manager +from neutron.agent.linux import ra from neutron.common import constants as l3_constants from neutron.common import exceptions as n_exc from neutron.common import utils as common_utils @@ -62,6 +63,25 @@ class RouterInfo(object): # radvd is a neutron.agent.linux.ra.DaemonMonitor self.radvd = None + def initialize(self, process_monitor): + """Initialize the router on the system. + + This differs from __init__ in that this method actually affects the + system creating namespaces, starting processes, etc. The other merely + initializes the python object. This separates in-memory object + initialization from methods that actually go do stuff to the system. + + :param process_monitor: The agent's process monitor instance. + """ + self.process_monitor = process_monitor + self.radvd = ra.DaemonMonitor(self.router_id, + self.ns_name, + process_monitor, + self.get_internal_device_name) + + if self.router_namespace: + self.router_namespace.create() + @property def router(self): return self._router @@ -257,11 +277,11 @@ class RouterInfo(object): fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR return fip_statuses - def create(self): - if self.router_namespace: - self.router_namespace.create() - - def delete(self): + def delete(self, agent): + self.router['gw_port'] = None + self.router[l3_constants.INTERFACE_KEY] = [] + self.router[l3_constants.FLOATINGIP_KEY] = [] + self.process(agent) self.radvd.disable() if self.router_namespace: self.router_namespace.delete() @@ -514,3 +534,23 @@ class RouterInfo(object): fip_statuses = self.put_fips_in_error_state() agent.update_fip_statuses(self, existing_floating_ips, fip_statuses) + + @common_utils.exception_logger() + def process(self, agent): + """Process updates to this router + + This method is the point where the agent requests that updates be + applied to this router. + + :param agent: Passes the agent in order to send RPC messages. + """ + self._process_internal_ports() + self.process_external(agent) + # Process static routes for router + self.routes_updated() + + # Update ex_gw_port and enable_snat on the router info cache + self.ex_gw_port = self.get_ex_gw_port() + self.snat_ports = self.router.get( + l3_constants.SNAT_ROUTER_INTF_KEY, []) + self.enable_snat = self.router.get('enable_snat') diff --git a/neutron/tests/functional/agent/test_l3_agent.py b/neutron/tests/functional/agent/test_l3_agent.py index 735f33d32..07d435b0d 100644 --- a/neutron/tests/functional/agent/test_l3_agent.py +++ b/neutron/tests/functional/agent/test_l3_agent.py @@ -373,7 +373,7 @@ class L3AgentTestCase(L3AgentTestFramework): clean_fips(router) self._add_fip(router, client_address, fixed_address=server_address) - self.agent.process_router(router) + router.process(self.agent) router_ns = ip_lib.IPWrapper(namespace=router.ns_name) netcat = helpers.NetcatTester(router_ns, router_ns, @@ -396,7 +396,7 @@ class L3AgentTestCase(L3AgentTestFramework): assert_num_of_conntrack_rules(1) clean_fips(router) - self.agent.process_router(router) + router.process(self.agent) assert_num_of_conntrack_rules(0) with testtools.ExpectedException(RuntimeError): @@ -421,7 +421,7 @@ class L3AgentTestCase(L3AgentTestFramework): router.router['gw_port']['subnet']['gateway_ip'] = '19.4.4.5' router.router['gw_port']['fixed_ips'][0]['ip_address'] = '19.4.4.10' - self.agent.process_router(router) + router.process(self.agent) # Get the updated configuration and assert that both FIPs are in, # and that the GW IP address was updated. diff --git a/neutron/tests/unit/agent/l3/test_ha_router.py b/neutron/tests/unit/agent/l3/test_ha_router.py index e91e1468b..7e933fb53 100644 --- a/neutron/tests/unit/agent/l3/test_ha_router.py +++ b/neutron/tests/unit/agent/l3/test_ha_router.py @@ -32,7 +32,8 @@ class TestBasicRouterOperations(base.BaseTestCase): # NOTE The use_namespaces config will soon be deprecated self.agent_conf.use_namespaces = True self.router_id = _uuid() - return ha_router.HaRouter(self.router_id, + return ha_router.HaRouter(mock.sentinel.enqueue_state, + self.router_id, router, self.agent_conf, mock.sentinel.driver, diff --git a/neutron/tests/unit/test_l3_agent.py b/neutron/tests/unit/test_l3_agent.py index b5744ade3..0c0c7bb21 100644 --- a/neutron/tests/unit/test_l3_agent.py +++ b/neutron/tests/unit/test_l3_agent.py @@ -287,7 +287,7 @@ class BasicRouterOperationsFramework(base.BaseTestCase): ri.ns_name, agent.process_monitor, ri.get_internal_device_name) - agent.process_router(ri) + ri.process(agent) class TestBasicRouterOperations(BasicRouterOperationsFramework): @@ -782,7 +782,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): 'fixed_ip_address': '7.7.7.7', 'port_id': _uuid(), 'host': HOSTNAME}]} - agent.process_router(ri) + ri.process(agent) ri.process_floating_ip_addresses.assert_called_with(mock.ANY) ri.process_floating_ip_addresses.reset_mock() ri.process_floating_ip_nat_rules.assert_called_with() @@ -794,7 +794,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8' router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips'] - agent.process_router(ri) + ri.process(agent) ri.process_floating_ip_addresses.assert_called_with(mock.ANY) ri.process_floating_ip_addresses.reset_mock() ri.process_floating_ip_nat_rules.assert_called_with() @@ -811,7 +811,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ['fixed_ips'][0]['ip_address'])) ri.router['gw_port']['fixed_ips'][0]['ip_address'] = str(old_ip + 1) - agent.process_router(ri) + ri.process(agent) ri.process_floating_ip_addresses.reset_mock() ri.process_floating_ip_nat_rules.reset_mock() self.assertEqual(ri.external_gateway_added.call_count, 0) @@ -819,7 +819,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # remove just the floating ips del router[l3_constants.FLOATINGIP_KEY] - agent.process_router(ri) + ri.process(agent) ri.process_floating_ip_addresses.assert_called_with(mock.ANY) ri.process_floating_ip_addresses.reset_mock() ri.process_floating_ip_nat_rules.assert_called_with() @@ -828,7 +828,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # now no ports so state is torn down del router[l3_constants.INTERFACE_KEY] del router['gw_port'] - agent.process_router(ri) + ri.process(agent) self.assertEqual(self.send_arp.call_count, 1) distributed = ri.router.get('distributed', False) self.assertEqual(ri.process_floating_ip_addresses.called, @@ -992,13 +992,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process with NAT - agent.process_router(ri) + ri.process(agent) orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:] # Reprocess without NAT router['enable_snat'] = False # Reassign the router object to RouterInfo ri.router = router - agent.process_router(ri) + ri.process(agent) # For some reason set logic does not work well with # IpTablesRule instances nat_rules_delta = [r for r in orig_nat_rules @@ -1013,13 +1013,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process without NAT - agent.process_router(ri) + ri.process(agent) orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:] # Reprocess with NAT router['enable_snat'] = True # Reassign the router object to RouterInfo ri.router = router - agent.process_router(ri) + ri.process(agent) # For some reason set logic does not work well with # IpTablesRule instances nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules @@ -1034,13 +1034,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process with NAT - agent.process_router(ri) + ri.process(agent) # Add an interface and reprocess router_append_interface(router) # Reassign the router object to RouterInfo ri.router = router - agent.process_router(ri) - # send_arp is called both times process_router is called + ri.process(agent) + # send_arp is called both times process is called self.assertEqual(self.send_arp.call_count, 2) def test_process_ipv6_only_gw(self): @@ -1073,7 +1073,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process with NAT - agent.process_router(ri) + ri.process(agent) orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:] # Add an IPv6 interface and reprocess router_append_interface(router, count=1, ip_version=6, ra_mode=ra_mode, @@ -1131,7 +1131,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process with NAT - agent.process_router(ri) + ri.process(agent) # Add an IPv4 and IPv6 interface and reprocess router_append_interface(router, count=1, ip_version=4) router_append_interface(router, count=1, ip_version=6) @@ -1145,13 +1145,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # Process with NAT - agent.process_router(ri) + ri.process(agent) # Add an interface and reprocess del router[l3_constants.INTERFACE_KEY][1] # Reassign the router object to RouterInfo ri.router = router - agent.process_router(ri) - # send_arp is called both times process_router is called + ri.process(agent) + # send_arp is called both times process is called self.assertEqual(self.send_arp.call_count, 2) def test_process_router_ipv6_interface_removed(self): @@ -1183,7 +1183,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # raise RuntimeError to simulate that an unexpected exception # occurs internal_network_added.side_effect = RuntimeError - self.assertRaises(RuntimeError, agent.process_router, ri) + self.assertRaises(RuntimeError, ri.process, agent) self.assertNotIn( router[l3_constants.INTERFACE_KEY][0], ri.internal_ports) @@ -1192,7 +1192,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # periodic_sync_routers_task finds out that _rpc_loop failed to # process the router last time, it will retry in the next run. - agent.process_router(ri) + ri.process(agent) # We were able to add the port to ri.internal_ports self.assertIn( router[l3_constants.INTERFACE_KEY][0], ri.internal_ports) @@ -1203,7 +1203,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() # add an internal port - agent.process_router(ri) + ri.process(agent) with mock.patch.object( ri, @@ -1213,7 +1213,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): internal_net_removed.side_effect = RuntimeError ri.internal_ports[0]['admin_state_up'] = False # The above port is set to down state, remove it. - self.assertRaises(RuntimeError, agent.process_router, ri) + self.assertRaises(RuntimeError, ri.process, agent) self.assertIn( router[l3_constants.INTERFACE_KEY][0], ri.internal_ports) @@ -1222,7 +1222,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # periodic_sync_routers_task finds out that _rpc_loop failed to # process the router last time, it will retry in the next run. - agent.process_router(ri) + ri.process(agent) # We were able to remove the port from ri.internal_ports self.assertNotIn( router[l3_constants.INTERFACE_KEY][0], ri.internal_ports) @@ -1244,7 +1244,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): router, **self.ri_kwargs) ri.external_gateway_added = mock.Mock() - agent.process_router(ri) + ri.process(agent) # Assess the call for putting the floating IP up was performed mock_update_fip_status.assert_called_once_with( mock.ANY, ri.router_id, @@ -1253,7 +1253,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): # Process the router again, this time without floating IPs router[l3_constants.FLOATINGIP_KEY] = [] ri.router = router - agent.process_router(ri) + ri.process(agent) # Assess the call for putting the floating IP up was performed mock_update_fip_status.assert_called_once_with( mock.ANY, ri.router_id, @@ -1276,7 +1276,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri.process_floating_ip_addresses = mock.Mock( side_effect=RuntimeError) ri.external_gateway_added = mock.Mock() - agent.process_router(ri) + ri.process(agent) # Assess the call for putting the floating IP into Error # was performed mock_update_fip_status.assert_called_once_with( @@ -1390,7 +1390,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): external_gateway_removed, external_gateway_added): - agent.process_router(ri) + ri.process(agent) self.assertEqual(external_gateway_added.call_count, 1) self.assertFalse(external_gateway_removed.called) @@ -1415,7 +1415,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): self.mock_ip.get_devices.return_value = stale_devlist - agent.process_router(ri) + ri.process(agent) self.mock_driver.unplug.assert_called_with( stale_devnames[0],