for ip_version, sg_ids in security_group_ids.items():
for sg_id in sg_ids:
current_ips = self.sg_members[sg_id][ip_version]
- if current_ips:
- self.ipset.set_members(sg_id, ip_version, current_ips)
+ self.ipset.set_members(sg_id, ip_version, current_ips)
def _generate_ipset_rule_args(self, sg_rule, remote_gid):
ethertype = sg_rule.get('ethertype')
self.firewall._build_ipv4v6_mac_ip_list(mac_oth, ipv6,
mac_ipv4_pairs, mac_ipv6_pairs)
self.assertEqual(fake_ipv6_pair, mac_ipv6_pairs)
+
+ def test_update_ipset_members(self):
+ self.firewall.sg_members[FAKE_SGID][_IPv4] = []
+ self.firewall.sg_members[FAKE_SGID][_IPv6] = []
+ sg_info = {constants.IPv4: [FAKE_SGID]}
+ self.firewall._update_ipset_members(sg_info)
+ calls = [mock.call.set_members(FAKE_SGID, constants.IPv4, [])]
+ self.firewall.ipset.assert_has_calls(calls)