return icmpv6_rules
def _select_sg_rules_for_port(self, port, direction):
- sg_ids = port.get('security_groups', [])
+ """Select rules from the security groups the port is member of."""
+ port_sg_ids = port.get('security_groups', [])
port_rules = []
- fixed_ips = port.get('fixed_ips', [])
- for sg_id in sg_ids:
+
+ for sg_id in port_sg_ids:
for rule in self.sg_rules.get(sg_id, []):
if rule['direction'] == direction:
if self.enable_ipset:
port_rules.append(rule)
- continue
- remote_group_id = rule.get('remote_group_id')
- if not remote_group_id:
- port_rules.append(rule)
- continue
- ethertype = rule['ethertype']
- for ip in self.sg_members[remote_group_id][ethertype]:
- if ip in fixed_ips:
- continue
- ip_rule = rule.copy()
- direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
- ip_rule[direction_ip_prefix] = str(
- netaddr.IPNetwork(ip).cidr)
- port_rules.append(ip_rule)
+ else:
+ port_rules.extend(
+ self._expand_sg_rule_with_remote_ips(
+ rule, port, direction))
return port_rules
+ def _expand_sg_rule_with_remote_ips(self, rule, port, direction):
+ """Expand a remote group rule to rule per remote group IP."""
+ remote_group_id = rule.get('remote_group_id')
+ if remote_group_id:
+ ethertype = rule['ethertype']
+ port_ips = port.get('fixed_ips', [])
+
+ for ip in self.sg_members[remote_group_id][ethertype]:
+ if ip not in port_ips:
+ ip_rule = rule.copy()
+ direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
+ ip_prefix = str(netaddr.IPNetwork(ip).cidr)
+ ip_rule[direction_ip_prefix] = ip_prefix
+ yield ip_rule
+ else:
+ yield rule
+
def _get_remote_sg_ids(self, port, direction):
sg_ids = port.get('security_groups', [])
remote_sg_ids = {constants.IPv4: [], constants.IPv6: []}
'security_groups': ['fake_sgid'],
'security_group_source_groups': ['fake_sgid']}
+ def _fake_sg_rule_for_ethertype(self, ethertype):
+ return {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
+ 'ethertype': ethertype}
+
def _fake_sg_rule(self):
- return {'fake_sgid': [
- {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
- 'ethertype': 'IPv4'},
- {'direction': 'ingress', 'remote_group_id': 'fake_sgid',
- 'ethertype': 'IPv6'}]}
+ return {'fake_sgid': [self._fake_sg_rule_for_ethertype('IPv4'),
+ self._fake_sg_rule_for_ethertype('IPv6')]}
def test_prepare_port_filter_with_new_members(self):
self.firewall.sg_rules = self._fake_sg_rule()
calls = [mock.call.destroy('fake_sgid', 'IPv4')]
self.firewall.ipset.assert_has_calls(calls, True)
+
+ def test_sg_rule_expansion_with_remote_ips(self):
+ other_ips = ['10.0.0.2', '10.0.0.3', '10.0.0.4']
+ self.firewall.sg_members = {'fake_sgid': {
+ 'IPv4': [FAKE_IP['IPv4']] + other_ips,
+ 'IPv6': [FAKE_IP['IPv6']]}}
+
+ port = self._fake_port()
+ rule = self._fake_sg_rule_for_ethertype('IPv4')
+ rules = self.firewall._expand_sg_rule_with_remote_ips(
+ rule, port, 'ingress')
+ self.assertEqual(list(rules),
+ [dict(rule.items() +
+ [('source_ip_prefix', '%s/32' % ip)])
+ for ip in other_ips])