del_ips = self._get_deleted_sg_member_ips(sg_id, ethertype)
cur_member_ips = self._get_cur_sg_member_ips(sg_id, ethertype)
chain_name = ethertype + sg_id[:IPSET_CHAIN_LEN]
- if chain_name not in self.ipset_chains:
+ if chain_name not in self.ipset_chains and cur_member_ips:
self.ipset_chains[chain_name] = []
self.ipset.create_ipset_chain(
chain_name, ethertype)
'IPv6fake_sgid', ['fe80::1'], 'IPv6')]
self.firewall.ipset.assert_has_calls(calls)
+
+ def test_prepare_port_filter_with_sg_no_member(self):
+ self.firewall.sg_rules = self._fake_sg_rule()
+ self.firewall.sg_rules['fake_sgid'].append(
+ {'direction': 'ingress', 'remote_group_id': 'fake_sgid2'})
+ self.firewall.sg_rules.update()
+ self.firewall.sg_members = {'fake_sgid': {
+ 'IPv4': ['10.0.0.1', '10.0.0.2'], 'IPv6': ['fe80::1']}}
+ self.firewall.pre_sg_members = {}
+ port = self._fake_port()
+ port['security_group_source_groups'].append('fake_sgid2')
+ self.firewall.prepare_port_filter(port)
+ calls = [mock.call.create_ipset_chain('IPv4fake_sgid', 'IPv4'),
+ mock.call.refresh_ipset_chain_by_name(
+ 'IPv4fake_sgid', ['10.0.0.1', '10.0.0.2'], 'IPv4'),
+ mock.call.create_ipset_chain('IPv6fake_sgid', 'IPv6'),
+ mock.call.refresh_ipset_chain_by_name(
+ 'IPv6fake_sgid', ['fe80::1'], 'IPv6')]
+
+ self.firewall.ipset.assert_has_calls(calls)