]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Introduce connection testers module
authorJakub Libosvar <libosvar@redhat.com>
Tue, 9 Jun 2015 10:51:10 +0000 (10:51 +0000)
committerJakub Libosvar <libosvar@redhat.com>
Thu, 9 Jul 2015 09:52:55 +0000 (09:52 +0000)
This module provides tools for testing simple connectivity between two
endpoints via given technology. Current patch implements endpoints
connected through either linux bridge or openvswitch bridge.
Connectivity can be tested using icmp, arp, tcp and udp protocols.

Change-Id: I00e19fd9b80dc6f6743eb735523bd8f5ff096136

neutron/agent/firewall.py
neutron/agent/linux/iptables_firewall.py
neutron/tests/common/conn_testers.py [new file with mode: 0644]
neutron/tests/common/machine_fixtures.py
neutron/tests/common/net_helpers.py

index 8ce8e7b16bf51ee93ac0c34e391b4eb551783cbd..afb0f18f59e0d1949caf09e176a67c1ddf8c3ef7 100644 (file)
@@ -19,6 +19,10 @@ import contextlib
 import six
 
 
+INGRESS_DIRECTION = 'ingress'
+EGRESS_DIRECTION = 'egress'
+
+
 @six.add_metaclass(abc.ABCMeta)
 class FirewallDriver(object):
     """Firewall Driver base class.
index ff12802e1631f9d1fec73d8a40ae4482042954df..44dd35c0103fc122f2835d161e896d02242b7481 100644 (file)
@@ -32,24 +32,22 @@ from neutron.i18n import _LI
 
 LOG = logging.getLogger(__name__)
 SG_CHAIN = 'sg-chain'
-INGRESS_DIRECTION = 'ingress'
-EGRESS_DIRECTION = 'egress'
 SPOOF_FILTER = 'spoof-filter'
-CHAIN_NAME_PREFIX = {INGRESS_DIRECTION: 'i',
-                     EGRESS_DIRECTION: 'o',
+CHAIN_NAME_PREFIX = {firewall.INGRESS_DIRECTION: 'i',
+                     firewall.EGRESS_DIRECTION: 'o',
                      SPOOF_FILTER: 's'}
-DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
-                       'egress': 'dest_ip_prefix'}
-IPSET_DIRECTION = {INGRESS_DIRECTION: 'src',
-                   EGRESS_DIRECTION: 'dst'}
+DIRECTION_IP_PREFIX = {firewall.INGRESS_DIRECTION: 'source_ip_prefix',
+                       firewall.EGRESS_DIRECTION: 'dest_ip_prefix'}
+IPSET_DIRECTION = {firewall.INGRESS_DIRECTION: 'src',
+                   firewall.EGRESS_DIRECTION: 'dst'}
 LINUX_DEV_LEN = 14
 comment_rule = iptables_manager.comment_rule
 
 
 class IptablesFirewallDriver(firewall.FirewallDriver):
     """Driver which enforces security groups through iptables rules."""
-    IPTABLES_DIRECTION = {INGRESS_DIRECTION: 'physdev-out',
-                          EGRESS_DIRECTION: 'physdev-in'}
+    IPTABLES_DIRECTION = {firewall.INGRESS_DIRECTION: 'physdev-out',
+                          firewall.EGRESS_DIRECTION: 'physdev-in'}
 
     def __init__(self, namespace=None):
         self.iptables = iptables_manager.IptablesManager(
@@ -180,14 +178,14 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
     def _setup_chains_apply(self, ports, unfiltered_ports):
         self._add_chain_by_name_v4v6(SG_CHAIN)
         for port in ports.values():
-            self._setup_chain(port, INGRESS_DIRECTION)
-            self._setup_chain(port, EGRESS_DIRECTION)
+            self._setup_chain(port, firewall.INGRESS_DIRECTION)
+            self._setup_chain(port, firewall.EGRESS_DIRECTION)
             self.iptables.ipv4['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
             self.iptables.ipv6['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
 
         for port in unfiltered_ports.values():
-            self._add_accept_rule_port_sec(port, INGRESS_DIRECTION)
-            self._add_accept_rule_port_sec(port, EGRESS_DIRECTION)
+            self._add_accept_rule_port_sec(port, firewall.INGRESS_DIRECTION)
+            self._add_accept_rule_port_sec(port, firewall.EGRESS_DIRECTION)
 
     def _remove_chains(self):
         """Remove ingress and egress chain for a port."""
@@ -197,12 +195,12 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
 
     def _remove_chains_apply(self, ports, unfiltered_ports):
         for port in ports.values():
-            self._remove_chain(port, INGRESS_DIRECTION)
-            self._remove_chain(port, EGRESS_DIRECTION)
+            self._remove_chain(port, firewall.INGRESS_DIRECTION)
+            self._remove_chain(port, firewall.EGRESS_DIRECTION)
             self._remove_chain(port, SPOOF_FILTER)
         for port in unfiltered_ports.values():
-            self._remove_rule_port_sec(port, INGRESS_DIRECTION)
-            self._remove_rule_port_sec(port, EGRESS_DIRECTION)
+            self._remove_rule_port_sec(port, firewall.INGRESS_DIRECTION)
+            self._remove_rule_port_sec(port, firewall.EGRESS_DIRECTION)
         self._remove_chain_by_name_v4v6(SG_CHAIN)
 
     def _setup_chain(self, port, DIRECTION):
@@ -263,7 +261,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
         else:
             self._remove_rule_from_chain_v4v6('FORWARD', jump_rule, jump_rule)
 
-        if direction == EGRESS_DIRECTION:
+        if direction == firewall.EGRESS_DIRECTION:
             jump_rule = ['-m physdev --%s %s --physdev-is-bridged '
                          '-j ACCEPT' % (self.IPTABLES_DIRECTION[direction],
                                         device)]
@@ -300,7 +298,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
         self._add_rules_to_chain_v4v6(SG_CHAIN, jump_rule, jump_rule,
                                       comment=ic.SG_TO_VM_SG)
 
-        if direction == EGRESS_DIRECTION:
+        if direction == firewall.EGRESS_DIRECTION:
             self._add_rules_to_chain_v4v6('INPUT', jump_rule, jump_rule,
                                           comment=ic.INPUT_TO_SG)
 
@@ -458,11 +456,11 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
         ipv4_iptables_rules = []
         ipv6_iptables_rules = []
         # include fixed egress/ingress rules
-        if direction == EGRESS_DIRECTION:
+        if direction == firewall.EGRESS_DIRECTION:
             self._add_fixed_egress_rules(port,
                                          ipv4_iptables_rules,
                                          ipv6_iptables_rules)
-        elif direction == INGRESS_DIRECTION:
+        elif direction == firewall.INGRESS_DIRECTION:
             ipv6_iptables_rules += self._accept_inbound_icmpv6()
         # include IPv4 and IPv6 iptable rules from security group
         ipv4_iptables_rules += self._convert_sgr_to_iptables_rules(
@@ -717,7 +715,7 @@ class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
         return ('qvb' + port['device'])[:LINUX_DEV_LEN]
 
     def _get_jump_rule(self, port, direction):
-        if direction == INGRESS_DIRECTION:
+        if direction == firewall.INGRESS_DIRECTION:
             device = self._get_br_device_name(port)
         else:
             device = self._get_device_name(port)
@@ -740,11 +738,13 @@ class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
     def _add_chain(self, port, direction):
         super(OVSHybridIptablesFirewallDriver, self)._add_chain(port,
                                                                 direction)
-        if direction in [INGRESS_DIRECTION, EGRESS_DIRECTION]:
+        if direction in [firewall.INGRESS_DIRECTION,
+                         firewall.EGRESS_DIRECTION]:
             self._add_raw_chain_rules(port, direction)
 
     def _remove_chain(self, port, direction):
         super(OVSHybridIptablesFirewallDriver, self)._remove_chain(port,
                                                                    direction)
-        if direction in [INGRESS_DIRECTION, EGRESS_DIRECTION]:
+        if direction in [firewall.INGRESS_DIRECTION,
+                         firewall.EGRESS_DIRECTION]:
             self._remove_raw_chain_rules(port, direction)
diff --git a/neutron/tests/common/conn_testers.py b/neutron/tests/common/conn_testers.py
new file mode 100644 (file)
index 0000000..2de8f42
--- /dev/null
@@ -0,0 +1,265 @@
+# All Rights Reserved.
+#
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import functools
+
+import fixtures
+
+from neutron.agent import firewall
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
+
+
+class ConnectionTesterException(Exception):
+    pass
+
+
+def _validate_direction(f):
+    @functools.wraps(f)
+    def wrap(self, direction, *args, **kwargs):
+        if direction not in (firewall.INGRESS_DIRECTION,
+                             firewall.EGRESS_DIRECTION):
+            raise ConnectionTesterException('Unknown direction %s' % direction)
+        return f(self, direction, *args, **kwargs)
+    return wrap
+
+
+class ConnectionTester(fixtures.Fixture):
+    """Base class for testers
+
+    This class implements API for various methods for testing connectivity. The
+    concrete implementation relies on how encapsulated resources are
+    configured. That means child classes should define resources by themselves
+    (e.g. endpoints connected through linux bridge or ovs bridge).
+
+    """
+
+    UDP = net_helpers.NetcatTester.UDP
+    TCP = net_helpers.NetcatTester.TCP
+    ICMP = 'icmp'
+    ARP = 'arp'
+    INGRESS = firewall.INGRESS_DIRECTION
+    EGRESS = firewall.EGRESS_DIRECTION
+
+    def _setUp(self):
+        self._protocol_to_method = {
+            self.UDP: self._test_transport_connectivity,
+            self.TCP: self._test_transport_connectivity,
+            self.ICMP: self._test_icmp_connectivity,
+            self.ARP: self._test_arp_connectivity}
+        self._nc_testers = dict()
+        self.addCleanup(self.cleanup)
+
+    def cleanup(self):
+        for nc in self._nc_testers.values():
+            nc.stop_processes()
+
+    @property
+    def vm_namespace(self):
+        return self._vm.namespace
+
+    @property
+    def vm_ip_address(self):
+        return self._vm.ip
+
+    @property
+    def vm_ip_cidr(self):
+        return self._vm.ip_cidr
+
+    @vm_ip_cidr.setter
+    def vm_ip_cidr(self, ip_cidr):
+        self._vm.ip_cidr = ip_cidr
+
+    @property
+    def vm_mac_address(self):
+        return self._vm.port.link.address
+
+    @vm_mac_address.setter
+    def vm_mac_address(self, mac_address):
+        self._vm.mac_address = mac_address
+
+    @property
+    def peer_namespace(self):
+        return self._peer.namespace
+
+    @property
+    def peer_ip_address(self):
+        return self._peer.ip
+
+    def flush_arp_tables(self):
+        """Flush arptables in all used namespaces"""
+        for machine in (self._peer, self._vm):
+            machine.port.neigh.flush(4, 'all')
+
+    def _test_transport_connectivity(self, direction, protocol, src_port,
+                                     dst_port):
+        nc_tester = self._create_nc_tester(direction, protocol, src_port,
+                                           dst_port)
+        try:
+            nc_tester.test_connectivity()
+        except RuntimeError as exc:
+            raise ConnectionTesterException(
+                "%s connection over %s protocol with %s source port and "
+                "%s destination port can't be established: %s" % (
+                    direction, protocol, src_port, dst_port, exc))
+
+    @_validate_direction
+    def _get_namespace_and_address(self, direction):
+        if direction == self.INGRESS:
+            return self.peer_namespace, self.vm_ip_address
+        return self.vm_namespace, self.peer_ip_address
+
+    def _test_icmp_connectivity(self, direction, protocol, src_port, dst_port):
+        src_namespace, ip_address = self._get_namespace_and_address(direction)
+        try:
+            net_helpers.assert_ping(src_namespace, ip_address)
+        except RuntimeError:
+            raise ConnectionTesterException(
+                "ICMP packets can't get from %s namespace to %s address" % (
+                    src_namespace, ip_address))
+
+    def _test_arp_connectivity(self, direction, protocol, src_port, dst_port):
+        src_namespace, ip_address = self._get_namespace_and_address(direction)
+        try:
+            net_helpers.assert_arping(src_namespace, ip_address)
+        except RuntimeError:
+            raise ConnectionTesterException(
+                "ARP queries to %s address have no response from %s namespace"
+                % (ip_address, src_namespace))
+
+    @_validate_direction
+    def assert_connection(self, direction, protocol, src_port=None,
+                          dst_port=None):
+        testing_method = self._protocol_to_method[protocol]
+        testing_method(direction, protocol, src_port, dst_port)
+
+    def assert_no_connection(self, direction, protocol, src_port=None,
+                             dst_port=None):
+        try:
+            self.assert_connection(direction, protocol, src_port, dst_port)
+        except ConnectionTesterException:
+            pass
+        else:
+            dst_port_info = str()
+            src_port_info = str()
+            if dst_port is not None:
+                dst_port_info = " and destionation port %d" % dst_port
+            if src_port is not None:
+                src_port_info = " and source port %d" % src_port
+            raise ConnectionTesterException("%s connection with %s protocol%s"
+                                            "%s was established but it "
+                                            "shouldn't be possible" % (
+                                                direction, protocol,
+                                                src_port_info, dst_port_info))
+
+    @_validate_direction
+    def assert_established_connection(self, direction, protocol, src_port=None,
+                                      dst_port=None):
+        nc_params = (direction, protocol, src_port, dst_port)
+        nc_tester = self._nc_testers.get(nc_params)
+        if nc_tester:
+            if nc_tester.is_established:
+                nc_tester.test_connectivity()
+            else:
+                raise ConnectionTesterException(
+                    '%s connection with protocol %s, source port %s and '
+                    'destination port %s is not established' % nc_params)
+        else:
+            raise ConnectionTesterException(
+                "Attempting to test established %s connection with protocol %s"
+                ", source port %s and destination port %s that hasn't been "
+                "established yet by calling establish_connection()"
+                % nc_params)
+
+    def assert_no_established_connection(self, direction, protocol,
+                                         src_port=None, dst_port=None):
+        try:
+            self.assert_established_connection(direction, protocol, src_port,
+                                               dst_port)
+        except ConnectionTesterException:
+            pass
+        else:
+            raise ConnectionTesterException(
+                'Established %s connection with protocol %s, source port  %s, '
+                'destination port %s can still send packets throught' % (
+                    direction, protocol, src_port, dst_port))
+
+    @_validate_direction
+    def establish_connection(self, direction, protocol, src_port=None,
+                             dst_port=None):
+        nc_tester = self._create_nc_tester(direction, protocol, src_port,
+                                           dst_port)
+        nc_tester.establish_connection()
+        self.addCleanup(nc_tester.stop_processes)
+
+    def _create_nc_tester(self, direction, protocol, src_port, dst_port):
+        """Create netcat tester
+
+        If there already exists a netcat tester that has established
+        connection, exception is raised.
+        """
+        nc_key = (direction, protocol, src_port, dst_port)
+        nc_tester = self._nc_testers.get(nc_key)
+        if nc_tester and nc_tester.is_established:
+            raise ConnectionTesterException(
+                '%s connection using %s protocol, source port %s and '
+                'destination port %s is already established' % (
+                    direction, protocol, src_port, dst_port))
+
+        if direction == self.INGRESS:
+            client_ns = self.peer_namespace
+            server_ns = self.vm_namespace
+            server_addr = self.vm_ip_address
+        else:
+            client_ns = self.vm_namespace
+            server_ns = self.peer_namespace
+            server_addr = self.peer_ip_address
+
+        server_port = dst_port or net_helpers.get_free_namespace_port(
+            protocol, server_ns)
+        nc_tester = net_helpers.NetcatTester(client_namespace=client_ns,
+                                             server_namespace=server_ns,
+                                             address=server_addr,
+                                             protocol=protocol,
+                                             src_port=src_port,
+                                             dst_port=server_port)
+        self._nc_testers[nc_key] = nc_tester
+        return nc_tester
+
+
+class LinuxBridgeConnectionTester(ConnectionTester):
+    """Tester with linux bridge in the middle
+
+    Both endpoints are placed in their separated namespace connected to
+    bridge's namespace via veth pair.
+
+    """
+
+    def _setUp(self):
+        super(LinuxBridgeConnectionTester, self)._setUp()
+        self._bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
+        self._peer, self._vm = self.useFixture(
+            machine_fixtures.PeerMachines(self._bridge)).machines
+
+    @property
+    def bridge_namespace(self):
+        return self._bridge.namespace
+
+    @property
+    def vm_port_id(self):
+        return net_helpers.VethFixture.get_peer_name(self._vm.port.name)
+
+    def flush_arp_tables(self):
+        self._bridge.neigh.flush(4, 'all')
+        super(LinuxBridgeConnectionTester, self).flush_arp_tables()
index de1089d1aa55c73afd2b723e8c439e0eb7a5c0f5..969cb4064096938a3af94575086c046f956bca80 100644 (file)
@@ -38,8 +38,7 @@ class FakeMachine(tools.SafeFixture):
     def __init__(self, bridge, ip_cidr, gateway_ip=None):
         super(FakeMachine, self).__init__()
         self.bridge = bridge
-        self.ip_cidr = ip_cidr
-        self.ip = self.ip_cidr.partition('/')[0]
+        self._ip_cidr = ip_cidr
         self.gateway_ip = gateway_ip
 
     def setUp(self):
@@ -50,11 +49,35 @@ class FakeMachine(tools.SafeFixture):
 
         self.port = self.useFixture(
             net_helpers.PortFixture.get(self.bridge, self.namespace)).port
-        self.port.addr.add(self.ip_cidr)
+        self.port.addr.add(self._ip_cidr)
 
         if self.gateway_ip:
             net_helpers.set_namespace_gateway(self.port, self.gateway_ip)
 
+    @property
+    def ip(self):
+        return self._ip_cidr.partition('/')[0]
+
+    @property
+    def ip_cidr(self):
+        return self._ip_cidr
+
+    @ip_cidr.setter
+    def ip_cidr(self, ip_cidr):
+        self.port.addr.add(ip_cidr)
+        self.port.addr.delete(self._ip_cidr)
+        self._ip_cidr = ip_cidr
+
+    @property
+    def mac_address(self):
+        return self.port.link.address
+
+    @mac_address.setter
+    def mac_address(self, mac_address):
+        self.port.link.set_down()
+        self.port.link.set_address(mac_address)
+        self.port.link.set_up()
+
     def execute(self, *args, **kwargs):
         ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
         return ns_ip_wrapper.netns.execute(*args, **kwargs)
index 170d1b3a9b0798a110f2656c98678ad573077a29..069d55189f9fdc66ddb388881e49fc6b0a0548ea 100644 (file)
@@ -274,6 +274,10 @@ class NetcatTester(object):
             address=self.server_address,
             listen=True)
 
+    @property
+    def is_established(self):
+        return bool(self._client_process and not self._client_process.poll())
+
     def establish_connection(self):
         if self._client_process:
             raise RuntimeError('%(proto)s connection to $(ip_addr)s is already'