]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Extend test coverage for iptables_firewall.py
authorMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 6 Feb 2015 12:10:52 +0000 (12:10 +0000)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Tue, 3 Mar 2015 15:16:37 +0000 (15:16 +0000)
Before refactoring _remove_unused_security_group_info, we
found a few corner cases that were not tested. We ensure
with this test that sg_rules and sg_members of the firewall
object are properly cleaned up when the objects they
keep are not referenced anymore.

Partially implements blueprint refactor-iptables-firewall-driver

Change-Id: I44da9ddddf7ba85ba4b76ecc835155c8fb813d90

neutron/tests/unit/test_iptables_firewall.py

index addcb72e5d62d0ed40c0ec1912c06d8f764691e6..294a06b1b4df1b451a810e2c72996d892177f8e9 100644 (file)
@@ -29,10 +29,13 @@ from neutron.tests.unit import test_api_v2
 
 
 _uuid = test_api_v2._uuid
+#TODO(mangelajo): replace all 'IPv4', 'IPv6' to constants
 FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
                'IPv6': 'fe80::/48'}
 FAKE_IP = {'IPv4': '10.0.0.1',
            'IPv6': 'fe80::1'}
+#TODO(mangelajo): replace all 'fake_sgid' strings for the constant
+FAKE_SGID = 'fake_sgid'
 
 
 class BaseIptablesFirewallTestCase(base.BaseTestCase):
@@ -1409,13 +1412,13 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
             ipset_manager.IpsetManager.get_name)
         self.firewall.ipset.set_exists.return_value = True
 
-    def _fake_port(self):
+    def _fake_port(self, sg_id=FAKE_SGID):
         return {'device': 'tapfake_dev',
                 'mac_address': 'ff:ff:ff:ff:ff:ff',
                 'fixed_ips': [FAKE_IP['IPv4'],
                               FAKE_IP['IPv6']],
-                'security_groups': ['fake_sgid'],
-                'security_group_source_groups': ['fake_sgid']}
+                'security_groups': [sg_id],
+                'security_group_source_groups': [sg_id]}
 
     def _fake_sg_rule_for_ethertype(self, ethertype):
         return {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
@@ -1440,8 +1443,46 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
         ]
         self.firewall.ipset.assert_has_calls(calls)
 
+    def _setup_fake_firewall_members_and_rules(self, firewall):
+        firewall.sg_rules = self._fake_sg_rule()
+        firewall.pre_sg_rules = self._fake_sg_rule()
+        firewall.sg_members = {'fake_sgid': {
+            'IPv4': ['10.0.0.1'],
+            'IPv6': ['fe80::1']}}
+        firewall.pre_sg_members = firewall.sg_members
+
+    def test_remove_unused_security_group_info_clears_unused_rules(self):
+        self._setup_fake_firewall_members_and_rules(self.firewall)
+        self.firewall.prepare_port_filter(self._fake_port())
+
+        # create another SG which won't be referenced by any filtered port
+        fake_sg_rules = self.firewall.sg_rules['fake_sgid']
+        self.firewall.pre_sg_rules['other_sgid'] = fake_sg_rules
+        self.firewall.sg_rules['other_sgid'] = fake_sg_rules
+
+        # call the cleanup function, and check the unused sg_rules are out
+        self.firewall._remove_unused_security_group_info()
+        self.assertNotIn('other_sgid', self.firewall.sg_rules)
+
+    def test_remove_unused_sg_members(self):
+        self._setup_fake_firewall_members_and_rules(self.firewall)
+        # no filtered ports in 'fake_sgid', so all rules and members
+        # are not needed and we expect them to be cleaned up
+        self.firewall.prepare_port_filter(self._fake_port('other_sgid'))
+        self.firewall._remove_unused_security_group_info()
+
+        self.assertNotIn('fake_sgid', self.firewall.sg_members)
+
+    def test_remove_all_unused_info(self):
+        self._setup_fake_firewall_members_and_rules(self.firewall)
+        self.firewall.filtered_ports = {}
+        self.firewall._remove_unused_security_group_info()
+        self.assertFalse(self.firewall.sg_members)
+        self.assertFalse(self.firewall.sg_rules)
+
     def test_prepare_port_filter_with_deleted_member(self):
         self.firewall.sg_rules = self._fake_sg_rule()
+        self.firewall.pre_sg_rules = self._fake_sg_rule()
         self.firewall.sg_members = {'fake_sgid': {
             'IPv4': [
                 '10.0.0.1', '10.0.0.3', '10.0.0.4', '10.0.0.5'],
@@ -1449,8 +1490,7 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
         self.firewall.pre_sg_members = {'fake_sgid': {
             'IPv4': ['10.0.0.2'],
             'IPv6': ['fe80::1']}}
-        port = self._fake_port()
-        self.firewall.prepare_port_filter(port)
+        self.firewall.prepare_port_filter(self._fake_port())
         calls = [
             mock.call.set_members('fake_sgid', 'IPv4',
                                   ['10.0.0.1', '10.0.0.3', '10.0.0.4',