From: Miguel Angel Ajo
Date: Fri, 6 Feb 2015 12:10:52 +0000 (+0000)
Subject: Extend test coverage for iptables_firewall.py
X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=f1fe1fe9129f0bb5a8dc9e95e17dca17fd0693b9;p=openstack-build%2Fneutron-build.git
Extend test coverage for iptables_firewall.py
Before refactoring _remove_unused_security_group_info, we found a few corner cases that were not tested. We ensure with this test that sg_rules and sg_members of the firewall object are properly cleaned up when the objects they keep are not referenced anymore.
Partially implements blueprint refactor-iptables-firewall-driver
Change-Id: I44da9ddddf7ba85ba4b76ecc835155c8fb813d90
---
diff --git a/neutron/tests/unit/test_iptables_firewall.py b/neutron/tests/unit/test_iptables_firewall.py
index addcb72e5..294a06b1b 100644
--- a/neutron/tests/unit/test_iptables_firewall.py
+++ b/neutron/tests/unit/test_iptables_firewall.py
@@ -29,10 +29,13 @@ from neutron.tests.unit import test_api_v2
 _uuid = test_api_v2._uuid
+#TODO(mangelajo): replace all 'IPv4', 'IPv6' to constants
 FAKE_PREFIX = {'IPv4': '10.0.0.0/24', 'IPv6': 'fe80::/48'}
 FAKE_IP = {'IPv4': '10.0.0.1', 'IPv6': 'fe80::1'}
+#TODO(mangelajo): replace all 'fake_sgid' strings for the constant
+FAKE_SGID = 'fake_sgid'
 class BaseIptablesFirewallTestCase(base.BaseTestCase):
@@ -1409,13 +1412,13 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
 ipset_manager.IpsetManager.get_name)
 self.firewall.ipset.set_exists.return_value = True
- def _fake_port(self):
+ def _fake_port(self, sg_id=FAKE_SGID):
 return {'device': 'tapfake_dev', 'mac_address': 'ff:ff:ff:ff:ff:ff', 'fixed_ips': [FAKE_IP['IPv4'], FAKE_IP['IPv6']],
- 'security_groups': ['fake_sgid'],
- 'security_group_source_groups': ['fake_sgid']}
+ 'security_groups': [sg_id],
+ 'security_group_source_groups': [sg_id]}
 def _fake_sg_rule_for_ethertype(self, ethertype):
 return {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
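Review bot: `FAKE_SGID` is introduced in the first hunk with a TODO to replace the `'fake_sgid'` literals, yet the later hunk that adds `_setup_fake_firewall_members_and_rules` and the three `test_remove_*` methods writes the literal again instead of the constant. New code should start from the constant, for example `self.firewall.sg_rules[FAKE_SGID]` and `self.assertNotIn(FAKE_SGID, self.firewall.sg_members)`, so the TODO only has to cover lines that existed before this change. The two added comments would also be better written as `# TODO(mangelajo): ...`, with a space after `#` as PEP 8 asks for block comments.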
@@ -1440,8 +1443,46 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
 ]
 self.firewall.ipset.assert_has_calls(calls)
+ def _setup_fake_firewall_members_and_rules(self, firewall):
+ firewall.sg_rules = self._fake_sg_rule()
+ firewall.pre_sg_rules = self._fake_sg_rule()
+ firewall.sg_members = {'fake_sgid': {
+ 'IPv4': ['10.0.0.1'],
+ 'IPv6': ['fe80::1']}}
+ firewall.pre_sg_members = firewall.sg_members
+
+ def test_remove_unused_security_group_info_clears_unused_rules(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ self.firewall.prepare_port_filter(self._fake_port())
+
+ # create another SG which won't be referenced by any filtered port
+ fake_sg_rules = self.firewall.sg_rules['fake_sgid']
+ self.firewall.pre_sg_rules['other_sgid'] = fake_sg_rules
+ self.firewall.sg_rules['other_sgid'] = fake_sg_rules
+
+ # call the cleanup function, and check the unused sg_rules are out
+ self.firewall._remove_unused_security_group_info()
+ self.assertNotIn('other_sgid', self.firewall.sg_rules)
+
+ def test_remove_unused_sg_members(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ # no filtered ports in 'fake_sgid', so all rules and members
+ # are not needed and we expect them to be cleaned up
+ self.firewall.prepare_port_filter(self._fake_port('other_sgid'))
+ self.firewall._remove_unused_security_group_info()
+
+ self.assertNotIn('fake_sgid', self.firewall.sg_members)
+
+ def test_remove_all_unused_info(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ self.firewall.filtered_ports = {}
+ self.firewall._remove_unused_security_group_info()
+ self.assertFalse(self.firewall.sg_members)
+ self.assertFalse(self.firewall.sg_rules)
+
 def test_prepare_port_filter_with_deleted_member(self):
 self.firewall.sg_rules = self._fake_sg_rule()
+ self.firewall.pre_sg_rules = self._fake_sg_rule()
 self.firewall.sg_members = {'fake_sgid': { 'IPv4': [ '10.0.0.1', '10.0.0.3', '10.0.0.4', '10.0.0.5'],
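Review bot: `test_remove_unused_security_group_info_clears_unused_rules` asserts only that `'other_sgid'` is gone, so a cleanup that wiped every entry, including the rules of the group the filtered port still belongs to, would also pass. Assuming `prepare_port_filter` registers the fake port as filtered, as the test comment implies, adding `self.assertIn('fake_sgid', self.firewall.sg_rules)` after the existing assertion would pin the retention side as well. Separately, `_setup_fake_firewall_members_and_rules` sets `firewall.pre_sg_members = firewall.sg_members`, so both attributes refer to the same dict and the tests cannot tell which of the two the cleanup modified. A second literal or a copy would keep them independent, the way `sg_rules` and `pre_sg_rules` each get their own `_fake_sg_rule()` result.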
@@ -1449,8 +1490,7 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
 self.firewall.pre_sg_members = {'fake_sgid': { 'IPv4': ['10.0.0.2'], 'IPv6': ['fe80::1']}}
- port = self._fake_port()
- self.firewall.prepare_port_filter(port)
+ self.firewall.prepare_port_filter(self._fake_port())
 calls = [ mock.call.set_members('fake_sgid', 'IPv4', ['10.0.0.1', '10.0.0.3', '10.0.0.4',