]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add test cases to testing firewall drivers
authorJakub Libosvar <libosvar@redhat.com>
Fri, 12 Jun 2015 15:40:21 +0000 (15:40 +0000)
committerJakub Libosvar <libosvar@redhat.com>
Wed, 23 Dec 2015 16:52:02 +0000 (16:52 +0000)
Part of this patch is also preparation for having common test plan for
firewall driver testing.

Following test cases were implemented:
 - dhcp works by default
 - dhcp server is prevented on vm by default
 - ip spoofing from vm
 - allowed address pairs allows traffic to given ip
 - arp can go through
 - ingress/egress traffic with src/dest port ranges

Related-bug: #1461000
Change-Id: Ib00c99f236855e6556f43f4ffc55014c73b077bb

neutron/tests/contrib/functional-testing.filters
neutron/tests/functional/agent/linux/test_iptables_firewall.py [deleted file]
neutron/tests/functional/agent/test_firewall.py [new file with mode: 0644]

index 40a45047953547249282c317506e3040f21d2fed..1b09f693ab91ec5d8cc5f23c4a680b7681ae35bb 100644 (file)
@@ -7,6 +7,7 @@
 # enable ping from namespace
 ping_filter: CommandFilter, ping, root
 ping6_filter: CommandFilter, ping6, root
+ping_kill: KillFilter, root, ping, -2
 
 # enable curl from namespace
 curl_filter: RegExpFilter, /usr/bin/curl, root, curl, --max-time, \d+, -D-, http://[0-9a-z:./-]+
