import fixtures
import mock
+import netaddr
from oslo.config import cfg
import webob
import webob.dec
import webob.exc
from neutron.agent.common import config as agent_config
+from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent import l3_agent as l3_agent_main
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
'fixed_ip_address': fixed_address}
router.router[l3_constants.FLOATINGIP_KEY].append(fip)
- def _namespace_exists(self, router):
- ip = ip_lib.IPWrapper(self.root_helper, router.ns_name)
- return ip.netns.exists(router.ns_name)
+ def _namespace_exists(self, namespace):
+ ip = ip_lib.IPWrapper(self.root_helper, namespace)
+ return ip.netns.exists(namespace)
def _metadata_proxy_exists(self, conf, router):
pm = external_process.ProcessManager(
'default_gateway_ip': default_gateway_ip
}
+ def _get_rule(self, iptables_manager, table, chain, predicate):
+ rules = iptables_manager.get_chain(table, chain)
+ result = next(rule for rule in rules if predicate(rule))
+ return result
+
+ def _assert_router_does_not_exist(self, router):
+ # If the namespace assertion succeeds
+ # then the devices and iptable rules have also been deleted,
+ # so there's no need to check that explicitly.
+ self.assertFalse(self._namespace_exists(router.ns_name))
+ self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
+
+ def _assert_snat_chains(self, router):
+ self.assertFalse(router.iptables_manager.is_chain_empty(
+ 'nat', 'snat'))
+ self.assertFalse(router.iptables_manager.is_chain_empty(
+ 'nat', 'POSTROUTING'))
+
+ def _assert_floating_ip_chains(self, router):
+ self.assertFalse(router.iptables_manager.is_chain_empty(
+ 'nat', 'float-snat'))
+
+ def _assert_metadata_chains(self, router):
+ metadata_port_filter = lambda rule: (
+ str(self.agent.conf.metadata_port) in rule.rule)
+ self.assertTrue(self._get_rule(router.iptables_manager,
+ 'nat',
+ 'PREROUTING',
+ metadata_port_filter))
+ self.assertTrue(self._get_rule(router.iptables_manager,
+ 'filter',
+ 'INPUT',
+ metadata_port_filter))
+
+ def _assert_internal_devices(self, router):
+ internal_devices = router.router[l3_constants.INTERFACE_KEY]
+ self.assertTrue(len(internal_devices))
+ for device in internal_devices:
+ self.assertTrue(self.device_exists_with_ip_mac(
+ device, self.agent.get_internal_device_name, router.ns_name))
+
class L3AgentTestCase(L3AgentTestFramework):
def test_observer_notifications_legacy_router(self):
router.ns_name)
helpers.wait_until_true(device_exists)
- self.assertTrue(self._namespace_exists(router))
+ self.assertTrue(self._namespace_exists(router.ns_name))
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
self._assert_internal_devices(router)
self._assert_external_device(router)
if enable_ha:
self.assertFalse(router.keepalived_manager.process.active)
- def _assert_internal_devices(self, router):
- internal_devices = router.router[l3_constants.INTERFACE_KEY]
- self.assertTrue(len(internal_devices))
- for device in internal_devices:
- self.assertTrue(self.device_exists_with_ip_mac(
- device, self.agent.get_internal_device_name, router.ns_name))
-
def _assert_external_device(self, router):
external_port = self.agent._get_ex_gw_port(router)
self.assertTrue(self.device_exists_with_ip_mac(
external_port['mac_address'],
router.ns_name, self.root_helper))
- def _assert_snat_chains(self, router):
- self.assertFalse(router.iptables_manager.is_chain_empty(
- 'nat', 'snat'))
- self.assertFalse(router.iptables_manager.is_chain_empty(
- 'nat', 'POSTROUTING'))
-
- def _assert_floating_ip_chains(self, router):
- self.assertFalse(router.iptables_manager.is_chain_empty(
- 'nat', 'float-snat'))
-
- def _get_rule(self, iptables_manager, table, chain, predicate):
- rules = iptables_manager.get_chain(table, chain)
- result = next(rule for rule in rules if predicate(rule))
- return result
-
- def _assert_metadata_chains(self, router):
- metadata_port_filter = lambda rule: (
- str(self.agent.conf.metadata_port) in rule.rule)
- self.assertTrue(self._get_rule(router.iptables_manager,
- 'nat',
- 'PREROUTING',
- metadata_port_filter))
- self.assertTrue(self._get_rule(router.iptables_manager,
- 'filter',
- 'INPUT',
- metadata_port_filter))
-
- def _assert_router_does_not_exist(self, router):
- # If the namespace assertion succeeds
- # then the devices and iptable rules have also been deleted,
- # so there's no need to check that explicitly.
- self.assertFalse(self._namespace_exists(router))
- self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
-
def _assert_ha_device(self, router):
self.assertTrue(self.device_exists_with_ip_mac(
router.router[l3_constants.HA_INTERFACE_KEY],
# Check status code
firstline = raw_headers.splitlines()[0]
self.assertIn(str(webob.exc.HTTPOk.code), firstline.split())
+
+
+class TestDvrRouter(L3AgentTestFramework):
+ def test_dvr_router_lifecycle_without_ha_without_snat_with_fips(self):
+ self._dvr_router_lifecycle(enable_ha=False, enable_snat=False)
+
+ def test_dvr_router_lifecycle_without_ha_with_snat_with_fips(self):
+ self._dvr_router_lifecycle(enable_ha=False, enable_snat=True)
+
+ def _dvr_router_lifecycle(self, enable_ha=False, enable_snat=False):
+ '''Test dvr router lifecycle
+
+ :param enable_ha: sets the ha value for the router.
+ :param enable_snat: the value of enable_snat is used
+ to set the agent_mode.
+ '''
+
+ # The value of agent_mode can be dvr, dvr_snat, or legacy.
+ # Since by definition this is a dvr (distributed = true)
+ # only dvr and dvr_snat are applicable
+ self.agent.conf.agent_mode = 'dvr_snat' if enable_snat else 'dvr'
+
+ # We get the router info particular to a dvr router
+ router_info = self.generate_dvr_router_info(
+ enable_ha, enable_snat)
+
+ # We need to mock the get_agent_gateway_port return value
+ # because the whole L3PluginApi is mocked and we need the port
+ # gateway_port information before the l3_agent will create it.
+ # The port returned needs to have the same information as
+ # router_info['gw_port']
+ mocked_gw_port = (
+ neutron_l3_agent.L3PluginApi.return_value.get_agent_gateway_port)
+ mocked_gw_port.return_value = router_info['gw_port']
+
+ # We also need to mock the get_external_network_id method to
+ # get the correct fip namespace.
+ mocked_ext_net_id = (
+ neutron_l3_agent.L3PluginApi.return_value.get_external_network_id)
+ mocked_ext_net_id.return_value = (
+ router_info['_floatingips'][0]['floating_network_id'])
+
+ # With all that set we can now ask the l3_agent to
+ # manage the router (create it, create namespaces,
+ # attach interfaces, etc...)
+ router = self.manage_router(self.agent, router_info)
+
+ self.assertTrue(self._namespace_exists(router.ns_name))
+ self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
+ self._assert_internal_devices(router)
+ self._assert_dvr_external_device(router)
+ self._assert_dvr_gateway(router)
+ self._assert_dvr_floating_ips(router)
+ self._assert_snat_chains(router)
+ self._assert_floating_ip_chains(router)
+ self._assert_metadata_chains(router)
+
+ self._delete_router(self.agent, router.router_id)
+ self._assert_router_does_not_exist(router)
+
+ def generate_dvr_router_info(self, enable_ha=False, enable_snat=False):
+ router = test_l3_agent.prepare_router_data(
+ enable_snat=enable_snat,
+ enable_floating_ip=True,
+ enable_ha=enable_ha)
+ internal_ports = router.get(l3_constants.INTERFACE_KEY, [])
+ router['distributed'] = True
+ router['gw_port_host'] = self.agent.conf.host
+ router['gw_port']['binding:host_id'] = self.agent.conf.host
+ floating_ip = router['_floatingips'][0]
+ floating_ip['floating_network_id'] = router['gw_port']['network_id']
+ floating_ip['host'] = self.agent.conf.host
+ floating_ip['port_id'] = internal_ports[0]['id']
+ floating_ip['status'] = 'ACTIVE'
+
+ if not enable_snat:
+ return router
+
+ self._add_snat_port_info_to_router(router, internal_ports)
+ return router
+
+ def _add_snat_port_info_to_router(self, router, internal_ports):
+ # Add snat port information to the router
+ snat_port_list = router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
+ if not snat_port_list and internal_ports:
+ # Get values from internal port
+ port = internal_ports[0]
+ fixed_ip = port['fixed_ips'][0]
+ snat_subnet = port['subnet']
+ port_ip = fixed_ip['ip_address']
+ # Pick an ip address which is not the same as port_ip
+ snat_ip = str(netaddr.IPAddress(port_ip) + 5)
+ # Add the info to router as the first snat port
+ # in the list of snat ports
+ router[l3_constants.SNAT_ROUTER_INTF_KEY] = [
+ {'subnet':
+ {'cidr': snat_subnet['cidr'],
+ 'gateway_ip': snat_subnet['gateway_ip'],
+ 'id': fixed_ip['subnet_id']},
+ 'network_id': port['network_id'],
+ 'device_owner': 'network:router_centralized_snat',
+ 'mac_address': 'fa:16:3e:80:8d:89',
+ 'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
+ 'ip_address': snat_ip}],
+ 'id': _uuid(),
+ 'device_id': _uuid()}
+ ]
+
+ def _assert_dvr_external_device(self, router):
+ external_port = self.agent._get_ex_gw_port(router)
+ snat_ns_name = self.agent.get_snat_ns_name(router.router_id)
+
+ # if the agent is in dvr_snat mode, then we have to check
+ # that the correct ports and ip addresses exist in the
+ # snat_ns_name namespace
+ if self.agent.conf.agent_mode == 'dvr_snat':
+ self.assertTrue(self.device_exists_with_ip_mac(
+ external_port, self.agent.get_external_device_name,
+ snat_ns_name))
+ # if the agent is in dvr mode then the snat_ns_name namespace
+ # should not be present at all:
+ elif self.agent.conf.agent_mode == 'dvr':
+ self.assertFalse(
+ self._namespace_exists(snat_ns_name),
+ "namespace %s was found but agent is in dvr mode not dvr_snat"
+ % (str(snat_ns_name))
+ )
+ # if the agent is anything else the test is misconfigured
+ # we force a test failure with message
+ else:
+ self.assertTrue(False, " agent not configured for dvr or dvr_snat")
+
+ def _assert_dvr_gateway(self, router):
+ gateway_expected_in_snat_namespace = (
+ self.agent.conf.agent_mode == 'dvr_snat'
+ )
+ if gateway_expected_in_snat_namespace:
+ self._assert_dvr_snat_gateway(router)
+
+ snat_namespace_should_not_exist = (
+ self.agent.conf.agent_mode == 'dvr'
+ )
+ if snat_namespace_should_not_exist:
+ self._assert_snat_namespace_does_not_exist(router)
+
+ def _assert_dvr_snat_gateway(self, router):
+ namespace = self.agent.get_snat_ns_name(router.router_id)
+ external_port = self.agent._get_ex_gw_port(router)
+ external_device_name = self.agent.get_external_device_name(
+ external_port['id'])
+ external_device = ip_lib.IPDevice(external_device_name,
+ self.root_helper,
+ namespace)
+ existing_gateway = (
+ external_device.route.get_gateway().get('gateway'))
+ expected_gateway = external_port['subnet']['gateway_ip']
+ self.assertEqual(expected_gateway, existing_gateway)
+
+ def _assert_snat_namespace_does_not_exist(self, router):
+ namespace = self.agent.get_snat_ns_name(router.router_id)
+ self.assertFalse(self._namespace_exists(namespace))
+
+ def _assert_dvr_floating_ips(self, router):
+ # in the fip namespace:
+ # Check that the fg-<port-id> (floatingip_agent_gateway)
+ # is created with the ip address of the external gateway port
+ floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
+ self.assertTrue(floating_ips)
+
+ external_port = self.agent._get_ex_gw_port(router)
+ fip_ns_name = (
+ self.agent.get_fip_ns_name(floating_ips[0]['floating_network_id'])
+ )
+ fg_port_created_succesfully = ip_lib.device_exists_with_ip_mac(
+ self.agent.get_fip_ext_device_name(external_port['id']),
+ external_port['ip_cidr'],
+ external_port['mac_address'],
+ fip_ns_name, self.root_helper)
+ self.assertTrue(fg_port_created_succesfully)
+ # Check fpr-router device has been created
+ device_name = self.agent.get_fip_int_device_name(router.router_id)
+ fpr_router_device_created_succesfully = ip_lib.device_exists(
+ device_name, self.root_helper, fip_ns_name)
+ self.assertTrue(fpr_router_device_created_succesfully)
+
+ # In the router namespace
+ # Check rfp-<router-id> is created correctly
+ for fip in floating_ips:
+ device_name = self.agent.get_rtr_int_device_name(router.router_id)
+ self.assertTrue(ip_lib.device_exists(
+ device_name, self.root_helper, router.ns_name))