]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Added test_dvr_router_lifecycle to cover dvr
authoradolfo duarte <adolfo.duarte@hp.com>
Wed, 3 Dec 2014 05:48:50 +0000 (21:48 -0800)
committeradolfo duarte <adolfo.duarte@hp.com>
Thu, 22 Jan 2015 09:29:44 +0000 (01:29 -0800)
Several additions were necessary to support testing the lifecycle of a
dvr router. Added new function _dvr_router_lifecycle and necessary
supporting  functions to avoid impacting any other section of the code
or tests.
Tests Added:
test_dvr_router_lifecycle_without_ha_without_snat_with_fips
test_dvr_router_lifecycle_without_ha_with_snat_with_fips

Change-Id: I1bb4bf6969e18ed345f208ed099a692ada5aa4c0

neutron/tests/functional/agent/test_l3_agent.py [changed mode: 0644->0755]

old mode 100644 (file)
new mode 100755 (executable)
index a24ad12..9bc806d
@@ -18,12 +18,14 @@ import functools
 
 import fixtures
 import mock
+import netaddr
 from oslo.config import cfg
 import webob
 import webob.dec
 import webob.exc
 
 from neutron.agent.common import config as agent_config
+from neutron.agent.l3 import agent as neutron_l3_agent
 from neutron.agent import l3_agent as l3_agent_main
 from neutron.agent.linux import dhcp
 from neutron.agent.linux import external_process
@@ -117,9 +119,9 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
                'fixed_ip_address': fixed_address}
         router.router[l3_constants.FLOATINGIP_KEY].append(fip)
 
-    def _namespace_exists(self, router):
-        ip = ip_lib.IPWrapper(self.root_helper, router.ns_name)
-        return ip.netns.exists(router.ns_name)
+    def _namespace_exists(self, namespace):
+        ip = ip_lib.IPWrapper(self.root_helper, namespace)
+        return ip.netns.exists(namespace)
 
     def _metadata_proxy_exists(self, conf, router):
         pm = external_process.ProcessManager(
@@ -196,6 +198,47 @@ vrrp_instance VR_1 {
             'default_gateway_ip': default_gateway_ip
         }
 
+    def _get_rule(self, iptables_manager, table, chain, predicate):
+        rules = iptables_manager.get_chain(table, chain)
+        result = next(rule for rule in rules if predicate(rule))
+        return result
+
+    def _assert_router_does_not_exist(self, router):
+        # If the namespace assertion succeeds
+        # then the devices and iptable rules have also been deleted,
+        # so there's no need to check that explicitly.
+        self.assertFalse(self._namespace_exists(router.ns_name))
+        self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
+
+    def _assert_snat_chains(self, router):
+        self.assertFalse(router.iptables_manager.is_chain_empty(
+            'nat', 'snat'))
+        self.assertFalse(router.iptables_manager.is_chain_empty(
+            'nat', 'POSTROUTING'))
+
+    def _assert_floating_ip_chains(self, router):
+        self.assertFalse(router.iptables_manager.is_chain_empty(
+            'nat', 'float-snat'))
+
+    def _assert_metadata_chains(self, router):
+        metadata_port_filter = lambda rule: (
+            str(self.agent.conf.metadata_port) in rule.rule)
+        self.assertTrue(self._get_rule(router.iptables_manager,
+                                       'nat',
+                                       'PREROUTING',
+                                       metadata_port_filter))
+        self.assertTrue(self._get_rule(router.iptables_manager,
+                                       'filter',
+                                       'INPUT',
+                                       metadata_port_filter))
+
+    def _assert_internal_devices(self, router):
+        internal_devices = router.router[l3_constants.INTERFACE_KEY]
+        self.assertTrue(len(internal_devices))
+        for device in internal_devices:
+            self.assertTrue(self.device_exists_with_ip_mac(
+                device, self.agent.get_internal_device_name, router.ns_name))
+
 
 class L3AgentTestCase(L3AgentTestFramework):
     def test_observer_notifications_legacy_router(self):
@@ -280,7 +323,7 @@ class L3AgentTestCase(L3AgentTestFramework):
                 router.ns_name)
             helpers.wait_until_true(device_exists)
 
