_uuid = test_api_v2._uuid
+#TODO(mangelajo): replace all 'IPv4', 'IPv6' to constants
FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
'IPv6': 'fe80::/48'}
FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'}
+#TODO(mangelajo): replace all 'fake_sgid' strings for the constant
+FAKE_SGID = 'fake_sgid'
class BaseIptablesFirewallTestCase(base.BaseTestCase):
ipset_manager.IpsetManager.get_name)
self.firewall.ipset.set_exists.return_value = True
- def _fake_port(self):
+ def _fake_port(self, sg_id=FAKE_SGID):
return {'device': 'tapfake_dev',
'mac_address': 'ff:ff:ff:ff:ff:ff',
'fixed_ips': [FAKE_IP['IPv4'],
FAKE_IP['IPv6']],
- 'security_groups': ['fake_sgid'],
- 'security_group_source_groups': ['fake_sgid']}
+ 'security_groups': [sg_id],
+ 'security_group_source_groups': [sg_id]}
def _fake_sg_rule_for_ethertype(self, ethertype):
return {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
]
self.firewall.ipset.assert_has_calls(calls)
+ def _setup_fake_firewall_members_and_rules(self, firewall):
+ firewall.sg_rules = self._fake_sg_rule()
+ firewall.pre_sg_rules = self._fake_sg_rule()
+ firewall.sg_members = {'fake_sgid': {
+ 'IPv4': ['10.0.0.1'],
+ 'IPv6': ['fe80::1']}}
+ firewall.pre_sg_members = firewall.sg_members
+
+ def test_remove_unused_security_group_info_clears_unused_rules(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ self.firewall.prepare_port_filter(self._fake_port())
+
+ # create another SG which won't be referenced by any filtered port
+ fake_sg_rules = self.firewall.sg_rules['fake_sgid']
+ self.firewall.pre_sg_rules['other_sgid'] = fake_sg_rules
+ self.firewall.sg_rules['other_sgid'] = fake_sg_rules
+
+ # call the cleanup function, and check the unused sg_rules are out
+ self.firewall._remove_unused_security_group_info()
+ self.assertNotIn('other_sgid', self.firewall.sg_rules)
+
+ def test_remove_unused_sg_members(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ # no filtered ports in 'fake_sgid', so all rules and members
+ # are not needed and we expect them to be cleaned up
+ self.firewall.prepare_port_filter(self._fake_port('other_sgid'))
+ self.firewall._remove_unused_security_group_info()
+
+ self.assertNotIn('fake_sgid', self.firewall.sg_members)
+
+ def test_remove_all_unused_info(self):
+ self._setup_fake_firewall_members_and_rules(self.firewall)
+ self.firewall.filtered_ports = {}
+ self.firewall._remove_unused_security_group_info()
+ self.assertFalse(self.firewall.sg_members)
+ self.assertFalse(self.firewall.sg_rules)
+
def test_prepare_port_filter_with_deleted_member(self):
self.firewall.sg_rules = self._fake_sg_rule()
+ self.firewall.pre_sg_rules = self._fake_sg_rule()
self.firewall.sg_members = {'fake_sgid': {
'IPv4': [
'10.0.0.1', '10.0.0.3', '10.0.0.4', '10.0.0.5'],
self.firewall.pre_sg_members = {'fake_sgid': {
'IPv4': ['10.0.0.2'],
'IPv6': ['fe80::1']}}
- port = self._fake_port()
- self.firewall.prepare_port_filter(port)
+ self.firewall.prepare_port_filter(self._fake_port())
calls = [
mock.call.set_members('fake_sgid', 'IPv4',
['10.0.0.1', '10.0.0.3', '10.0.0.4',