]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Replace BaseIPVethTestCase by FakeMachine
authorCedric Brandily <zzelle@gmail.com>
Sun, 1 Mar 2015 23:05:36 +0000 (23:05 +0000)
committerCedric Brandily <zzelle@gmail.com>
Fri, 24 Apr 2015 09:47:57 +0000 (09:47 +0000)
This change removes BaseIPVethTestCase class and moves Pinger class to
allow its use from a fake machine.

Change-Id: I0636f11a327e9535828e7b52e60195e52831a0b2

neutron/tests/common/machine_fixtures.py
neutron/tests/functional/agent/linux/base.py
neutron/tests/functional/agent/linux/helpers.py
neutron/tests/functional/agent/linux/test_ebtables_driver.py
neutron/tests/functional/agent/linux/test_ipset.py
neutron/tests/functional/agent/linux/test_iptables.py
neutron/tests/functional/agent/linux/test_iptables_firewall.py
neutron/tests/functional/agent/test_ovs_flows.py

index 5fa45375255901310ab7b7b765b79efa2b5b4337..7cc626c887ff5c250f2a93bc6ebf8018b6601059 100644 (file)
@@ -17,6 +17,31 @@ import fixtures
 
 from neutron.agent.linux import ip_lib
 from neutron.tests.common import net_helpers
+from neutron.tests import tools
+
+
+class Pinger(object):
+    def __init__(self, namespace, timeout=1, max_attempts=1):
+        self.namespace = namespace
+        self._timeout = timeout
+        self._max_attempts = max_attempts
+
+    def _ping_destination(self, dest_address):
+        ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
+        ns_ip_wrapper.netns.execute(['ping', '-c', self._max_attempts,
+                                     '-W', self._timeout, dest_address])
+
+    def assert_ping(self, dst_ip):
+        self._ping_destination(dst_ip)
+
+    def assert_no_ping(self, dst_ip):
+        try:
+            self._ping_destination(dst_ip)
+            tools.fail("destination ip %(dst_ip)s is replying to ping "
+                       "from namespace %(ns)s, but it shouldn't" %
+                       {'ns': self.namespace, 'dst_ip': dst_ip})
+        except RuntimeError:
+            pass
 
 
 class FakeMachine(fixtures.Fixture):
@@ -45,8 +70,9 @@ class FakeMachine(fixtures.Fixture):
 
     def setUp(self):
         super(FakeMachine, self).setUp()
-        self.namespace = self.useFixture(
-            net_helpers.NamespaceFixture()).name
+        ns_fixture = self.useFixture(
+            net_helpers.NamespaceFixture())
+        self.namespace = ns_fixture.name
 
         self.port = self.useFixture(
             net_helpers.PortFixture.get(self.bridge, self.namespace)).port
@@ -59,6 +85,14 @@ class FakeMachine(fixtures.Fixture):
         ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
         return ns_ip_wrapper.netns.execute(*args, **kwargs)
 
