From: ankitagrawal Date: Thu, 14 May 2015 09:06:39 +0000 (-0700) Subject: Remove use of contextlib.nested X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=f3f2e59ae76ab2a52ee448bf53722be5503f0d43;p=openstack-build%2Fneutron-build.git Remove use of contextlib.nested Removed use of contextlib.nested call from codebase, as it has been deprecated since Python 2.7. There are also known issues with contextlib.nested that were addressed by the native support for multiple "with" variables. For instance, if the first object is created but the second one throws an exception, the first object's __exit__ is never called. For more information see https://docs.python.org/2/library/contextlib.html#contextlib.nested contextlib.nested is also not compatible with Python 3. Multi-patch set for easier chunks. This one addresses the neutron/tests/unit/agent/test_securitygroups_rpc.py tests. Line continuation markers (e.g. '\') had to be used or syntax errors were thrown. While using parentheses is the preferred way for multiple line statements, but in case of long with statements backslashes are acceptable. Partial-Bug: 1428424 Change-Id: Ia66b98423b14fc7d1bbf6d8a673a49f798d328fa --- diff --git a/neutron/hacking/checks.py b/neutron/hacking/checks.py index eb8f0da29..062f83961 100644 --- a/neutron/hacking/checks.py +++ b/neutron/hacking/checks.py @@ -147,7 +147,6 @@ def check_no_contextlib_nested(logical_line, filename): # when bug 1428424 is closed. ignore_dirs = [ "neutron/plugins/ml2", - "neutron/tests/unit/agent/test_securitygroups_rpc.py", "neutron/tests/unit/api", "neutron/tests/unit/db", "neutron/tests/unit/extensions", diff --git a/neutron/tests/unit/agent/test_securitygroups_rpc.py b/neutron/tests/unit/agent/test_securitygroups_rpc.py index 0cb587f7d..31dd3993e 100644 --- a/neutron/tests/unit/agent/test_securitygroups_rpc.py +++ b/neutron/tests/unit/agent/test_securitygroups_rpc.py @@ -183,57 +183,56 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): def test_security_group_rules_for_devices_ipv4_ingress(self): fake_prefix = FAKE_PREFIX[const.IPv4] - with self.network() as n: - with contextlib.nested( - self.subnet(n), - self.security_group()) as (subnet_v4, sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22') - rule2 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '23', - '23', fake_prefix) - rules = { - 'security_group_rules': [rule1['security_group_rule'], - rule2['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) - - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv4, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv4, - 'port_range_max': 23, 'security_group_id': sg1_id, - 'port_range_min': 23, - 'source_ip_prefix': fake_prefix}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22') + rule2 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '23', + '23', fake_prefix) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, + 'port_range_max': 23, 'security_group_id': sg1_id, + 'port_range_min': 23, + 'source_ip_prefix': fake_prefix}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) @contextlib.contextmanager def _port_with_addr_pairs_and_security_group(self): @@ -242,36 +241,34 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): not in plugin_obj.supported_extension_aliases): self.skipTest("Test depends on allowed-address-pairs extension") fake_prefix = FAKE_PREFIX['IPv4'] - with self.network() as n: - with contextlib.nested( - self.subnet(n), - self.security_group() - ) as (subnet_v4, sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', 'tcp', '22', - '22', remote_group_id=sg1_id) - rule2 = self._build_security_group_rule( - sg1_id, - 'ingress', 'tcp', '23', - '23', fake_prefix) - rules = { - 'security_group_rules': [rule1['security_group_rule'], - rule2['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) - address_pairs = [{'mac_address': '00:00:00:00:00:01', - 'ip_address': '10.0.1.0/24'}, - {'mac_address': '00:00:00:00:00:01', - 'ip_address': '11.0.0.1'}] - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id], - arg_list=(addr_pair.ADDRESS_PAIRS,), - allowed_address_pairs=address_pairs) - yield self.deserialize(self.fmt, res1) + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', 'tcp', '22', + '22', remote_group_id=sg1_id) + rule2 = self._build_security_group_rule( + sg1_id, + 'ingress', 'tcp', '23', + '23', fake_prefix) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, 201) + address_pairs = [{'mac_address': '00:00:00:00:00:01', + 'ip_address': '10.0.1.0/24'}, + {'mac_address': '00:00:00:00:00:01', + 'ip_address': '11.0.0.1'}] + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id], + arg_list=(addr_pair.ADDRESS_PAIRS,), + allowed_address_pairs=address_pairs) + yield self.deserialize(self.fmt, res1) def test_security_group_info_for_devices_ipv4_addr_pair(self): with self._port_with_addr_pairs_and_security_group() as port: @@ -332,757 +329,733 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): def test_security_group_rules_for_devices_ipv4_egress(self): fake_prefix = FAKE_PREFIX[const.IPv4] - with self.network() as n: - with contextlib.nested(self.subnet(n), - self.security_group()) as (subnet_v4, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'egress', const.PROTO_NAME_TCP, '22', - '22') - rule2 = self._build_security_group_rule( - sg1_id, - 'egress', const.PROTO_NAME_UDP, '23', - '23', fake_prefix) - rules = { - 'security_group_rules': [rule1['security_group_rule'], - rule2['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) - - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'egress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv4, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'egress', - 'protocol': const.PROTO_NAME_UDP, - 'ethertype': const.IPv4, - 'port_range_max': 23, 'security_group_id': sg1_id, - 'port_range_min': 23, - 'dest_ip_prefix': fake_prefix}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'egress', const.PROTO_NAME_TCP, '22', + '22') + rule2 = self._build_security_group_rule( + sg1_id, + 'egress', const.PROTO_NAME_UDP, '23', + '23', fake_prefix) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'egress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'egress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv4, + 'port_range_max': 23, 'security_group_id': sg1_id, + 'port_range_min': 23, + 'dest_ip_prefix': fake_prefix}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv4_source_group(self): - with self.network() as n: - with contextlib.nested(self.subnet(n), - self.security_group(), - self.security_group()) as (subnet_v4, - sg1, - sg2): - sg1_id = sg1['security_group']['id'] - sg2_id = sg2['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '24', - '25', remote_group_id=sg2['security_group']['id']) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) - - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id, - sg2_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - - res2 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg2_id]) - ports_rest2 = self.deserialize(self.fmt, res2) - port_id2 = ports_rest2['port']['id'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg2_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg2_id}, - {'direction': u'ingress', - 'source_ip_prefix': u'10.0.0.3/32', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv4, - 'port_range_max': 25, 'port_range_min': 24, - 'remote_group_id': sg2_id, - 'security_group_id': sg1_id}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - self._delete('ports', port_id2) - - def test_security_group_info_for_devices_ipv4_source_group(self): - - with self.network() as n: - with contextlib.nested(self.subnet(n), - self.security_group(), - self.security_group()) as (subnet_v4, - sg1, - sg2): - sg1_id = sg1['security_group']['id'] - sg2_id = sg2['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '24', - '25', remote_group_id=sg2['security_group']['id']) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(webob.exc.HTTPCreated.code, res.status_int) - - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - - res2 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg2_id]) - ports_rest2 = self.deserialize(self.fmt, res2) - port_id2 = ports_rest2['port']['id'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_info_for_devices( - ctx, devices=devices) - expected = { - 'security_groups': {sg1_id: [ - {'direction': 'egress', 'ethertype': const.IPv4}, - {'direction': 'egress', 'ethertype': const.IPv6}, + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1,\ + self.security_group() as sg2: + sg1_id = sg1['security_group']['id'] + sg2_id = sg2['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '24', + '25', remote_group_id=sg2['security_group']['id']) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id, + sg2_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + + res2 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg2_id]) + ports_rest2 = self.deserialize(self.fmt, res2) + port_id2 = ports_rest2['port']['id'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg2_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg2_id}, {'direction': u'ingress', + 'source_ip_prefix': u'10.0.0.3/32', 'protocol': const.PROTO_NAME_TCP, 'ethertype': const.IPv4, 'port_range_max': 25, 'port_range_min': 24, - 'remote_group_id': sg2_id} - ]}, - 'sg_member_ips': {sg2_id: { - 'IPv4': set([u'10.0.0.3']), - 'IPv6': set(), - }} - } - self.assertEqual(expected['security_groups'], - ports_rpc['security_groups']) - self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'], - ports_rpc['sg_member_ips'][sg2_id]['IPv4']) - self._delete('ports', port_id1) - self._delete('ports', port_id2) + 'remote_group_id': sg2_id, + 'security_group_id': sg1_id}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + self._delete('ports', port_id2) + + def test_security_group_info_for_devices_ipv4_source_group(self): + + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1,\ + self.security_group() as sg2: + sg1_id = sg1['security_group']['id'] + sg2_id = sg2['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '24', + '25', remote_group_id=sg2['security_group']['id']) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(webob.exc.HTTPCreated.code, res.status_int) + + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + + res2 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg2_id]) + ports_rest2 = self.deserialize(self.fmt, res2) + port_id2 = ports_rest2['port']['id'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_info_for_devices( + ctx, devices=devices) + expected = { + 'security_groups': {sg1_id: [ + {'direction': 'egress', 'ethertype': const.IPv4}, + {'direction': 'egress', 'ethertype': const.IPv6}, + {'direction': u'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, + 'port_range_max': 25, 'port_range_min': 24, + 'remote_group_id': sg2_id} + ]}, + 'sg_member_ips': {sg2_id: { + 'IPv4': set([u'10.0.0.3']), + 'IPv6': set(), + }} + } + self.assertEqual(expected['security_groups'], + ports_rpc['security_groups']) + self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'], + ports_rpc['sg_member_ips'][sg2_id]['IPv4']) + self._delete('ports', port_id1) + self._delete('ports', port_id2) def test_security_group_rules_for_devices_ipv6_ingress(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP[const.IPv6] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rule2 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_UDP, '23', - '23', fake_prefix, - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule'], - rule2['security_group_rule']]} - res = self._create_security_group_rule(self.fmt, rules) - self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) - - dhcp_port = self._create_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], - 'ip_address': FAKE_IP['IPv6_DHCP']}], - device_owner=const.DEVICE_OWNER_DHCP, - security_groups=[sg1_id]) - dhcp_rest = self.deserialize(self.fmt, dhcp_port) - dhcp_mac = dhcp_rest['port']['mac_address'] - dhcp_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( - const.IPV6_LLA_PREFIX, - dhcp_mac)) - - res1 = self._create_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - source_port, dest_port, ethertype = sg_db_rpc.DHCP_RULE_PORT[6] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_UDP, - 'ethertype': const.IPv6, - 'port_range_max': 23, - 'security_group_id': sg1_id, - 'port_range_min': 23, - 'source_ip_prefix': fake_prefix}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': fake_gateway, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - {'direction': 'ingress', - 'ethertype': ethertype, - 'port_range_max': dest_port, - 'port_range_min': dest_port, - 'protocol': const.PROTO_NAME_UDP, - 'source_ip_prefix': dhcp_lla_ip, - 'source_port_range_max': source_port, - 'source_port_range_min': source_port} - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - - def test_security_group_info_for_devices_only_ipv6_rule(self): - with self.network() as n: - with contextlib.nested(self.subnet(n), - self.security_group()) as (subnet_v4, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', remote_group_id=sg1_id, - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - res1 = self._create_port( - self.fmt, n['network']['id'], - security_groups=[sg1_id]) - ports_rest1 = self.deserialize(self.fmt, res1) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_info_for_devices( - ctx, devices=devices) - expected = { - 'security_groups': {sg1_id: [ - {'direction': 'egress', 'ethertype': const.IPv4}, - {'direction': 'egress', 'ethertype': const.IPv6}, - {'direction': u'ingress', + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6 + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rule2 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_UDP, '23', + '23', fake_prefix, + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + + dhcp_port = self._create_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], + 'ip_address': FAKE_IP['IPv6_DHCP']}], + device_owner=const.DEVICE_OWNER_DHCP, + security_groups=[sg1_id]) + dhcp_rest = self.deserialize(self.fmt, dhcp_port) + dhcp_mac = dhcp_rest['port']['mac_address'] + dhcp_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( + const.IPV6_LLA_PREFIX, + dhcp_mac)) + + res1 = self._create_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + source_port, dest_port, ethertype = sg_db_rpc.DHCP_RULE_PORT[6] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', 'protocol': const.PROTO_NAME_TCP, 'ethertype': const.IPv6, - 'port_range_max': 22, 'port_range_min': 22, - 'remote_group_id': sg1_id} - ]}, - 'sg_member_ips': {sg1_id: { - 'IPv6': set(), - }} - } - self.assertEqual(expected['security_groups'], - ports_rpc['security_groups']) - self.assertEqual(expected['sg_member_ips'][sg1_id]['IPv6'], - ports_rpc['sg_member_ips'][sg1_id]['IPv6']) - self._delete('ports', port_id1) + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv6, + 'port_range_max': 23, + 'security_group_id': sg1_id, + 'port_range_min': 23, + 'source_ip_prefix': fake_prefix}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': fake_gateway, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + {'direction': 'ingress', + 'ethertype': ethertype, + 'port_range_max': dest_port, + 'port_range_min': dest_port, + 'protocol': const.PROTO_NAME_UDP, + 'source_ip_prefix': dhcp_lla_ip, + 'source_port_range_max': source_port, + 'source_port_range_min': source_port} + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + + def test_security_group_info_for_devices_only_ipv6_rule(self): + with self.network() as n,\ + self.subnet(n),\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', remote_group_id=sg1_id, + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + res1 = self._create_port( + self.fmt, n['network']['id'], + security_groups=[sg1_id]) + ports_rest1 = self.deserialize(self.fmt, res1) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_info_for_devices( + ctx, devices=devices) + expected = { + 'security_groups': {sg1_id: [ + {'direction': 'egress', 'ethertype': const.IPv4}, + {'direction': 'egress', 'ethertype': const.IPv6}, + {'direction': u'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, 'port_range_min': 22, + 'remote_group_id': sg1_id} + ]}, + 'sg_member_ips': {sg1_id: { + 'IPv6': set(), + }} + } + self.assertEqual(expected['security_groups'], + ports_rpc['security_groups']) + self.assertEqual(expected['sg_member_ips'][sg1_id]['IPv6'], + ports_rpc['sg_member_ips'][sg1_id]['IPv6']) + self._delete('ports', port_id1) def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP['IPv6_GLOBAL'] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6, - ipv6_ra_mode=const.IPV6_SLAAC), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - # Create gateway port - gateway_res = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], - 'ip_address': fake_gateway}], - device_owner='network:router_interface') - gateway_mac = gateway_res['port']['mac_address'] - gateway_port_id = gateway_res['port']['id'] - gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( - const.IPV6_LLA_PREFIX, - gateway_mac)) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': gateway_lla_ip, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - # Note(xuhanp): remove gateway port's fixed_ips or gateway port - # deletion will be prevented. - data = {'port': {'fixed_ips': []}} - req = self.new_update_request('ports', data, gateway_port_id) - self.deserialize(self.fmt, req.get_response(self.api)) - self._delete('ports', gateway_port_id) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6, + ipv6_ra_mode=const.IPV6_SLAAC + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + # Create gateway port + gateway_res = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], + 'ip_address': fake_gateway}], + device_owner='network:router_interface') + gateway_mac = gateway_res['port']['mac_address'] + gateway_port_id = gateway_res['port']['id'] + gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( + const.IPV6_LLA_PREFIX, + gateway_mac)) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': gateway_lla_ip, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + # Note(xuhanp): remove gateway port's fixed_ips or gateway port + # deletion will be prevented. + data = {'port': {'fixed_ips': []}} + req = self.new_update_request('ports', data, gateway_port_id) + self.deserialize(self.fmt, req.get_response(self.api)) + self._delete('ports', gateway_port_id) def test_security_group_rule_for_device_ipv6_multi_router_interfaces(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP['IPv6_GLOBAL'] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6, - ipv6_ra_mode=const.IPV6_SLAAC), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - # Create gateway port - gateway_res = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], - 'ip_address': fake_gateway}], - device_owner='network:router_interface') - gateway_mac = gateway_res['port']['mac_address'] - gateway_port_id = gateway_res['port']['id'] - gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( - const.IPV6_LLA_PREFIX, - gateway_mac)) - # Create another router interface port - interface_res = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - device_owner='network:router_interface') - interface_port_id = interface_res['port']['id'] - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': gateway_lla_ip, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - data = {'port': {'fixed_ips': []}} - req = self.new_update_request('ports', data, gateway_port_id) - self.deserialize(self.fmt, req.get_response(self.api)) - req = self.new_update_request('ports', data, interface_port_id) - self.deserialize(self.fmt, req.get_response(self.api)) - self._delete('ports', gateway_port_id) - self._delete('ports', interface_port_id) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6, + ipv6_ra_mode=const.IPV6_SLAAC + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + # Create gateway port + gateway_res = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], + 'ip_address': fake_gateway}], + device_owner='network:router_interface') + gateway_mac = gateway_res['port']['mac_address'] + gateway_port_id = gateway_res['port']['id'] + gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( + const.IPV6_LLA_PREFIX, + gateway_mac)) + # Create another router interface port + interface_res = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + device_owner='network:router_interface') + interface_port_id = interface_res['port']['id'] + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': gateway_lla_ip, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + data = {'port': {'fixed_ips': []}} + req = self.new_update_request('ports', data, gateway_port_id) + self.deserialize(self.fmt, req.get_response(self.api)) + req = self.new_update_request('ports', data, interface_port_id) + self.deserialize(self.fmt, req.get_response(self.api)) + self._delete('ports', gateway_port_id) + self._delete('ports', interface_port_id) def test_security_group_ra_rules_for_devices_ipv6_dvr(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP['IPv6_GLOBAL'] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6, - ipv6_ra_mode=const.IPV6_SLAAC), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - # Create DVR router interface port - gateway_res = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], - 'ip_address': fake_gateway}], - device_owner=const.DEVICE_OWNER_DVR_INTERFACE) - gateway_mac = gateway_res['port']['mac_address'] - gateway_port_id = gateway_res['port']['id'] - gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( - const.IPV6_LLA_PREFIX, - gateway_mac)) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': gateway_lla_ip, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - # Note(xuhanp): remove gateway port's fixed_ips or gateway port - # deletion will be prevented. - data = {'port': {'fixed_ips': []}} - req = self.new_update_request('ports', data, gateway_port_id) - self.deserialize(self.fmt, req.get_response(self.api)) - self._delete('ports', gateway_port_id) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6, + ipv6_ra_mode=const.IPV6_SLAAC + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + # Create DVR router interface port + gateway_res = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'], + 'ip_address': fake_gateway}], + device_owner=const.DEVICE_OWNER_DVR_INTERFACE) + gateway_mac = gateway_res['port']['mac_address'] + gateway_port_id = gateway_res['port']['id'] + gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64( + const.IPV6_LLA_PREFIX, + gateway_mac)) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': gateway_lla_ip, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + # Note(xuhanp): remove gateway port's fixed_ips or gateway port + # deletion will be prevented. + data = {'port': {'fixed_ips': []}} + req = self.new_update_request('ports', data, gateway_port_id) + self.deserialize(self.fmt, req.get_response(self.api)) + self._delete('ports', gateway_port_id) def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP['IPv6_LLA'] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6, - ipv6_ra_mode=const.IPV6_SLAAC), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': fake_gateway, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6, + ipv6_ra_mode=const.IPV6_SLAAC + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': fake_gateway, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) def test_security_group_ra_rules_for_devices_ipv6_no_gateway_port(self): fake_prefix = FAKE_PREFIX[const.IPv6] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=None, - cidr=fake_prefix, - ip_version=6, - ipv6_ra_mode=const.IPV6_SLAAC), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) + with self.network() as n,\ + self.subnet(n, gateway_ip=None, cidr=fake_prefix, + ip_version=6, ipv6_ra_mode=const.IPV6_SLAAC + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv6_egress(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP[const.IPv6] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6), - self.security_group()) as (subnet_v6, - sg1): - sg1_id = sg1['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'egress', const.PROTO_NAME_TCP, '22', - '22', - ethertype=const.IPv6) - rule2 = self._build_security_group_rule( - sg1_id, - 'egress', const.PROTO_NAME_UDP, '23', - '23', fake_prefix, - ethertype=const.IPv6) - rules = { - 'security_group_rules': [rule1['security_group_rule'], - rule2['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'egress', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 22, - 'security_group_id': sg1_id, - 'port_range_min': 22}, - {'direction': 'egress', - 'protocol': const.PROTO_NAME_UDP, - 'ethertype': const.IPv6, - 'port_range_max': 23, - 'security_group_id': sg1_id, - 'port_range_min': 23, - 'dest_ip_prefix': fake_prefix}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': fake_gateway, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6 + ) as subnet_v6,\ + self.security_group() as sg1: + sg1_id = sg1['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'egress', const.PROTO_NAME_TCP, '22', + '22', + ethertype=const.IPv6) + rule2 = self._build_security_group_rule( + sg1_id, + 'egress', const.PROTO_NAME_UDP, '23', + '23', fake_prefix, + ethertype=const.IPv6) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'egress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 22, + 'security_group_id': sg1_id, + 'port_range_min': 22}, + {'direction': 'egress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv6, + 'port_range_max': 23, + 'security_group_id': sg1_id, + 'port_range_min': 23, + 'dest_ip_prefix': fake_prefix}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': fake_gateway, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv6_source_group(self): fake_prefix = FAKE_PREFIX[const.IPv6] fake_gateway = FAKE_IP[const.IPv6] - with self.network() as n: - with contextlib.nested(self.subnet(n, - gateway_ip=fake_gateway, - cidr=fake_prefix, - ip_version=6), - self.security_group(), - self.security_group()) as (subnet_v6, - sg1, - sg2): - sg1_id = sg1['security_group']['id'] - sg2_id = sg2['security_group']['id'] - rule1 = self._build_security_group_rule( - sg1_id, - 'ingress', const.PROTO_NAME_TCP, '24', - '25', - ethertype=const.IPv6, - remote_group_id=sg2['security_group']['id']) - rules = { - 'security_group_rules': [rule1['security_group_rule']]} - self._make_security_group_rule(self.fmt, rules) - - ports_rest1 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg1_id, - sg2_id]) - port_id1 = ports_rest1['port']['id'] - self.rpc.devices = {port_id1: ports_rest1['port']} - devices = [port_id1, 'no_exist_device'] - - ports_rest2 = self._make_port( - self.fmt, n['network']['id'], - fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], - security_groups=[sg2_id]) - port_id2 = ports_rest2['port']['id'] - - ctx = context.get_admin_context() - ports_rpc = self.rpc.security_group_rules_for_devices( - ctx, devices=devices) - port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': const.IPv4, - 'security_group_id': sg2_id}, - {'direction': 'egress', 'ethertype': const.IPv6, - 'security_group_id': sg2_id}, - {'direction': 'ingress', - 'source_ip_prefix': '2001:db8::2/128', - 'protocol': const.PROTO_NAME_TCP, - 'ethertype': const.IPv6, - 'port_range_max': 25, 'port_range_min': 24, - 'remote_group_id': sg2_id, - 'security_group_id': sg1_id}, - {'direction': 'ingress', - 'protocol': const.PROTO_NAME_ICMP_V6, - 'ethertype': const.IPv6, - 'source_ip_prefix': fake_gateway, - 'source_port_range_min': const.ICMPV6_TYPE_RA}, - ] - self.assertEqual(port_rpc['security_group_rules'], - expected) - self._delete('ports', port_id1) - self._delete('ports', port_id2) + with self.network() as n,\ + self.subnet(n, gateway_ip=fake_gateway, + cidr=fake_prefix, ip_version=6 + ) as subnet_v6,\ + self.security_group() as sg1,\ + self.security_group() as sg2: + sg1_id = sg1['security_group']['id'] + sg2_id = sg2['security_group']['id'] + rule1 = self._build_security_group_rule( + sg1_id, + 'ingress', const.PROTO_NAME_TCP, '24', + '25', + ethertype=const.IPv6, + remote_group_id=sg2['security_group']['id']) + rules = { + 'security_group_rules': [rule1['security_group_rule']]} + self._make_security_group_rule(self.fmt, rules) + + ports_rest1 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg1_id, + sg2_id]) + port_id1 = ports_rest1['port']['id'] + self.rpc.devices = {port_id1: ports_rest1['port']} + devices = [port_id1, 'no_exist_device'] + + ports_rest2 = self._make_port( + self.fmt, n['network']['id'], + fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], + security_groups=[sg2_id]) + port_id2 = ports_rest2['port']['id'] + + ctx = context.get_admin_context() + ports_rpc = self.rpc.security_group_rules_for_devices( + ctx, devices=devices) + port_rpc = ports_rpc[port_id1] + expected = [{'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg1_id}, + {'direction': 'egress', 'ethertype': const.IPv4, + 'security_group_id': sg2_id}, + {'direction': 'egress', 'ethertype': const.IPv6, + 'security_group_id': sg2_id}, + {'direction': 'ingress', + 'source_ip_prefix': '2001:db8::2/128', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, + 'port_range_max': 25, 'port_range_min': 24, + 'remote_group_id': sg2_id, + 'security_group_id': sg1_id}, + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_ICMP_V6, + 'ethertype': const.IPv6, + 'source_ip_prefix': fake_gateway, + 'source_port_range_min': const.ICMPV6_TYPE_RA}, + ] + self.assertEqual(port_rpc['security_group_rules'], + expected) + self._delete('ports', port_id1) + self._delete('ports', port_id2) class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):