]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add functional testing to ipset_manager
authorMiguel Angel Ajo <mangelajo@redhat.com>
Tue, 9 Sep 2014 13:22:03 +0000 (15:22 +0200)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 12 Sep 2014 06:44:20 +0000 (08:44 +0200)
Add functional testing to the ipset_manager module to verify
it works as expected in combination with iptables matching.

Implements:  blueprint add-ipset-to-security

Change-Id: Iec791ec30f87f6c00805f1d52c23b84aa7bc19de

neutron/agent/linux/ipset_manager.py
neutron/tests/functional/agent/linux/base.py
neutron/tests/functional/agent/linux/pinger.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_ipset.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_iptables.py

index 8728567f9a44e1f9feecf29a5d661cf6535f234f..ddd736e8e8afca8ad2a2d56dbc0fd0845d513c94 100644 (file)
@@ -18,9 +18,10 @@ from neutron.common import utils
 class IpsetManager(object):
     """Wrapper for ipset."""
 
-    def __init__(self, execute=None, root_helper=None):
+    def __init__(self, execute=None, root_helper=None, namespace=None):
         self.execute = execute or linux_utils.execute
         self.root_helper = root_helper
+        self.namespace = namespace
 
     @utils.synchronized('ipset', external=True)
     def create_ipset_chain(self, chain_name, ethertype):
@@ -57,7 +58,13 @@ class IpsetManager(object):
 
     def _apply(self, cmd, input=None):
         input = '\n'.join(input) if input else None
-        self.execute(cmd, root_helper=self.root_helper, process_input=input)
+        cmd_ns = []
+        if self.namespace:
+            cmd_ns.extend(['ip', 'netns', 'exec', self.namespace])
+        cmd_ns.extend(cmd)
+        self.execute(cmd_ns,
+                     root_helper=self.root_helper,
+                     process_input=input)
 
     def _get_ipset_chain_type(self, ethertype):
         return 'inet6' if ethertype == 'IPv6' else 'inet'
index ffc1038d6c57cab099f768a06526c5daaea5f57e..ce82d5b2aa200e0947005143bd6aa1710647cf82 100644 (file)
 
 import random
 
+from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
 from neutron.agent.linux import utils
 from neutron.common import constants as n_const
+from neutron.openstack.common import uuidutils
+from neutron.tests.functional.agent.linux import pinger
 from neutron.tests.functional import base as functional_base
 
 
 BR_PREFIX = 'test-br'
+ICMP_BLOCK_RULE = '-p icmp -j DROP'
 
 
 class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
@@ -64,3 +68,53 @@ class BaseOVSLinuxTestCase(BaseLinuxTestCase):
         br = self.create_resource(br_prefix, self.ovs.add_bridge)
         self.addCleanup(br.destroy)
         return br
