From e3fa0112820e6ae95c61b5ebc150e4ba206b3c3c Mon Sep 17 00:00:00 2001 From: Jakub Libosvar Date: Fri, 3 Oct 2014 18:31:10 +0200 Subject: [PATCH] Add functional tests for IptablesManager using tcp/udp This commit adds tests for filter table using tcp and udp protocols. Part of it is a NetcatTester class providing ability to test connection between two veth pairs in namespaces. Change-Id: I6db1e7ffef5e1243478e15a994787396b0908656 Related-Bug: 1243216 --- neutron/tests/contrib/filters.template | 6 + .../tests/functional/agent/linux/helpers.py | 116 +++++++++++++++- .../functional/agent/linux/test_iptables.py | 124 +++++++++++++++--- 3 files changed, 224 insertions(+), 22 deletions(-) diff --git a/neutron/tests/contrib/filters.template b/neutron/tests/contrib/filters.template index 78189d760..45835118f 100644 --- a/neutron/tests/contrib/filters.template +++ b/neutron/tests/contrib/filters.template @@ -18,3 +18,9 @@ ping_filter: CommandFilter, ping, root curl_filter: CommandFilter, curl, root tee_filter: CommandFilter, tee, root tee_kill: KillFilter, root, tee, -9 +nc_filter: CommandFilter, nc, root +# netcat has different binaries depending on linux distribution +nc_kill: KillFilter, root, nc, -9 +ncbsd_kill: KillFilter, root, nc.openbsd, -9 +ncat_kill: KillFilter, root, ncat, -9 +ss_filter: CommandFilter, ss, root diff --git a/neutron/tests/functional/agent/linux/helpers.py b/neutron/tests/functional/agent/linux/helpers.py index 5e4b9bb35..8726448a8 100644 --- a/neutron/tests/functional/agent/linux/helpers.py +++ b/neutron/tests/functional/agent/linux/helpers.py @@ -12,19 +12,63 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import eventlet import functools import os +import random +import re import select import shlex import subprocess +import eventlet + +from neutron.agent.linux import ip_lib from neutron.agent.linux import utils CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20) CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5) READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5) +SS_SOURCE_PORT_PATTERN = re.compile( + r'^.*\s+\d+\s+.*:(?P\d+)\s+[0-9:].*') + + +def get_free_namespace_port(tcp=True, root_helper=None, namespace=None): + """Return an unused port from given namespace + + WARNING: This function returns a port that is free at the execution time of + this function. If this port is used later for binding then there + is a potential danger that port will be no longer free. It's up to + the programmer to handle error if port is already in use. + + :param tcp: Return free port for TCP protocol if set to True, return free + port for UDP protocol if set to False + """ + if tcp: + param = '-tna' + else: + param = '-una' + + ip_wrapper = ip_lib.IPWrapper(root_helper, namespace) + output = ip_wrapper.netns.execute(['ss', param]) + used_ports = _get_source_ports_from_ss_output(output) + + return get_unused_port(used_ports) + + +def _get_source_ports_from_ss_output(output): + ports = set() + for line in output.splitlines(): + match = SS_SOURCE_PORT_PATTERN.match(line) + if match: + ports.add(match.group('port')) + return ports + + +def get_unused_port(used, start=1024, end=65535): + candidates = set(range(start, end + 1)) + return random.choice(list(candidates - used)) + def wait_until_true(predicate, timeout=60, sleep=1, exception=None): """ @@ -165,3 +209,73 @@ class RootHelperProcess(subprocess.Popen): "in %d seconds" % (self.cmd, timeout))) self.child_pid = utils.get_root_helper_child_pid( self.pid, root_helper=self.root_helper) + + +class NetcatTester(object): + TESTING_STRING = 'foo' + + def __init__(self, client_namespace, server_namespace, address, port, + root_helper='', udp=False): + self.client_namespace = client_namespace + self.server_namespace = server_namespace + self._client_process = None + self._server_process = None + self.address = address + self.port = str(port) + self.root_helper = root_helper + self.udp = udp + + @property + def client_process(self): + if not self._client_process: + if not self._server_process: + self._spawn_server_process() + self._client_process = self._spawn_nc_in_namespace( + self.client_namespace.namespace) + return self._client_process + + @property + def server_process(self): + if not self._server_process: + self._spawn_server_process() + return self._server_process + + def _spawn_server_process(self): + self._server_process = self._spawn_nc_in_namespace( + self.server_namespace.namespace, listen=True) + + def test_connectivity(self, respawn=False): + stop_required = (respawn and self._client_process and + self._client_process.poll() is not None) + if stop_required: + self.stop_processes() + + self.client_process.writeline(self.TESTING_STRING) + message = self.server_process.read_stdout(READ_TIMEOUT).strip() + self.server_process.writeline(message) + message = self.client_process.read_stdout(READ_TIMEOUT).strip() + + return message == self.TESTING_STRING + + def _spawn_nc_in_namespace(self, namespace, listen=False): + cmd = ['nc', self.address, self.port] + if self.udp: + cmd.append('-u') + if listen: + cmd.append('-l') + if not self.udp: + cmd.append('-k') + else: + cmd.extend(['-w', '20']) + proc = RootHelperProcess(cmd, namespace=namespace, + root_helper=self.root_helper) + return proc + + def stop_processes(self): + for proc_attr in ('_client_process', '_server_process'): + proc = getattr(self, proc_attr) + if proc: + if proc.poll() is None: + proc.kill() + proc.wait() + setattr(self, proc_attr, None) diff --git a/neutron/tests/functional/agent/linux/test_iptables.py b/neutron/tests/functional/agent/linux/test_iptables.py index 71da1d7ae..f22dcab1d 100644 --- a/neutron/tests/functional/agent/linux/test_iptables.py +++ b/neutron/tests/functional/agent/linux/test_iptables.py @@ -12,39 +12,121 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import testtools from neutron.agent.linux import iptables_manager from neutron.tests.functional.agent.linux import base +from neutron.tests.functional.agent.linux import helpers class IptablesManagerTestCase(base.BaseIPVethTestCase): + DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT', + 'egress': 'OUTPUT'} + PROTOCOL_BLOCK_RULE = '-p %s -j DROP' + PROTOCOL_PORT_BLOCK_RULE = '-p %s --dport %d -j DROP' def setUp(self): super(IptablesManagerTestCase, self).setUp() - self.src_ns, self.dst_ns = self.prepare_veth_pairs() - self.iptables = iptables_manager.IptablesManager( + self.client_ns, self.server_ns = self.prepare_veth_pairs() + self.client_fw, self.server_fw = self.create_firewalls() + # The port is used in isolated namespace that precludes possibility of + # port conflicts + self.port = helpers.get_free_namespace_port(self.server_ns.namespace) + + def create_firewalls(self): + client_iptables = iptables_manager.IptablesManager( + root_helper=self.root_helper, + namespace=self.client_ns.namespace) + server_iptables = iptables_manager.IptablesManager( root_helper=self.root_helper, - namespace=self.dst_ns.namespace) + namespace=self.server_ns.namespace) + + return client_iptables, server_iptables + + def filter_add_rule(self, fw_manager, address, direction, protocol, port): + self._ipv4_filter_execute(fw_manager, 'add_rule', direction, protocol, + port) + + def filter_remove_rule(self, fw_manager, address, direction, protocol, + port): + self._ipv4_filter_execute(fw_manager, 'remove_rule', direction, + protocol, port) + + def _ipv4_filter_execute(self, fw_manager, method, direction, protocol, + port): + chain, rule = self._get_chain_and_rule(direction, protocol, port) + method = getattr(fw_manager.ipv4['filter'], method) + method(chain, rule) + fw_manager.apply() + + def _get_chain_and_rule(self, direction, protocol, port): + chain = self.DIRECTION_CHAIN_MAPPER[direction] + if port: + rule = self.PROTOCOL_PORT_BLOCK_RULE % (protocol, port) + else: + rule = self.PROTOCOL_BLOCK_RULE % protocol + return chain, rule + + def _test_with_nc(self, fw_manager, direction, port, udp): + netcat = helpers.NetcatTester(self.client_ns, self.server_ns, + self.DST_ADDRESS, self.port, + self.root_helper, udp=udp) + self.addCleanup(netcat.stop_processes) + protocol = 'tcp' + if udp: + protocol = 'udp' + self.assertTrue(netcat.test_connectivity()) + self.filter_add_rule( + fw_manager, self.DST_ADDRESS, direction, protocol, port) + with testtools.ExpectedException(RuntimeError): + netcat.test_connectivity() + self.filter_remove_rule( + fw_manager, self.DST_ADDRESS, direction, protocol, port) + self.assertTrue(netcat.test_connectivity(True)) def test_icmp(self): - self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS) - self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE) - self.iptables.apply() - self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS) - self.iptables.ipv4['filter'].remove_rule('INPUT', + self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS) + self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE) + self.server_fw.apply() + self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS) + self.server_fw.ipv4['filter'].remove_rule('INPUT', base.ICMP_BLOCK_RULE) - self.iptables.apply() - self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS) + self.server_fw.apply() + self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS) def test_mangle_icmp(self): - self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS) - self.iptables.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE) - self.iptables.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE) - self.iptables.apply() - self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS) - self.iptables.ipv4['mangle'].remove_rule('INPUT', - base.ICMP_MARK_RULE) - self.iptables.ipv4['filter'].remove_rule('INPUT', - base.MARKED_BLOCK_RULE) - self.iptables.apply() - self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS) + self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS) + self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE) + self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE) + self.server_fw.apply() + self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS) + self.server_fw.ipv4['mangle'].remove_rule('INPUT', + base.ICMP_MARK_RULE) + self.server_fw.ipv4['filter'].remove_rule('INPUT', + base.MARKED_BLOCK_RULE) + self.server_fw.apply() + self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS) + + def test_tcp_input_port(self): + self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False) + + def test_tcp_output_port(self): + self._test_with_nc(self.client_fw, 'egress', self.port, udp=False) + + def test_tcp_input(self): + self._test_with_nc(self.server_fw, 'ingress', port=None, udp=False) + + def test_tcp_output(self): + self._test_with_nc(self.client_fw, 'egress', port=None, udp=False) + + def test_udp_input_port(self): + self._test_with_nc(self.server_fw, 'ingress', self.port, udp=True) + + def test_udp_output_port(self): + self._test_with_nc(self.client_fw, 'egress', self.port, udp=True) + + def test_udp_input(self): + self._test_with_nc(self.server_fw, 'ingress', port=None, udp=True) + + def test_udp_output(self): + self._test_with_nc(self.client_fw, 'egress', port=None, udp=True) -- 2.45.2