diff --git a/neutron/tests/functional/agent/linux/test_iptables_firewall.py b/neutron/tests/functional/agent/linux/test_iptables_firewall.py
deleted file mode 100644 (file)
index 9921aac..0000000
+++ /dev/null
@@ -1,197 +0,0 @@
-# Copyright 2015 Intel Corporation.
-# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
-#                               <isaku.yamahata at gmail com>
-# All Rights Reserved.
-#
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-import copy
-
-from neutron.agent.linux import iptables_firewall
-from neutron.agent import securitygroups_rpc as sg_cfg
-from neutron.common import constants
-from neutron.tests.common import machine_fixtures
-from neutron.tests.common import net_helpers
-from neutron.tests.functional import base
-from oslo_config import cfg
-
-DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
-
-
-class IptablesFirewallTestCase(base.BaseSudoTestCase):
-    MAC_REAL = "fa:16:3e:9a:2f:49"
-    MAC_SPOOFED = "fa:16:3e:9a:2f:48"
-    FAKE_SECURITY_GROUP_ID = "fake_sg_id"
-
-    def _set_src_mac(self, mac):
-        self.client.port.link.set_down()
-        self.client.port.link.set_address(mac)
-        self.client.port.link.set_up()
-
-    def setUp(self):
-        cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
-        super(IptablesFirewallTestCase, self).setUp()
-
-        bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
-        self.client, self.server = self.useFixture(
-            machine_fixtures.PeerMachines(bridge)).machines
-
-        self.firewall = iptables_firewall.IptablesFirewallDriver(
-            namespace=bridge.namespace)
-
-        self._set_src_mac(self.MAC_REAL)
-
-        client_br_port_name = net_helpers.VethFixture.get_peer_name(
-            self.client.port.name)
-        self.src_port_desc = {'admin_state_up': True,
-                              'device': client_br_port_name,
-                              'device_owner': DEVICE_OWNER_COMPUTE,
-                              'fixed_ips': [self.client.ip],
-                              'mac_address': self.MAC_REAL,
-                              'port_security_enabled': True,
-                              'security_groups': [self.FAKE_SECURITY_GROUP_ID],
-                              'status': 'ACTIVE'}
-
-    # setup firewall on bridge and send packet from src_veth and observe
-    # if sent packet can be observed on dst_veth
-    def test_port_sec_within_firewall(self):
-
-        # update the sg_group to make ping pass
-        sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
-                     'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
-                    {'ethertype': 'IPv4', 'direction': 'egress'}]
-
-        with self.firewall.defer_apply():
-            self.firewall.update_security_group_rules(
-                                                self.FAKE_SECURITY_GROUP_ID,
-                                                sg_rules)
-        self.firewall.prepare_port_filter(self.src_port_desc)
-        self.client.assert_ping(self.server.ip)
-
-        # modify the src_veth's MAC and test again
-        self._set_src_mac(self.MAC_SPOOFED)
-        self.client.assert_no_ping(self.server.ip)
-
-        # update the port's port_security_enabled value and test again
-        self.src_port_desc['port_security_enabled'] = False
-        self.firewall.update_port_filter(self.src_port_desc)
-        self.client.assert_ping(self.server.ip)
-
-    def test_rule_application_converges(self):
-        sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress'},
-                    {'ethertype': 'IPv6', 'direction': 'egress'},
-                    {'ethertype': 'IPv4', 'direction': 'ingress',
-                     'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
-                    {'ethertype': 'IPv6', 'direction': 'ingress',
-                     'source_ip_prefix': '0::0/0', 'protocol': 'ipv6-icmp'}]
-        # make sure port ranges converge on all protocols with and without
-        # port ranges (prevents regression of bug 1502924)
-        for proto in ('tcp', 'udp', 'icmp'):
-            for version in ('IPv4', 'IPv6'):
-                if proto == 'icmp' and version == 'IPv6':
-                    proto = 'ipv6-icmp'
-                base = {'ethertype': version, 'direction': 'ingress',
-                        'protocol': proto}
-                sg_rules.append(copy.copy(base))
-                base['port_range_min'] = 50
-                base['port_range_max'] = 50
-                sg_rules.append(copy.copy(base))
-                base['port_range_max'] = 55
-                sg_rules.append(copy.copy(base))
-                base['source_port_range_min'] = 60
-                base['source_port_range_max'] = 60
-                sg_rules.append(copy.copy(base))
-                base['source_port_range_max'] = 65
-                sg_rules.append(copy.copy(base))
-
-        # add some single-host rules to prevent regression of bug 1502917
-        sg_rules.append({'ethertype': 'IPv4', 'direction': 'ingress',
-                         'source_ip_prefix': '77.77.77.77/32'})
-        sg_rules.append({'ethertype': 'IPv6', 'direction': 'ingress',
-                         'source_ip_prefix': 'fe80::1/128'})
-        self.firewall.update_security_group_rules(
-            self.FAKE_SECURITY_GROUP_ID, sg_rules)
-        self.firewall.prepare_port_filter(self.src_port_desc)
-        # after one prepare call, another apply should be a NOOP
-        self.assertEqual([], self.firewall.iptables._apply())
-
-        orig_sg_rules = copy.copy(sg_rules)
-        for proto in ('tcp', 'udp', 'icmp'):
-            for version in ('IPv4', 'IPv6'):
-                if proto == 'icmp' and version == 'IPv6':
-                    proto = 'ipv6-icmp'
-                # make sure firewall is in converged state
-                self.firewall.update_security_group_rules(
-                    self.FAKE_SECURITY_GROUP_ID, orig_sg_rules)
-                self.firewall.update_port_filter(self.src_port_desc)
-                sg_rules = copy.copy(orig_sg_rules)
-
-                # remove one rule and add another to make sure it results in
-                # exactly one delete and insert
-                sg_rules.pop(0 if version == 'IPv4' else 1)
-                sg_rules.append({'ethertype': version, 'direction': 'egress',
-                                 'protocol': proto})
-                self.firewall.update_security_group_rules(
-                    self.FAKE_SECURITY_GROUP_ID, sg_rules)
-                result = self.firewall.update_port_filter(self.src_port_desc)
-                deletes = [r for r in result if r.startswith('-D ')]
-                creates = [r for r in result if r.startswith('-I ')]
-                self.assertEqual(1, len(deletes))
-                self.assertEqual(1, len(creates))
-                # quick sanity check to make sure the insert was for the
-                # correct proto
-                self.assertIn('-p %s' % proto, creates[0])
-                # another apply should be a NOOP if the right rule was removed
-                # and the new one was inserted in the correct position
-                self.assertEqual([], self.firewall.iptables._apply())
-
-    def test_rule_ordering_correct(self):
-        sg_rules = [
-            {'ethertype': 'IPv4', 'direction': 'egress', 'protocol': 'tcp',
-             'port_range_min': i, 'port_range_max': i}
-            for i in range(50, 61)
-        ]
-        self.firewall.update_security_group_rules(
-            self.FAKE_SECURITY_GROUP_ID, sg_rules)
-        self.firewall.prepare_port_filter(self.src_port_desc)
-        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
-        # remove a rule and add a new one
-        sg_rules.pop(5)
-        sg_rules.insert(8, {'ethertype': 'IPv4', 'direction': 'egress',
-                            'protocol': 'tcp', 'port_range_min': 400,
-                            'port_range_max': 400})
-        self.firewall.update_security_group_rules(
-            self.FAKE_SECURITY_GROUP_ID, sg_rules)
-        self.firewall.prepare_port_filter(self.src_port_desc)
-        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
-
-        # reverse all of the rules (requires lots of deletes and inserts)
-        sg_rules = list(reversed(sg_rules))
-        self.firewall.update_security_group_rules(
-            self.FAKE_SECURITY_GROUP_ID, sg_rules)
-        self.firewall.prepare_port_filter(self.src_port_desc)
-        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
-
-    def _assert_sg_out_tcp_rules_appear_in_order(self, sg_rules):
-        outgoing_rule_pref = '-A %s-o%s' % (self.firewall.iptables.wrap_name,
-                                            self.src_port_desc['device'][3:13])
-        rules = [
-            r for r in self.firewall.iptables.get_rules_for_table('filter')
-            if r.startswith(outgoing_rule_pref)
-        ]
-        # we want to ensure the rules went in in the same order we sent
-        indexes = [rules.index('%s -p tcp -m tcp --dport %s -j RETURN' %
-                               (outgoing_rule_pref, i['port_range_min']))
-                   for i in sg_rules]
-        # all indexes should be in order with no unexpected rules in between
-        self.assertEqual(range(indexes[0], indexes[-1] + 1), indexes)
diff --git a/neutron/tests/functional/agent/test_firewall.py b/neutron/tests/functional/agent/test_firewall.py
new file mode 100644 (file)
index 0000000..fc7b6cd
--- /dev/null
@@ -0,0 +1,424 @@
+# Copyright 2015 Intel Corporation.
+# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
+#                               <isaku.yamahata at gmail com>
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import copy
+import testscenarios
+
+import netaddr
+from oslo_config import cfg
+
+from neutron.agent import firewall
+from neutron.agent.linux import iptables_firewall
+from neutron.agent import securitygroups_rpc as sg_cfg
+from neutron.common import constants
+from neutron.tests.common import conn_testers
+from neutron.tests.functional import base
+
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+reverse_direction = {
+    conn_testers.ConnectionTester.INGRESS:
+        conn_testers.ConnectionTester.EGRESS,
+    conn_testers.ConnectionTester.EGRESS:
+        conn_testers.ConnectionTester.INGRESS}
+reverse_transport_protocol = {
+    conn_testers.ConnectionTester.TCP: conn_testers.ConnectionTester.UDP,
+    conn_testers.ConnectionTester.UDP: conn_testers.ConnectionTester.TCP}
+
+
+DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
+
+
+def _add_rule(sg_rules, base, port_range_min=None, port_range_max=None):
+    rule = copy.copy(base)
+    if port_range_min:
+        rule['port_range_min'] = port_range_min
+    if port_range_max:
+        rule['port_range_max'] = port_range_max
+    sg_rules.append(rule)
+
+
+class FirewallTestCase(base.BaseSudoTestCase):
+    FAKE_SECURITY_GROUP_ID = 'fake_sg_id'
+    MAC_SPOOFED = "fa:16:3e:9a:2f:48"
+    scenarios = [('IptablesFirewallDriver without ipset',
+                  {'enable_ipset': False}),
+                 ('IptablesFirewallDriver with ipset',
+                  {'enable_ipset': True})]
+
+    def create_iptables_firewall(self):
+        cfg.CONF.set_override('enable_ipset', self.enable_ipset,
+                              'SECURITYGROUP')
+        return iptables_firewall.IptablesFirewallDriver(
+            namespace=self.tester.bridge_namespace)
+
+    @staticmethod
+    def _create_port_description(port_id, ip_addresses, mac_address, sg_ids):
+        return {'admin_state_up': True,
+                'device': port_id,
+                'device_owner': DEVICE_OWNER_COMPUTE,
+                'fixed_ips': ip_addresses,
+                'mac_address': mac_address,
+                'port_security_enabled': True,
+                'security_groups': sg_ids,
+                'status': 'ACTIVE'}
+
+    def setUp(self):
+        cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
+        super(FirewallTestCase, self).setUp()
+        self.tester = self.useFixture(
+            conn_testers.LinuxBridgeConnectionTester())
+        self.firewall = self.create_iptables_firewall()
+        vm_mac = self.tester.vm_mac_address
+        vm_port_id = self.tester.vm_port_id
+        self.src_port_desc = self._create_port_description(
+            vm_port_id, [self.tester.vm_ip_address], vm_mac,
+            [self.FAKE_SECURITY_GROUP_ID])
+        self.firewall.prepare_port_filter(self.src_port_desc)
+
+    def _apply_security_group_rules(self, sg_id, sg_rules):
+        with self.firewall.defer_apply():
+            self.firewall.update_security_group_rules(sg_id, sg_rules)
+
+    def test_rule_application_converges(self):
+        sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress'},
+                    {'ethertype': 'IPv6', 'direction': 'egress'},
+                    {'ethertype': 'IPv4', 'direction': 'ingress',
+                     'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
+                    {'ethertype': 'IPv6', 'direction': 'ingress',
+                     'source_ip_prefix': '0::0/0', 'protocol': 'ipv6-icmp'}]
+        # make sure port ranges converge on all protocols with and without
+        # port ranges (prevents regression of bug 1502924)
+        for proto in ('tcp', 'udp', 'icmp'):
+            for version in ('IPv4', 'IPv6'):
+                if proto == 'icmp' and version == 'IPv6':
+                    proto = 'ipv6-icmp'
+                base = {'ethertype': version, 'direction': 'ingress',
+                        'protocol': proto}
+                sg_rules.append(copy.copy(base))
+                _add_rule(sg_rules, base, port_range_min=50,
+                          port_range_max=50)
+                _add_rule(sg_rules, base, port_range_max=55)
+                _add_rule(sg_rules, base, port_range_min=60,
+                          port_range_max=60)
+                _add_rule(sg_rules, base, port_range_max=65)
+
+        # add some single-host rules to prevent regression of bug 1502917
+        sg_rules.append({'ethertype': 'IPv4', 'direction': 'ingress',
+                         'source_ip_prefix': '77.77.77.77/32'})
+        sg_rules.append({'ethertype': 'IPv6', 'direction': 'ingress',
+                         'source_ip_prefix': 'fe80::1/128'})
+        self.firewall.update_security_group_rules(
+            self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.firewall.prepare_port_filter(self.src_port_desc)
+        # after one prepare call, another apply should be a NOOP
+        self.assertEqual([], self.firewall.iptables._apply())
+
+        orig_sg_rules = copy.copy(sg_rules)
+        for proto in ('tcp', 'udp', 'icmp'):
+            for version in ('IPv4', 'IPv6'):
+                if proto == 'icmp' and version == 'IPv6':
+                    proto = 'ipv6-icmp'
+                # make sure firewall is in converged state
+                self.firewall.update_security_group_rules(
+                    self.FAKE_SECURITY_GROUP_ID, orig_sg_rules)
+                self.firewall.update_port_filter(self.src_port_desc)
+                sg_rules = copy.copy(orig_sg_rules)
+
+                # remove one rule and add another to make sure it results in
+                # exactly one delete and insert
+                sg_rules.pop(0 if version == 'IPv4' else 1)
+                sg_rules.append({'ethertype': version, 'direction': 'egress',
+                                 'protocol': proto})
+                self.firewall.update_security_group_rules(
+                    self.FAKE_SECURITY_GROUP_ID, sg_rules)
+                result = self.firewall.update_port_filter(self.src_port_desc)
+                deletes = [r for r in result if r.startswith('-D ')]
+                creates = [r for r in result if r.startswith('-I ')]
+                self.assertEqual(1, len(deletes))
+                self.assertEqual(1, len(creates))
+                # quick sanity check to make sure the insert was for the
+                # correct proto
+                self.assertIn('-p %s' % proto, creates[0])
+                # another apply should be a NOOP if the right rule was removed
+                # and the new one was inserted in the correct position
+                self.assertEqual([], self.firewall.iptables._apply())
+
+    def test_rule_ordering_correct(self):
+        sg_rules = [
+            {'ethertype': 'IPv4', 'direction': 'egress', 'protocol': 'tcp',
+             'port_range_min': i, 'port_range_max': i}
+            for i in range(50, 61)
+        ]
+        self.firewall.update_security_group_rules(
+            self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.firewall.prepare_port_filter(self.src_port_desc)
+        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
+        # remove a rule and add a new one
+        sg_rules.pop(5)
+        sg_rules.insert(8, {'ethertype': 'IPv4', 'direction': 'egress',
+                            'protocol': 'tcp', 'port_range_min': 400,
+                            'port_range_max': 400})
+        self.firewall.update_security_group_rules(
+            self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.firewall.prepare_port_filter(self.src_port_desc)
+        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
+
+        # reverse all of the rules (requires lots of deletes and inserts)
+        sg_rules = list(reversed(sg_rules))
+        self.firewall.update_security_group_rules(
+            self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.firewall.prepare_port_filter(self.src_port_desc)
+        self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
+
+    def _assert_sg_out_tcp_rules_appear_in_order(self, sg_rules):
+        outgoing_rule_pref = '-A %s-o%s' % (self.firewall.iptables.wrap_name,
+                                            self.src_port_desc['device'][3:13])
+        rules = [
+            r for r in self.firewall.iptables.get_rules_for_table('filter')
+            if r.startswith(outgoing_rule_pref)
+        ]
+        # we want to ensure the rules went in in the same order we sent
+        indexes = [rules.index('%s -p tcp -m tcp --dport %s -j RETURN' %
+                               (outgoing_rule_pref, i['port_range_min']))
+                   for i in sg_rules]
+        # all indexes should be in order with no unexpected rules in between
+        self.assertEqual(range(indexes[0], indexes[-1] + 1), indexes)
+
+    def test_ingress_icmp_secgroup(self):
+        # update the sg_group to make ping pass
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_ICMP},
+                    {'ethertype': constants.IPv4,
+                     'direction': firewall.EGRESS_DIRECTION}]
+
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.INGRESS)
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+
+    def test_mac_spoofing(self):
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_ICMP},
+                    {'ethertype': constants.IPv4,
+                     'direction': firewall.EGRESS_DIRECTION}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_mac_address = self.MAC_SPOOFED
+        self.tester.flush_arp_tables()
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.INGRESS)
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.EGRESS)
+
+    def test_mac_spoofing_works_without_port_security_enabled(self):
+        self.src_port_desc['port_security_enabled'] = False
+        self.firewall.update_port_filter(self.src_port_desc)
+
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_mac_address = self.MAC_SPOOFED
+        self.tester.flush_arp_tables()
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.EGRESS)
+
+    def test_port_security_enabled_set_to_false(self):
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.INGRESS)
+        self.src_port_desc['port_security_enabled'] = False
+        self.firewall.update_port_filter(self.src_port_desc)
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+
+    def test_dhcp_requests_from_vm(self):
+        # DHCPv4 uses source port 67, destination port 68
+        self.tester.assert_connection(direction=self.tester.EGRESS,
+                                      protocol=self.tester.UDP,
+                                      src_port=68, dst_port=67)
+
+    def test_dhcp_server_forbidden_on_vm(self):
+        self.tester.assert_no_connection(direction=self.tester.EGRESS,
+                                         protocol=self.tester.UDP,
+                                         src_port=67, dst_port=68)
+        self.tester.assert_no_connection(direction=self.tester.INGRESS,
+                                         protocol=self.tester.UDP,
+                                         src_port=68, dst_port=67)
+
+    def test_ip_spoofing(self):
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_ICMP}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        not_allowed_ip = "%s/24" % (
+            netaddr.IPAddress(self.tester.vm_ip_address) + 1)
+
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_ip_cidr = not_allowed_ip
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.INGRESS)
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.EGRESS)
+
+    def test_ip_spoofing_works_without_port_security_enabled(self):
+        self.src_port_desc['port_security_enabled'] = False
+        self.firewall.update_port_filter(self.src_port_desc)
+
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_ICMP}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        not_allowed_ip = "%s/24" % (
+            netaddr.IPAddress(self.tester.vm_ip_address) + 1)
+
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_ip_cidr = not_allowed_ip
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.EGRESS)
+
+    def test_allowed_address_pairs(self):
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_ICMP},
+                    {'ethertype': constants.IPv4,
+                     'direction': firewall.EGRESS_DIRECTION}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+
+        port_mac = self.tester.vm_mac_address
+        allowed_ip = netaddr.IPAddress(self.tester.vm_ip_address) + 1
+        not_allowed_ip = "%s/24" % (allowed_ip + 1)
+        self.src_port_desc['allowed_address_pairs'] = [
+            {'mac_address': port_mac,
+             'ip_address': allowed_ip}]
+        allowed_ip = "%s/24" % allowed_ip
+
+        self.firewall.update_port_filter(self.src_port_desc)
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_ip_cidr = allowed_ip
+        self.tester.assert_connection(protocol=self.tester.ICMP,
+                                      direction=self.tester.INGRESS)
+        self.tester.vm_ip_cidr = not_allowed_ip
+        self.tester.assert_no_connection(protocol=self.tester.ICMP,
+                                         direction=self.tester.INGRESS)
+
+    def test_arp_is_allowed(self):
+        self.tester.assert_connection(protocol=self.tester.ARP,
+                                      direction=self.tester.EGRESS)
+        self.tester.assert_connection(protocol=self.tester.ARP,
+                                      direction=self.tester.INGRESS)
+
+    def _test_rule(self, direction, protocol):
+        sg_rules = [{'ethertype': constants.IPv4, 'direction': direction,
+                     'protocol': protocol}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        not_allowed_direction = reverse_direction[direction]
+        not_allowed_protocol = reverse_transport_protocol[protocol]
+
+        self.tester.assert_connection(protocol=protocol,
+                                      direction=direction)
+        self.tester.assert_no_connection(protocol=not_allowed_protocol,
+                                         direction=direction)
+        self.tester.assert_no_connection(protocol=protocol,
+                                         direction=not_allowed_direction)
+
+    def test_ingress_tcp_rule(self):
+        self._test_rule(self.tester.INGRESS, self.tester.TCP)
+
+    def test_ingress_udp_rule(self):
+        self._test_rule(self.tester.INGRESS, self.tester.UDP)
+
+    def test_egress_tcp_rule(self):
+        self._test_rule(self.tester.EGRESS, self.tester.TCP)
+
+    def test_egress_udp_rule(self):
+        self._test_rule(self.tester.EGRESS, self.tester.UDP)
+
+    def test_connection_with_destination_port_range(self):
+        port_min = 12345
+        port_max = 12346
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_TCP,
+                     'port_range_min': port_min,
+                     'port_range_max': port_max}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+
+        self.tester.assert_connection(protocol=self.tester.TCP,
+                                      direction=self.tester.INGRESS,
+                                      dst_port=port_min)
+        self.tester.assert_connection(protocol=self.tester.TCP,
+                                      direction=self.tester.INGRESS,
+                                      dst_port=port_max)
+        self.tester.assert_no_connection(protocol=self.tester.TCP,
+                                         direction=self.tester.INGRESS,
+                                         dst_port=port_min - 1)
+        self.tester.assert_no_connection(protocol=self.tester.TCP,
+                                         direction=self.tester.INGRESS,
+                                         dst_port=port_max + 1)
+
+    def test_connection_with_source_port_range(self):
+        source_port_min = 12345
+        source_port_max = 12346
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.EGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_TCP,
+                     'source_port_range_min': source_port_min,
+                     'source_port_range_max': source_port_max}]
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+
+        self.tester.assert_connection(protocol=self.tester.TCP,
+                                      direction=self.tester.EGRESS,
+                                      src_port=source_port_min)
+        self.tester.assert_connection(protocol=self.tester.TCP,
+                                      direction=self.tester.EGRESS,
+                                      src_port=source_port_max)
+        self.tester.assert_no_connection(protocol=self.tester.TCP,
+                                         direction=self.tester.EGRESS,
+                                         src_port=source_port_min - 1)
+        self.tester.assert_no_connection(protocol=self.tester.TCP,
+                                         direction=self.tester.EGRESS,
+                                         src_port=source_port_max + 1)
+
+    def test_established_connection_is_not_cut(self):
+        port = 12345
+        sg_rules = [{'ethertype': constants.IPv4,
+                     'direction': firewall.INGRESS_DIRECTION,
+                     'protocol': constants.PROTO_NAME_TCP,
+                     'port_range_min': port,
+                     'port_range_max': port}]
+        connection = {'protocol': self.tester.TCP,
+                      'direction': self.tester.INGRESS,
+                      'dst_port': port}
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+        self.tester.establish_connection(**connection)
+
+        self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, list())
+        self.tester.assert_established_connection(**connection)