-        self.assertTrue(self._namespace_exists(router))
+        self.assertTrue(self._namespace_exists(router.ns_name))
         self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
         self._assert_internal_devices(router)
         self._assert_external_device(router)
@@ -300,13 +343,6 @@ class L3AgentTestCase(L3AgentTestFramework):
         if enable_ha:
             self.assertFalse(router.keepalived_manager.process.active)
 
-    def _assert_internal_devices(self, router):
-        internal_devices = router.router[l3_constants.INTERFACE_KEY]
-        self.assertTrue(len(internal_devices))
-        for device in internal_devices:
-            self.assertTrue(self.device_exists_with_ip_mac(
-                device, self.agent.get_internal_device_name, router.ns_name))
-
     def _assert_external_device(self, router):
         external_port = self.agent._get_ex_gw_port(router)
         self.assertTrue(self.device_exists_with_ip_mac(
@@ -336,40 +372,6 @@ class L3AgentTestCase(L3AgentTestFramework):
                 external_port['mac_address'],
                 router.ns_name, self.root_helper))
 
-    def _assert_snat_chains(self, router):
-        self.assertFalse(router.iptables_manager.is_chain_empty(
-            'nat', 'snat'))
-        self.assertFalse(router.iptables_manager.is_chain_empty(
-            'nat', 'POSTROUTING'))
-
-    def _assert_floating_ip_chains(self, router):
-        self.assertFalse(router.iptables_manager.is_chain_empty(
-            'nat', 'float-snat'))
-
-    def _get_rule(self, iptables_manager, table, chain, predicate):
-        rules = iptables_manager.get_chain(table, chain)
-        result = next(rule for rule in rules if predicate(rule))
-        return result
-
-    def _assert_metadata_chains(self, router):
-        metadata_port_filter = lambda rule: (
-            str(self.agent.conf.metadata_port) in rule.rule)
-        self.assertTrue(self._get_rule(router.iptables_manager,
-                                       'nat',
-                                       'PREROUTING',
-                                       metadata_port_filter))
-        self.assertTrue(self._get_rule(router.iptables_manager,
-                                       'filter',
-                                       'INPUT',
-                                       metadata_port_filter))
-
-    def _assert_router_does_not_exist(self, router):
-        # If the namespace assertion succeeds
-        # then the devices and iptable rules have also been deleted,
-        # so there's no need to check that explicitly.
-        self.assertFalse(self._namespace_exists(router))
-        self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
-
     def _assert_ha_device(self, router):
         self.assertTrue(self.device_exists_with_ip_mac(
             router.router[l3_constants.HA_INTERFACE_KEY],
@@ -473,3 +475,194 @@ class MetadataL3AgentTestCase(L3AgentTestFramework):
         # Check status code
         firstline = raw_headers.splitlines()[0]
         self.assertIn(str(webob.exc.HTTPOk.code), firstline.split())
+
+
+class TestDvrRouter(L3AgentTestFramework):
+    def test_dvr_router_lifecycle_without_ha_without_snat_with_fips(self):
+        self._dvr_router_lifecycle(enable_ha=False, enable_snat=False)
+
+    def test_dvr_router_lifecycle_without_ha_with_snat_with_fips(self):
+        self._dvr_router_lifecycle(enable_ha=False, enable_snat=True)
+
+    def _dvr_router_lifecycle(self, enable_ha=False, enable_snat=False):
+        '''Test dvr router lifecycle
+
+        :param enable_ha: sets the ha value for the router.
+        :param enable_snat:  the value of enable_snat is used
+        to  set the  agent_mode.
+        '''
+
+        # The value of agent_mode can be dvr, dvr_snat, or legacy.
+        # Since by definition this is a dvr (distributed = true)
+        # only dvr and dvr_snat are applicable
+        self.agent.conf.agent_mode = 'dvr_snat' if enable_snat else 'dvr'
+
+        # We get the router info particular to a dvr router
+        router_info = self.generate_dvr_router_info(
+            enable_ha, enable_snat)
+
+        # We need to mock the get_agent_gateway_port return value
+        # because the whole L3PluginApi is mocked and we need the port
+        # gateway_port information before the l3_agent will create it.
+        # The port returned needs to have the same information as
+        # router_info['gw_port']
+        mocked_gw_port = (
+            neutron_l3_agent.L3PluginApi.return_value.get_agent_gateway_port)
+        mocked_gw_port.return_value = router_info['gw_port']
+
+        # We also need to mock the get_external_network_id method to
+        # get the correct fip namespace.
+        mocked_ext_net_id = (
+            neutron_l3_agent.L3PluginApi.return_value.get_external_network_id)
+        mocked_ext_net_id.return_value = (
+            router_info['_floatingips'][0]['floating_network_id'])
+
+        # With all that set we can now ask the l3_agent to
+        # manage the router (create it, create namespaces,
+        # attach interfaces, etc...)
+        router = self.manage_router(self.agent, router_info)
+
+        self.assertTrue(self._namespace_exists(router.ns_name))
+        self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
+        self._assert_internal_devices(router)
+        self._assert_dvr_external_device(router)
+        self._assert_dvr_gateway(router)
+        self._assert_dvr_floating_ips(router)
+        self._assert_snat_chains(router)
+        self._assert_floating_ip_chains(router)
+        self._assert_metadata_chains(router)
+
+        self._delete_router(self.agent, router.router_id)
+        self._assert_router_does_not_exist(router)
+
+    def generate_dvr_router_info(self, enable_ha=False, enable_snat=False):
+        router = test_l3_agent.prepare_router_data(
+            enable_snat=enable_snat,
+            enable_floating_ip=True,
+            enable_ha=enable_ha)
+        internal_ports = router.get(l3_constants.INTERFACE_KEY, [])
+        router['distributed'] = True
+        router['gw_port_host'] = self.agent.conf.host
+        router['gw_port']['binding:host_id'] = self.agent.conf.host
+        floating_ip = router['_floatingips'][0]
+        floating_ip['floating_network_id'] = router['gw_port']['network_id']
+        floating_ip['host'] = self.agent.conf.host
+        floating_ip['port_id'] = internal_ports[0]['id']
+        floating_ip['status'] = 'ACTIVE'
+
+        if not enable_snat:
+            return router
+
+        self._add_snat_port_info_to_router(router, internal_ports)
+        return router
+
+    def _add_snat_port_info_to_router(self, router, internal_ports):
+        # Add snat port information to the router
+        snat_port_list = router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
+        if not snat_port_list and internal_ports:
+            # Get values from internal port
+            port = internal_ports[0]
+            fixed_ip = port['fixed_ips'][0]
+            snat_subnet = port['subnet']
+            port_ip = fixed_ip['ip_address']
+            # Pick an ip address which is not the same as port_ip
+            snat_ip = str(netaddr.IPAddress(port_ip) + 5)
+            # Add the info to router as the first snat port
+            # in the list of snat ports
+            router[l3_constants.SNAT_ROUTER_INTF_KEY] = [
+                {'subnet':
+                    {'cidr': snat_subnet['cidr'],
+                        'gateway_ip': snat_subnet['gateway_ip'],
+                        'id': fixed_ip['subnet_id']},
+                    'network_id': port['network_id'],
+                    'device_owner': 'network:router_centralized_snat',
+                    'mac_address': 'fa:16:3e:80:8d:89',
+                    'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
+                                    'ip_address': snat_ip}],
+                    'id': _uuid(),
+                    'device_id': _uuid()}
+            ]
+
+    def _assert_dvr_external_device(self, router):
+        external_port = self.agent._get_ex_gw_port(router)
+        snat_ns_name = self.agent.get_snat_ns_name(router.router_id)
+
+        # if the agent is in dvr_snat mode, then we have to check
+        # that the correct ports and ip addresses exist in the
+        # snat_ns_name namespace
+        if self.agent.conf.agent_mode == 'dvr_snat':
+            self.assertTrue(self.device_exists_with_ip_mac(
+                external_port, self.agent.get_external_device_name,
+                snat_ns_name))
+        # if the agent is in dvr mode then the snat_ns_name namespace
+        # should not be present at all:
+        elif self.agent.conf.agent_mode == 'dvr':
+            self.assertFalse(
+                self._namespace_exists(snat_ns_name),
+                "namespace %s was found but agent is in dvr mode not dvr_snat"
+                % (str(snat_ns_name))
+            )
+        # if the agent is anything else the test is misconfigured
+        # we force a test failure with message
+        else:
+            self.assertTrue(False, " agent not configured for dvr or dvr_snat")
+
+    def _assert_dvr_gateway(self, router):
+        gateway_expected_in_snat_namespace = (
+            self.agent.conf.agent_mode == 'dvr_snat'
+        )
+        if gateway_expected_in_snat_namespace:
+            self._assert_dvr_snat_gateway(router)
+
+        snat_namespace_should_not_exist = (
+            self.agent.conf.agent_mode == 'dvr'
+        )
+        if snat_namespace_should_not_exist:
+            self._assert_snat_namespace_does_not_exist(router)
+
+    def _assert_dvr_snat_gateway(self, router):
+        namespace = self.agent.get_snat_ns_name(router.router_id)
+        external_port = self.agent._get_ex_gw_port(router)
+        external_device_name = self.agent.get_external_device_name(
+            external_port['id'])
+        external_device = ip_lib.IPDevice(external_device_name,
+                                          self.root_helper,
+                                          namespace)
+        existing_gateway = (
+            external_device.route.get_gateway().get('gateway'))
+        expected_gateway = external_port['subnet']['gateway_ip']
+        self.assertEqual(expected_gateway, existing_gateway)
+
+    def _assert_snat_namespace_does_not_exist(self, router):
+        namespace = self.agent.get_snat_ns_name(router.router_id)
+        self.assertFalse(self._namespace_exists(namespace))
+
+    def _assert_dvr_floating_ips(self, router):
+        # in the fip namespace:
+        # Check that the fg-<port-id> (floatingip_agent_gateway)
+        # is created with the ip address of the external gateway port
+        floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
+        self.assertTrue(floating_ips)
+
+        external_port = self.agent._get_ex_gw_port(router)
+        fip_ns_name = (
+            self.agent.get_fip_ns_name(floating_ips[0]['floating_network_id'])
+        )
+        fg_port_created_succesfully = ip_lib.device_exists_with_ip_mac(
+            self.agent.get_fip_ext_device_name(external_port['id']),
+            external_port['ip_cidr'],
+            external_port['mac_address'],
+            fip_ns_name, self.root_helper)
+        self.assertTrue(fg_port_created_succesfully)
+        # Check fpr-router device has been created
+        device_name = self.agent.get_fip_int_device_name(router.router_id)
+        fpr_router_device_created_succesfully = ip_lib.device_exists(
+            device_name, self.root_helper, fip_ns_name)
+        self.assertTrue(fpr_router_device_created_succesfully)
+
+        # In the router namespace
+        # Check rfp-<router-id> is created correctly
+        for fip in floating_ips:
+            device_name = self.agent.get_rtr_int_device_name(router.router_id)
+            self.assertTrue(ip_lib.device_exists(
+                device_name, self.root_helper, router.ns_name))