if rule_dict not in sg_info['security_groups'][security_group_id]:
sg_info['security_groups'][security_group_id].append(
rule_dict)
+ # Update the security groups info if they don't have any rules
+ sg_ids = self._select_sg_ids_for_ports(context, ports)
+ for (sg_id, ) in sg_ids:
+ if sg_id not in sg_info['security_groups']:
+ sg_info['security_groups'][sg_id] = []
sg_info['sg_member_ips'] = remote_security_group_info
# the provider rules do not belong to any security group, so these
sg_info['sg_member_ips'][sg_id][ethertype].add(ip)
return sg_info
+ def _select_sg_ids_for_ports(self, context, ports):
+ if not ports:
+ return []
+ sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
+ sg_binding_sgid = sg_db.SecurityGroupPortBinding.security_group_id
+ query = context.session.query(sg_binding_sgid)
+ query = query.filter(sg_binding_port.in_(ports.keys()))
+ return query.all()
+
def _select_rules_for_ports(self, context, ports):
if not ports:
return []
def test_sg_rules_for_devices_ipv4_ingress_port_range_min_port_1(self):
self._test_sg_rules_for_devices_ipv4_ingress_port_range(1, 10)
+ def test_security_group_info_for_ports_with_no_rules(self):
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg:
+ sg_id = sg['security_group']['id']
+ self._delete_default_security_group_egress_rules(sg_id)
+
+ res = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg_id])
+ ports_rest = self.deserialize(self.fmt, res)
+ port_id = ports_rest['port']['id']
+ self.rpc.devices = {port_id: ports_rest['port']}
+ devices = [port_id]
+ ctx = context.get_admin_context()
+ sg_info = self.rpc.security_group_info_for_devices(
+ ctx, devices=devices)
+
+ expected = {sg_id: []}
+ self.assertEqual(expected, sg_info['security_groups'])
+ self._delete('ports', port_id)
+
@contextlib.contextmanager
def _port_with_addr_pairs_and_security_group(self):
plugin_obj = manager.NeutronManager.get_plugin()