+    def assert_ping(self, dst_ip):
+        pinger = Pinger(self.namespace)
+        pinger.assert_ping(dst_ip)
+
+    def assert_no_ping(self, dst_ip):
+        pinger = Pinger(self.namespace)
+        pinger.assert_no_ping(dst_ip)
+
 
 class PeerMachines(fixtures.Fixture):
     """Create 'amount' peered machines on an ip_cidr.
index 25741b140fab0b9a41a7e0ff75464c724ed5b0e2..b2d7b1be26ccfbe56b9b4698a907c148642daf9e 100644 (file)
@@ -14,9 +14,7 @@
 
 import testscenarios
 
-from neutron.agent.linux import ip_lib
 from neutron.tests import base as tests_base
-from neutron.tests.common import machine_fixtures
 from neutron.tests.common import net_helpers
 from neutron.tests.functional import base as functional_base
 
@@ -54,14 +52,3 @@ class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
     def setUp(self):
         super(BaseOVSLinuxTestCase, self).setUp()
         self.config(group='OVS', ovsdb_interface=self.ovsdb_interface)
-
-
-class BaseIPVethTestCase(functional_base.BaseSudoTestCase):
-
-    def prepare_veth_pairs(self):
-        bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
-        machines = self.useFixture(
-            machine_fixtures.PeerMachines(bridge)).machines
-        self.SRC_ADDRESS = machines[0].ip
-        self.DST_ADDRESS = machines[1].ip
-        return [ip_lib.IPWrapper(m.namespace) for m in machines]
index 1e51a9b81c0083195c8686fddcac1fd4793b549c..611a423045df9812e77d36422574b53d3437ff22 100644 (file)
@@ -25,7 +25,6 @@ import fixtures
 from neutron.agent.common import config
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
-from neutron.tests import tools
 
 CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
 CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
@@ -99,29 +98,6 @@ def get_unused_port(used, start=1024, end=65535):
     return random.choice(list(candidates - used))
 
 
-class Pinger(object):
-    def __init__(self, namespace, timeout=1, max_attempts=1):
-        self.namespace = namespace
-        self._timeout = timeout
-        self._max_attempts = max_attempts
-
-    def _ping_destination(self, dest_address):
-        self.namespace.netns.execute(['ping', '-c', self._max_attempts,
-                                      '-W', self._timeout, dest_address])
-
-    def assert_ping(self, dst_ip):
-        self._ping_destination(dst_ip)
-
-    def assert_no_ping(self, dst_ip):
-        try:
-            self._ping_destination(dst_ip)
-            tools.fail("destination ip %(dst_ip)s is replying to ping"
-                       "from namespace %(ns)s, but it shouldn't" %
-                       {'ns': self.namespace.namespace, 'dst_ip': dst_ip})
-        except RuntimeError:
-            pass
-
-
 class RootHelperProcess(subprocess.Popen):
     def __init__(self, cmd, *args, **kwargs):
         for arg in ('stdin', 'stdout', 'stderr'):
index 8fc5fe10df3b6db43401fa7cdfe49321ea07651f..999cc6762dc8951f219a68388be57869306b29b2 100644 (file)
@@ -15,9 +15,9 @@
 
 from neutron.agent.linux import ebtables_driver
 from neutron.agent.linux import ip_lib
-from neutron.agent.linux import utils as linux_utils
-from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
+from neutron.tests.functional import base
 
 
 NO_FILTER_APPLY = (
@@ -70,36 +70,31 @@ FILTER_APPLY_TEMPLATE = (
     "COMMIT")
 
 
-class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
+class EbtablesLowLevelTestCase(base.BaseSudoTestCase):
 
     def setUp(self):
         super(EbtablesLowLevelTestCase, self).setUp()
-        self.src_ns, self.dst_ns = self.prepare_veth_pairs()
-        devs = [d for d in self.src_ns.get_devices() if d.name != "lo"]
-        src_dev_name = devs[0].name
-        self.ns = self.src_ns.namespace
-        self.execute = linux_utils.execute
-        self.pinger = helpers.Pinger(self.src_ns)
+
+        bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+        self.source, self.destination = self.useFixture(
+            machine_fixtures.PeerMachines(bridge)).machines
 
         # Extract MAC and IP address of one of my interfaces
-        self.mac = self.src_ns.device(src_dev_name).link.address
-        addr = [a for a in
-                self.src_ns.device(src_dev_name).addr.list()][0]['cidr']
-        self.addr = addr.split("/")[0]
+        self.mac = self.source.port.link.address
+        self.addr = self.source.ip
 
         # Pick one of the namespaces and setup a bridge for the local ethernet
         # interface there, because ebtables only works on bridged interfaces.
-        self.src_ns.netns.execute("brctl addbr mybridge".split())
-        self.src_ns.netns.execute(("brctl addif mybridge %s" % src_dev_name).
-                                  split())
+        self.source.execute(['brctl', 'addbr', 'mybridge'])
+        self.source.execute(
+            ['brctl', 'addif', 'mybridge', self.source.port.name])
 
         # Take the IP addrss off one of the interfaces and apply it to the
         # bridge interface instead.
-        dev_source = ip_lib.IPDevice(src_dev_name, self.src_ns.namespace)
-        dev_mybridge = ip_lib.IPDevice("mybridge", self.src_ns.namespace)
-        dev_source.addr.delete(addr)
+        self.source.port.addr.delete(self.source.ip_cidr)
+        dev_mybridge = ip_lib.IPDevice("mybridge", self.source.namespace)
         dev_mybridge.link.set_up()
-        dev_mybridge.addr.add(addr)
+        dev_mybridge.addr.add(self.source.ip_cidr)
 
     def _test_basic_port_filter_wrong_mac(self):
         # Setup filter with wrong IP/MAC address pair. Basic filters only allow
@@ -108,15 +103,13 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
         mac_ip_pair = dict(mac_addr="11:11:11:22:22:22", ip_addr=self.addr)
         filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
         ebtables_driver.ebtables_restore(filter_apply,
-                                         self.execute,
-                                         self.ns)
-        self.pinger.assert_no_ping(self.DST_ADDRESS)
+                                         self.source.execute)
+        self.source.assert_no_ping(self.destination.ip)
 
         # Assure that ping will work once we unfilter the instance
         ebtables_driver.ebtables_restore(NO_FILTER_APPLY,
-                                         self.execute,
-                                         self.ns)
-        self.pinger.assert_ping(self.DST_ADDRESS)
+                                         self.source.execute)
+        self.source.assert_ping(self.destination.ip)
 
     def _test_basic_port_filter_correct_mac(self):
         # Use the correct IP/MAC address pair for this one.
@@ -124,10 +117,9 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
 
         filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
         ebtables_driver.ebtables_restore(filter_apply,
-                                         self.execute,
-                                         self.ns)
+                                         self.source.execute)
 
-        self.pinger.assert_ping(self.DST_ADDRESS)
+        self.source.assert_ping(self.destination.ip)
 
     def test_ebtables_filtering(self):
         # Cannot parallelize those tests. Therefore need to call them
index 499386423f1db56c78e3673698879af7ae1052fc..703db22449bf83609264fc7390aa1780f279101d 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ipset_manager
 from neutron.agent.linux import iptables_manager
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
 from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.functional import base as functional_base
 
 IPSET_SET = 'test-set'
 IPSET_ETHERTYPE = 'IPv4'
@@ -24,20 +27,22 @@ ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
 UNRELATED_IP = '1.1.1.1'
 
 
-class IpsetBase(base.BaseIPVethTestCase):
+class IpsetBase(functional_base.BaseSudoTestCase):
 
     def setUp(self):
         super(IpsetBase, self).setUp()
 
-        self.src_ns, self.dst_ns = self.prepare_veth_pairs()
-        self.ipset = self._create_ipset_manager_and_set(self.dst_ns,
-                                                        IPSET_SET)
+        bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+        self.source, self.destination = self.useFixture(
+            machine_fixtures.PeerMachines(bridge)).machines
+
+        self.ipset = self._create_ipset_manager_and_set(
+            ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
 
         self.dst_iptables = iptables_manager.IptablesManager(
-            namespace=self.dst_ns.namespace)
+            namespace=self.destination.namespace)
 
         self._add_iptables_ipset_rules(self.dst_iptables)
-        self.pinger = helpers.Pinger(self.src_ns)
 
     def _create_ipset_manager_and_set(self, dst_ns, set_name):
         ipset = ipset_manager.IpsetManager(
@@ -61,28 +66,28 @@ class IpsetBase(base.BaseIPVethTestCase):
 class IpsetManagerTestCase(IpsetBase):
 
     def test_add_member_allows_ping(self):
-        self.pinger.assert_no_ping(self.DST_ADDRESS)
-        self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
-        self.pinger.assert_ping(self.DST_ADDRESS)
+        self.source.assert_no_ping(self.destination.ip)
+        self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+        self.source.assert_ping(self.destination.ip)
 
     def test_del_member_denies_ping(self):
-        self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
-        self.pinger.assert_ping(self.DST_ADDRESS)
+        self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+        self.source.assert_ping(self.destination.ip)
 
-        self.ipset._del_member_from_set(IPSET_SET, self.SRC_ADDRESS)
-        self.pinger.assert_no_ping(self.DST_ADDRESS)
+        self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
+        self.source.assert_no_ping(self.destination.ip)
 
     def test_refresh_ipset_allows_ping(self):
         self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
-        self.pinger.assert_no_ping(self.DST_ADDRESS)
+        self.source.assert_no_ping(self.destination.ip)
 
-        self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.SRC_ADDRESS],
+        self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
                                 IPSET_ETHERTYPE)
-        self.pinger.assert_ping(self.DST_ADDRESS)
+        self.source.assert_ping(self.destination.ip)
 
-        self.ipset._refresh_set(IPSET_SET, [self.SRC_ADDRESS, UNRELATED_IP],
+        self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
                                 IPSET_ETHERTYPE)
-        self.pinger.assert_ping(self.DST_ADDRESS)
+        self.source.assert_ping(self.destination.ip)
 
     def test_destroy_ipset_set(self):
         self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
index e38058971272d9f110d65c7077b52c0ca4924a3c..43a8dc7b4f49ccb1ff3bc7afbe76602a02cf62cb 100644 (file)
@@ -16,15 +16,19 @@ import os.path
 
 import testtools
 
+from neutron.agent.linux import ip_lib
 from neutron.agent.linux import iptables_manager
 from neutron.agent.linux import utils
 from neutron.tests import base
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
 from neutron.tests.functional.agent.linux import base as linux_base
 from neutron.tests.functional.agent.linux.bin import ipt_binname
 from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.functional import base as functional_base
 
 
-class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
+class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
     DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
                               'egress': 'OUTPUT'}
     PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
@@ -32,17 +36,21 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
 
     def setUp(self):
         super(IptablesManagerTestCase, self).setUp()
-        self.client_ns, self.server_ns = self.prepare_veth_pairs()
+
+        bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+        self.client, self.server = self.useFixture(
+            machine_fixtures.PeerMachines(bridge)).machines
+
         self.client_fw, self.server_fw = self.create_firewalls()
         # The port is used in isolated namespace that precludes possibility of
         # port conflicts
-        self.port = helpers.get_free_namespace_port(self.server_ns.namespace)
+        self.port = helpers.get_free_namespace_port(self.server.namespace)
 
     def create_firewalls(self):
         client_iptables = iptables_manager.IptablesManager(
-            namespace=self.client_ns.namespace)
+            namespace=self.client.namespace)
         server_iptables = iptables_manager.IptablesManager(
-            namespace=self.server_ns.namespace)
+            namespace=self.server.namespace)
 
         return client_iptables, server_iptables
 
@@ -71,49 +79,48 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
         return chain, rule
 
     def _test_with_nc(self, fw_manager, direction, port, udp):
-        netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
-                                      self.DST_ADDRESS, self.port,
-                                      run_as_root=True, udp=udp)
+        netcat = helpers.NetcatTester(
+            ip_lib.IPWrapper(self.client.namespace),
+            ip_lib.IPWrapper(self.server.namespace),
+            self.server.ip, self.port, run_as_root=True, udp=udp)
         self.addCleanup(netcat.stop_processes)
         protocol = 'tcp'
         if udp:
             protocol = 'udp'
         self.assertTrue(netcat.test_connectivity())
         self.filter_add_rule(
-            fw_manager, self.DST_ADDRESS, direction, protocol, port)
+            fw_manager, self.server.ip, direction, protocol, port)
         with testtools.ExpectedException(RuntimeError):
             netcat.test_connectivity()
         self.filter_remove_rule(
-            fw_manager, self.DST_ADDRESS, direction, protocol, port)
+            fw_manager, self.server.ip, direction, protocol, port)
         self.assertTrue(netcat.test_connectivity(True))
 
     def test_icmp(self):
-        pinger = helpers.Pinger(self.client_ns)
-        pinger.assert_ping(self.DST_ADDRESS)
+        self.client.assert_ping(self.server.ip)
         self.server_fw.ipv4['filter'].add_rule('INPUT',
                                                linux_base.ICMP_BLOCK_RULE)
         self.server_fw.apply()
-        pinger.assert_no_ping(self.DST_ADDRESS)
+        self.client.assert_no_ping(self.server.ip)
         self.server_fw.ipv4['filter'].remove_rule('INPUT',
                                                   linux_base.ICMP_BLOCK_RULE)
         self.server_fw.apply()
-        pinger.assert_ping(self.DST_ADDRESS)
+        self.client.assert_ping(self.server.ip)
 
     def test_mangle_icmp(self):
-        pinger = helpers.Pinger(self.client_ns)
-        pinger.assert_ping(self.DST_ADDRESS)
+        self.client.assert_ping(self.server.ip)
         self.server_fw.ipv4['mangle'].add_rule('INPUT',
                                                linux_base.ICMP_MARK_RULE)
         self.server_fw.ipv4['filter'].add_rule('INPUT',
                                                linux_base.MARKED_BLOCK_RULE)
         self.server_fw.apply()
-        pinger.assert_no_ping(self.DST_ADDRESS)
+        self.client.assert_no_ping(self.server.ip)
         self.server_fw.ipv4['mangle'].remove_rule('INPUT',
                                                   linux_base.ICMP_MARK_RULE)
         self.server_fw.ipv4['filter'].remove_rule('INPUT',
                                                   linux_base.MARKED_BLOCK_RULE)
         self.server_fw.apply()
-        pinger.assert_ping(self.DST_ADDRESS)
+        self.client.assert_ping(self.server.ip)
 
     def test_tcp_input_port(self):
         self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False)
index 4d2c815e8ab0fda99c66fb850c088a225b0a2386..2d82fdd73b8e4dc84b5daa8baf81f76b25f74d3c 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from neutron.agent.linux import ip_lib
 from neutron.agent.linux import iptables_firewall
 from neutron.agent import securitygroups_rpc as sg_cfg
 from neutron.tests.common import machine_fixtures
 from neutron.tests.common import net_helpers
-from neutron.tests.functional.agent.linux import helpers
 from neutron.tests.functional import base
 from oslo_config import cfg
 
@@ -63,8 +61,6 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase):
     # setup firewall on bridge and send packet from src_veth and observe
     # if sent packet can be observed on dst_veth
     def test_port_sec_within_firewall(self):
-        client_ip_wrapper = ip_lib.IPWrapper(self.client.namespace)
-        pinger = helpers.Pinger(client_ip_wrapper)
 
         # update the sg_group to make ping pass
         sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
@@ -76,13 +72,13 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase):
                                                 self.FAKE_SECURITY_GROUP_ID,
                                                 sg_rules)
         self.firewall.prepare_port_filter(self.src_port_desc)
-        pinger.assert_ping(self.server.ip)
+        self.client.assert_ping(self.server.ip)
 
         # modify the src_veth's MAC and test again
         self._set_src_mac(self.MAC_SPOOFED)
-        pinger.assert_no_ping(self.server.ip)
+        self.client.assert_no_ping(self.server.ip)
 
         # update the port's port_security_enabled value and test again
         self.src_port_desc['port_security_enabled'] = False
         self.firewall.update_port_filter(self.src_port_desc)
-        pinger.assert_ping(self.server.ip)
+        self.client.assert_ping(self.server.ip)
index c012880a555777f0cadc361061619489ce4590c2..699e4c6ba34c2a18c03ad7b8447924dc44d0d0cc 100644 (file)
 
 from neutron.cmd.sanity import checks
 from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
+from neutron.tests.common import machine_fixtures
 from neutron.tests.common import net_helpers
-from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
 from neutron.tests.functional.agent import test_ovs_lib
+from neutron.tests.functional import base
 
 
 class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
-                       base.BaseIPVethTestCase):
+                       base.BaseSudoTestCase):
 
     def setUp(self):
         if not checks.arp_header_match_supported():
@@ -35,7 +35,8 @@ class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
         self.dst_addr = '192.168.0.2'
         self.src_ns = self._create_namespace()
         self.dst_ns = self._create_namespace()
-        self.pinger = helpers.Pinger(self.src_ns, max_attempts=2)
+        self.pinger = machine_fixtures.Pinger(
+            self.src_ns.namespace, max_attempts=2)
         self.src_p = self.useFixture(
             net_helpers.OVSPortFixture(self.br, self.src_ns.namespace)).port
         self.dst_p = self.useFixture(