+
+
+class BaseIPVethTestCase(BaseLinuxTestCase):
+    SRC_ADDRESS = '192.168.0.1'
+    DST_ADDRESS = '192.168.0.2'
+    BROADCAST_ADDRESS = '192.168.0.255'
+    SRC_VETH = 'source'
+    DST_VETH = 'destination'
+
+    def setUp(self):
+        super(BaseIPVethTestCase, self).setUp()
+        self.check_sudo_enabled()
+        self.pinger = pinger.Pinger(self)
+
+    @staticmethod
+    def _set_ip_up(device, cidr, broadcast, ip_version=4):
+        device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
+        device.link.set_up()
+
+    def _create_namespace(self):
+        ip_cmd = ip_lib.IPWrapper(self.root_helper)
+        name = "func-%s" % uuidutils.generate_uuid()
+        namespace = ip_cmd.ensure_namespace(name)
+        self.addCleanup(namespace.netns.delete, namespace.namespace)
+
+        return namespace
+
+    def prepare_veth_pairs(self, src_addr=None,
+                           dst_addr=None,
+                           broadcast_addr=None,
+                           src_ns=None, dst_ns=None,
+                           src_veth=None,
+                           dst_veth=None):
+
+        src_addr = src_addr or self.SRC_ADDRESS
+        dst_addr = dst_addr or self.DST_ADDRESS
+        broadcast_addr = broadcast_addr or self.BROADCAST_ADDRESS
+        src_veth = src_veth or self.SRC_VETH
+        dst_veth = dst_veth or self.DST_VETH
+        src_ns = src_ns or self._create_namespace()
+        dst_ns = dst_ns or self._create_namespace()
+
+        src_veth, dst_veth = src_ns.add_veth(src_veth,
+                                             dst_veth,
+                                             dst_ns.namespace)
+
+        self._set_ip_up(src_veth, '%s/24' % src_addr, broadcast_addr)
+        self._set_ip_up(dst_veth, '%s/24' % dst_addr, broadcast_addr)
+
+        return src_ns, dst_ns
diff --git a/neutron/tests/functional/agent/linux/pinger.py b/neutron/tests/functional/agent/linux/pinger.py
new file mode 100644 (file)
index 0000000..a79e823
--- /dev/null
@@ -0,0 +1,42 @@
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+class Pinger(object):
+    def __init__(self, testcase, timeout=1, max_attempts=1):
+        self.testcase = testcase
+        self._timeout = timeout
+        self._max_attempts = max_attempts
+
+    def _ping_destination(self, src_namespace, dest_address):
+        src_namespace.netns.execute(['ping', '-c', self._max_attempts,
+                                     '-W', self._timeout, dest_address])
+
+    def assert_ping_from_ns(self, src_ns, dst_ip):
+        try:
+            self._ping_destination(src_ns, dst_ip)
+        except RuntimeError:
+            self.testcase.fail("destination ip %(dst_ip)s is not replying "
+                               "to ping from namespace %(src_ns)s" %
+                               {'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
+
+    def assert_no_ping_from_ns(self, src_ns, dst_ip):
+        try:
+            self._ping_destination(src_ns, dst_ip)
+            self.testcase.fail("destination ip %(dst_ip)s is replying to ping"
+                               "from namespace %(src_ns)s, but it shouldn't" %
+                               {'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
+        except RuntimeError:
+            pass
diff --git a/neutron/tests/functional/agent/linux/test_ipset.py b/neutron/tests/functional/agent/linux/test_ipset.py
new file mode 100644 (file)
index 0000000..d8bbfc9
--- /dev/null
@@ -0,0 +1,92 @@
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.agent.linux import ipset_manager
+from neutron.agent.linux import iptables_manager
+from neutron.tests.functional.agent.linux import base
+
+IPSET_CHAIN = 'test-chain'
+IPSET_ETHERTYPE = 'IPv4'
+ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_CHAIN
+UNRELATED_IP = '1.1.1.1'
+
+
+class IpsetBase(base.BaseIPVethTestCase):
+
+    def setUp(self):
+        super(IpsetBase, self).setUp()
+
+        self.src_ns, self.dst_ns = self.prepare_veth_pairs()
+        self.ipset = self._create_ipset_manager_and_chain(self.dst_ns,
+                                                          IPSET_CHAIN)
+
+        self.dst_iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            namespace=self.dst_ns.namespace)
+
+        self._add_iptables_ipset_rules(self.dst_iptables)
+
+    def _create_ipset_manager_and_chain(self, dst_ns, chain_name):
+        ipset = ipset_manager.IpsetManager(
+            root_helper=self.root_helper,
+            namespace=dst_ns.namespace)
+
+        ipset.create_ipset_chain(chain_name, IPSET_ETHERTYPE)
+        return ipset
+
+    @staticmethod
+    def _remove_iptables_ipset_rules(iptables_manager):
+        iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
+        iptables_manager.apply()
+
+    @staticmethod
+    def _add_iptables_ipset_rules(iptables_manager):
+        iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
+        iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
+        iptables_manager.apply()
+
+
+class IpsetManagerTestCase(IpsetBase):
+
+    def test_add_member_allows_ping(self):
+        self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+        self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+    def test_del_member_denies_ping(self):
+        self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+        self.ipset.del_ipset_chain_member(IPSET_CHAIN, self.SRC_ADDRESS)
+        self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+    def test_refresh_ipset_allows_ping(self):
+        self.ipset.refresh_ipset_chain_by_name(IPSET_CHAIN, [UNRELATED_IP],
+                                               IPSET_ETHERTYPE)
+        self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+        self.ipset.refresh_ipset_chain_by_name(
+            IPSET_CHAIN, [UNRELATED_IP, self.SRC_ADDRESS], IPSET_ETHERTYPE)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+        self.ipset.refresh_ipset_chain_by_name(
+            IPSET_CHAIN, [self.SRC_ADDRESS, UNRELATED_IP], IPSET_ETHERTYPE)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+    def test_destroy_ipset_chain(self):
+        self.assertRaises(RuntimeError,
+                          self.ipset.destroy_ipset_chain_by_name, IPSET_CHAIN)
+        self._remove_iptables_ipset_rules(self.dst_iptables)
+        self.ipset.destroy_ipset_chain_by_name(IPSET_CHAIN)
index 6e600eb5f9f1c71f91cd2bd3a195a3c086bbc9ee..4e7efb69ce90588b44f23dcb9aa70a80437b2452 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from neutron.agent.linux import ip_lib
 from neutron.agent.linux import iptables_manager
-from neutron.openstack.common import uuidutils
 from neutron.tests.functional.agent.linux import base
 
-ICMP_BLOCK_RULE = '-p icmp -j DROP'
-SRC_VETH_NAME = 'source'
-DEST_VETH_NAME = 'destination'
 
+class IptablesManagerTestCase(base.BaseIPVethTestCase):
 
-class IpBase(base.BaseLinuxTestCase):
-    SRC_ADDRESS = '192.168.0.1'
-    DST_ADDRESS = '192.168.0.2'
-
-    @staticmethod
-    def _set_ip_up(device, cidr, broadcast='192.168.0.255', ip_version=4):
-        device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
-        device.link.set_up()
-
-    @staticmethod
-    def _ping_destination(src_namespace, dest_address, attempts=3):
-        src_namespace.netns.execute(['ping', '-c', attempts, dest_address])
-
-    def _create_namespace(self):
-        ip_cmd = ip_lib.IPWrapper(self.root_helper)
-        name = "func-%s" % uuidutils.generate_uuid()
-        namespace = ip_cmd.ensure_namespace(name)
-        self.addCleanup(namespace.netns.delete, namespace.namespace)
-
-        return namespace
-
-    def _prepare_veth_pairs(self):
-        src_ns = self._create_namespace()
-        dst_ns = self._create_namespace()
-        src_veth, dst_veth = src_ns.add_veth(SRC_VETH_NAME,
-                                             DEST_VETH_NAME,
-                                             dst_ns.namespace)
-        self._set_ip_up(src_veth, '%s/24' % self.SRC_ADDRESS)
-        self._set_ip_up(dst_veth, '%s/24' % self.DST_ADDRESS)
-
-        return src_ns, dst_ns
-
-
-class IptablesManagerTestCase(IpBase):
     def setUp(self):
         super(IptablesManagerTestCase, self).setUp()
-        self.check_sudo_enabled()
-        self.src_ns, self.dst_ns = self._prepare_veth_pairs()
+        self.src_ns, self.dst_ns = self.prepare_veth_pairs()
         self.iptables = iptables_manager.IptablesManager(
             root_helper=self.root_helper,
             namespace=self.dst_ns.namespace)
 
     def test_icmp(self):
-        self._ping_destination(self.src_ns, self.DST_ADDRESS)
-        self.iptables.ipv4['filter'].add_rule('INPUT', ICMP_BLOCK_RULE)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+        self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
         self.iptables.apply()
-        self.assertRaises(RuntimeError, self._ping_destination, self.src_ns,
-                          self.DST_ADDRESS)
-        self.iptables.ipv4['filter'].remove_rule('INPUT', ICMP_BLOCK_RULE)
+        self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+        self.iptables.ipv4['filter'].remove_rule('INPUT',
+                                                 base.ICMP_BLOCK_RULE)
         self.iptables.apply()
-        self._ping_destination(self.src_ns, self.DST_ADDRESS)
+        self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)