# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import eventlet
import functools
import os
+import random
+import re
import select
import shlex
import subprocess
+import eventlet
+
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5)
+SS_SOURCE_PORT_PATTERN = re.compile(
+ r'^.*\s+\d+\s+.*:(?P<port>\d+)\s+[0-9:].*')
+
+
+def get_free_namespace_port(tcp=True, root_helper=None, namespace=None):
+ """Return an unused port from given namespace
+
+ WARNING: This function returns a port that is free at the execution time of
+ this function. If this port is used later for binding then there
+ is a potential danger that port will be no longer free. It's up to
+ the programmer to handle error if port is already in use.
+
+ :param tcp: Return free port for TCP protocol if set to True, return free
+ port for UDP protocol if set to False
+ """
+ if tcp:
+ param = '-tna'
+ else:
+ param = '-una'
+
+ ip_wrapper = ip_lib.IPWrapper(root_helper, namespace)
+ output = ip_wrapper.netns.execute(['ss', param])
+ used_ports = _get_source_ports_from_ss_output(output)
+
+ return get_unused_port(used_ports)
+
+
+def _get_source_ports_from_ss_output(output):
+ ports = set()
+ for line in output.splitlines():
+ match = SS_SOURCE_PORT_PATTERN.match(line)
+ if match:
+ ports.add(match.group('port'))
+ return ports
+
+
+def get_unused_port(used, start=1024, end=65535):
+ candidates = set(range(start, end + 1))
+ return random.choice(list(candidates - used))
+
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""
"in %d seconds" % (self.cmd, timeout)))
self.child_pid = utils.get_root_helper_child_pid(
self.pid, root_helper=self.root_helper)
+
+
+class NetcatTester(object):
+ TESTING_STRING = 'foo'
+
+ def __init__(self, client_namespace, server_namespace, address, port,
+ root_helper='', udp=False):
+ self.client_namespace = client_namespace
+ self.server_namespace = server_namespace
+ self._client_process = None
+ self._server_process = None
+ self.address = address
+ self.port = str(port)
+ self.root_helper = root_helper
+ self.udp = udp
+
+ @property
+ def client_process(self):
+ if not self._client_process:
+ if not self._server_process:
+ self._spawn_server_process()
+ self._client_process = self._spawn_nc_in_namespace(
+ self.client_namespace.namespace)
+ return self._client_process
+
+ @property
+ def server_process(self):
+ if not self._server_process:
+ self._spawn_server_process()
+ return self._server_process
+
+ def _spawn_server_process(self):
+ self._server_process = self._spawn_nc_in_namespace(
+ self.server_namespace.namespace, listen=True)
+
+ def test_connectivity(self, respawn=False):
+ stop_required = (respawn and self._client_process and
+ self._client_process.poll() is not None)
+ if stop_required:
+ self.stop_processes()
+
+ self.client_process.writeline(self.TESTING_STRING)
+ message = self.server_process.read_stdout(READ_TIMEOUT).strip()
+ self.server_process.writeline(message)
+ message = self.client_process.read_stdout(READ_TIMEOUT).strip()
+
+ return message == self.TESTING_STRING
+
+ def _spawn_nc_in_namespace(self, namespace, listen=False):
+ cmd = ['nc', self.address, self.port]
+ if self.udp:
+ cmd.append('-u')
+ if listen:
+ cmd.append('-l')
+ if not self.udp:
+ cmd.append('-k')
+ else:
+ cmd.extend(['-w', '20'])
+ proc = RootHelperProcess(cmd, namespace=namespace,
+ root_helper=self.root_helper)
+ return proc
+
+ def stop_processes(self):
+ for proc_attr in ('_client_process', '_server_process'):
+ proc = getattr(self, proc_attr)
+ if proc:
+ if proc.poll() is None:
+ proc.kill()
+ proc.wait()
+ setattr(self, proc_attr, None)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import testtools
from neutron.agent.linux import iptables_manager
from neutron.tests.functional.agent.linux import base
+from neutron.tests.functional.agent.linux import helpers
class IptablesManagerTestCase(base.BaseIPVethTestCase):
+ DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
+ 'egress': 'OUTPUT'}
+ PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
+ PROTOCOL_PORT_BLOCK_RULE = '-p %s --dport %d -j DROP'
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
- self.src_ns, self.dst_ns = self.prepare_veth_pairs()
- self.iptables = iptables_manager.IptablesManager(
+ self.client_ns, self.server_ns = self.prepare_veth_pairs()
+ self.client_fw, self.server_fw = self.create_firewalls()
+ # The port is used in isolated namespace that precludes possibility of
+ # port conflicts
+ self.port = helpers.get_free_namespace_port(self.server_ns.namespace)
+
+ def create_firewalls(self):
+ client_iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ namespace=self.client_ns.namespace)
+ server_iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
- namespace=self.dst_ns.namespace)
+ namespace=self.server_ns.namespace)
+
+ return client_iptables, server_iptables
+
+ def filter_add_rule(self, fw_manager, address, direction, protocol, port):
+ self._ipv4_filter_execute(fw_manager, 'add_rule', direction, protocol,
+ port)
+
+ def filter_remove_rule(self, fw_manager, address, direction, protocol,
+ port):
+ self._ipv4_filter_execute(fw_manager, 'remove_rule', direction,
+ protocol, port)
+
+ def _ipv4_filter_execute(self, fw_manager, method, direction, protocol,
+ port):
+ chain, rule = self._get_chain_and_rule(direction, protocol, port)
+ method = getattr(fw_manager.ipv4['filter'], method)
+ method(chain, rule)
+ fw_manager.apply()
+
+ def _get_chain_and_rule(self, direction, protocol, port):
+ chain = self.DIRECTION_CHAIN_MAPPER[direction]
+ if port:
+ rule = self.PROTOCOL_PORT_BLOCK_RULE % (protocol, port)
+ else:
+ rule = self.PROTOCOL_BLOCK_RULE % protocol
+ return chain, rule
+
+ def _test_with_nc(self, fw_manager, direction, port, udp):
+ netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
+ self.DST_ADDRESS, self.port,
+ self.root_helper, udp=udp)
+ self.addCleanup(netcat.stop_processes)
+ protocol = 'tcp'
+ if udp:
+ protocol = 'udp'
+ self.assertTrue(netcat.test_connectivity())
+ self.filter_add_rule(
+ fw_manager, self.DST_ADDRESS, direction, protocol, port)
+ with testtools.ExpectedException(RuntimeError):
+ netcat.test_connectivity()
+ self.filter_remove_rule(
+ fw_manager, self.DST_ADDRESS, direction, protocol, port)
+ self.assertTrue(netcat.test_connectivity(True))
def test_icmp(self):
- self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
- self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
- self.iptables.apply()
- self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
- self.iptables.ipv4['filter'].remove_rule('INPUT',
+ self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
+ self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
+ self.server_fw.apply()
+ self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS)
+ self.server_fw.ipv4['filter'].remove_rule('INPUT',
base.ICMP_BLOCK_RULE)
- self.iptables.apply()
- self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+ self.server_fw.apply()
+ self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
def test_mangle_icmp(self):
- self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
- self.iptables.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
- self.iptables.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
- self.iptables.apply()
- self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
- self.iptables.ipv4['mangle'].remove_rule('INPUT',
- base.ICMP_MARK_RULE)
- self.iptables.ipv4['filter'].remove_rule('INPUT',
- base.MARKED_BLOCK_RULE)
- self.iptables.apply()
- self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+ self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
+ self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
+ self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
+ self.server_fw.apply()
+ self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS)
+ self.server_fw.ipv4['mangle'].remove_rule('INPUT',
+ base.ICMP_MARK_RULE)
+ self.server_fw.ipv4['filter'].remove_rule('INPUT',
+ base.MARKED_BLOCK_RULE)
+ self.server_fw.apply()
+ self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
+
+ def test_tcp_input_port(self):
+ self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False)
+
+ def test_tcp_output_port(self):
+ self._test_with_nc(self.client_fw, 'egress', self.port, udp=False)
+
+ def test_tcp_input(self):
+ self._test_with_nc(self.server_fw, 'ingress', port=None, udp=False)
+
+ def test_tcp_output(self):
+ self._test_with_nc(self.client_fw, 'egress', port=None, udp=False)
+
+ def test_udp_input_port(self):
+ self._test_with_nc(self.server_fw, 'ingress', self.port, udp=True)
+
+ def test_udp_output_port(self):
+ self._test_with_nc(self.client_fw, 'egress', self.port, udp=True)
+
+ def test_udp_input(self):
+ self._test_with_nc(self.server_fw, 'ingress', port=None, udp=True)
+
+ def test_udp_output(self):
+ self._test_with_nc(self.client_fw, 'egress', port=None, udp=True)