From bc7c8acf90b55a27298484a77a32c089914afb67 Mon Sep 17 00:00:00 2001
From: Jakub Libosvar
Date: Fri, 29 Aug 2014 18:50:13 +0200
Subject: [PATCH] Add functional test for IptablesManager

Introduce test of IptablesManager using filtering of ICMP packets in namespaces.

Partial-bug: #1243216
Change-Id: I90ab0d397780247de619bd90f5febd0e59b84fc7
---
 .../functional/agent/linux/test_iptables.py | 76 +++++++++++++++++++
 .../tests/functional/contrib/filters.template | 3 +
 2 files changed, 79 insertions(+)
 create mode 100644 neutron/tests/functional/agent/linux/test_iptables.py

diff --git a/neutron/tests/functional/agent/linux/test_iptables.py b/neutron/tests/functional/agent/linux/test_iptables.py
new file mode 100644
index 000000000..6e600eb5f
--- /dev/null
+++ b/neutron/tests/functional/agent/linux/test_iptables.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import iptables_manager
+from neutron.openstack.common import uuidutils
+from neutron.tests.functional.agent.linux import base
+
+ICMP_BLOCK_RULE = '-p icmp -j DROP'
+SRC_VETH_NAME = 'source'
+DEST_VETH_NAME = 'destination'
+
+
+class IpBase(base.BaseLinuxTestCase):
+    SRC_ADDRESS = '192.168.0.1'
+    DST_ADDRESS = '192.168.0.2'
+
+    @staticmethod
+    def _set_ip_up(device, cidr, broadcast='192.168.0.255', ip_version=4):
+        device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
+        device.link.set_up()
+
+    @staticmethod
+    def _ping_destination(src_namespace, dest_address, attempts=3):
+        src_namespace.netns.execute(['ping', '-c', attempts, dest_address])
+
+    def _create_namespace(self):
+        ip_cmd = ip_lib.IPWrapper(self.root_helper)
+        name = "func-%s" % uuidutils.generate_uuid()
+        namespace = ip_cmd.ensure_namespace(name)
+        self.addCleanup(namespace.netns.delete, namespace.namespace)
+
+        return namespace
+
+    def _prepare_veth_pairs(self):
+        src_ns = self._create_namespace()
+        dst_ns = self._create_namespace()
+        src_veth, dst_veth = src_ns.add_veth(SRC_VETH_NAME,
+                                             DEST_VETH_NAME,
+                                             dst_ns.namespace)
+        self._set_ip_up(src_veth, '%s/24' % self.SRC_ADDRESS)
+        self._set_ip_up(dst_veth, '%s/24' % self.DST_ADDRESS)
+
+        return src_ns, dst_ns
+
+
+class IptablesManagerTestCase(IpBase):
+    def setUp(self):
+        super(IptablesManagerTestCase, self).setUp()
+        self.check_sudo_enabled()
+        self.src_ns, self.dst_ns = self._prepare_veth_pairs()
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            namespace=self.dst_ns.namespace)
+
+    def test_icmp(self):
+        self._ping_destination(self.src_ns, self.DST_ADDRESS)
+        self.iptables.ipv4['filter'].add_rule('INPUT', ICMP_BLOCK_RULE)
+        self.iptables.apply()
+        self.assertRaises(RuntimeError, self._ping_destination, self.src_ns,
+                          self.DST_ADDRESS)
+        self.iptables.ipv4['filter'].remove_rule('INPUT', ICMP_BLOCK_RULE)
+        self.iptables.apply()
+        self._ping_destination(self.src_ns, self.DST_ADDRESS)
diff --git a/neutron/tests/functional/contrib/filters.template b/neutron/tests/functional/contrib/filters.template
index 2f0ebbedf..a57e71cf0 100644
--- a/neutron/tests/functional/contrib/filters.template
+++ b/neutron/tests/functional/contrib/filters.template
@@ -10,3 +10,6 @@
 # processes that they launch with their containing tox environment's
 # python.
 kill_tox_python: KillFilter, root, $BASE_PATH/bin/python, -9
+
+# enable ping from namespace
+ping_filter: CommandFilter, ping, root
-- 
2.45.2
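Review bot: In test_icmp the blocked ping only fails once ping itself gives up, because `_ping_destination` passes no per-reply timeout; with the DROP rule applied the assertRaises call sits through ping's default reply wait on each attempt before the RuntimeError arrives, so the negative path is the slow and least predictable part of the test. Bounding it with an explicit timeout, `src_namespace.netns.execute(['ping', '-c', attempts, '-W', '1', dest_address])`, keeps the runtime fixed while still relying on the non-zero exit status the assertion expects. The `attempts` count goes into argv as an int; that works only if the execute helper stringifies its arguments, so confirm that or pass `str(attempts)`. A docstring on test_icmp stating the three phases (destination reachable, blocked by the INPUT rule applied in the destination namespace, reachable again after the rule is removed and re-applied) would also make the intent readable without following the IptablesManager calls.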
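Review bot: The new `ping_filter` is a CommandFilter, which lets rootwrap run `ping` as root with any argument list, while the only caller introduced by this change (`_ping_destination` in test_iptables.py) invokes it as `ping -c <count> <address>`. If the functional-test filter set is meant to stay as narrow as the production rootwrap filters, a RegExpFilter pinned to that argument shape would be the tighter choice; otherwise the comment should at least name the consumer, e.g. `# enable ping from namespace (needed by agent/linux/test_iptables.py)`, so the entry can be pruned when that test changes.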