expected)
self._delete('ports', port_id1)
+ def test_security_group_info_for_devices_only_ipv6_rule(self):
+ with self.network() as n:
+ with contextlib.nested(self.subnet(n),
+ self.security_group()) as (subnet_v4,
+ sg1):
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22', remote_group_id=sg1_id,
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_info_for_devices(
+ ctx, devices=devices)
+ expected = {
+ 'security_groups': {sg1_id: [
+ {'direction': 'egress', 'ethertype': const.IPv4},
+ {'direction': 'egress', 'ethertype': const.IPv6},
+ {'direction': u'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22, 'port_range_min': 22,
+ 'remote_group_id': sg1_id}
+ ]},
+ 'sg_member_ips': {sg1_id: {
+ 'IPv6': [],
+ }}
+ }
+ self.assertEqual(expected['security_groups'],
+ ports_rpc['security_groups'])
+ self.assertEqual(expected['sg_member_ips'][sg1_id]['IPv6'],
+ ports_rpc['sg_member_ips'][sg1_id]['IPv6'])
+ self._delete('ports', port_id1)
+
def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_GLOBAL']