def test_security_group_rules_for_devices_ipv4_ingress(self):
fake_prefix = FAKE_PREFIX[const.IPv4]
- with self.network() as n:
- with contextlib.nested(
- self.subnet(n),
- self.security_group()) as (subnet_v4, sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22')
- rule2 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '23',
- '23', fake_prefix)
- rules = {
- 'security_group_rules': [rule1['security_group_rule'],
- rule2['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv4,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv4,
- 'port_range_max': 23, 'security_group_id': sg1_id,
- 'port_range_min': 23,
- 'source_ip_prefix': fake_prefix},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22')
+ rule2 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '23',
+ '23', fake_prefix)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule'],
+ rule2['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
+ 'port_range_max': 23, 'security_group_id': sg1_id,
+ 'port_range_min': 23,
+ 'source_ip_prefix': fake_prefix},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
@contextlib.contextmanager
def _port_with_addr_pairs_and_security_group(self):
not in plugin_obj.supported_extension_aliases):
self.skipTest("Test depends on allowed-address-pairs extension")
fake_prefix = FAKE_PREFIX['IPv4']
- with self.network() as n:
- with contextlib.nested(
- self.subnet(n),
- self.security_group()
- ) as (subnet_v4, sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', 'tcp', '22',
- '22', remote_group_id=sg1_id)
- rule2 = self._build_security_group_rule(
- sg1_id,
- 'ingress', 'tcp', '23',
- '23', fake_prefix)
- rules = {
- 'security_group_rules': [rule1['security_group_rule'],
- rule2['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
- address_pairs = [{'mac_address': '00:00:00:00:00:01',
- 'ip_address': '10.0.1.0/24'},
- {'mac_address': '00:00:00:00:00:01',
- 'ip_address': '11.0.0.1'}]
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id],
- arg_list=(addr_pair.ADDRESS_PAIRS,),
- allowed_address_pairs=address_pairs)
- yield self.deserialize(self.fmt, res1)
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', 'tcp', '22',
+ '22', remote_group_id=sg1_id)
+ rule2 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', 'tcp', '23',
+ '23', fake_prefix)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule'],
+ rule2['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, 201)
+ address_pairs = [{'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '10.0.1.0/24'},
+ {'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '11.0.0.1'}]
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id],
+ arg_list=(addr_pair.ADDRESS_PAIRS,),
+ allowed_address_pairs=address_pairs)
+ yield self.deserialize(self.fmt, res1)
def test_security_group_info_for_devices_ipv4_addr_pair(self):
with self._port_with_addr_pairs_and_security_group() as port:
def test_security_group_rules_for_devices_ipv4_egress(self):
fake_prefix = FAKE_PREFIX[const.IPv4]
- with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group()) as (subnet_v4,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'egress', const.PROTO_NAME_TCP, '22',
- '22')
- rule2 = self._build_security_group_rule(
- sg1_id,
- 'egress', const.PROTO_NAME_UDP, '23',
- '23', fake_prefix)
- rules = {
- 'security_group_rules': [rule1['security_group_rule'],
- rule2['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'egress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv4,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'egress',
- 'protocol': const.PROTO_NAME_UDP,
- 'ethertype': const.IPv4,
- 'port_range_max': 23, 'security_group_id': sg1_id,
- 'port_range_min': 23,
- 'dest_ip_prefix': fake_prefix},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'egress', const.PROTO_NAME_TCP, '22',
+ '22')
+ rule2 = self._build_security_group_rule(
+ sg1_id,
+ 'egress', const.PROTO_NAME_UDP, '23',
+ '23', fake_prefix)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule'],
+ rule2['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv4,
+ 'port_range_max': 23, 'security_group_id': sg1_id,
+ 'port_range_min': 23,
+ 'dest_ip_prefix': fake_prefix},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv4_source_group(self):
- with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group(),
- self.security_group()) as (subnet_v4,
- sg1,
- sg2):
- sg1_id = sg1['security_group']['id']
- sg2_id = sg2['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '24',
- '25', remote_group_id=sg2['security_group']['id'])
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id,
- sg2_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
-
- res2 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg2_id])
- ports_rest2 = self.deserialize(self.fmt, res2)
- port_id2 = ports_rest2['port']['id']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg2_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg2_id},
- {'direction': u'ingress',
- 'source_ip_prefix': u'10.0.0.3/32',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv4,
- 'port_range_max': 25, 'port_range_min': 24,
- 'remote_group_id': sg2_id,
- 'security_group_id': sg1_id},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
- self._delete('ports', port_id2)
-
- def test_security_group_info_for_devices_ipv4_source_group(self):
-
- with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group(),
- self.security_group()) as (subnet_v4,
- sg1,
- sg2):
- sg1_id = sg1['security_group']['id']
- sg2_id = sg2['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '24',
- '25', remote_group_id=sg2['security_group']['id'])
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
-
- res2 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg2_id])
- ports_rest2 = self.deserialize(self.fmt, res2)
- port_id2 = ports_rest2['port']['id']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_info_for_devices(
- ctx, devices=devices)
- expected = {
- 'security_groups': {sg1_id: [
- {'direction': 'egress', 'ethertype': const.IPv4},
- {'direction': 'egress', 'ethertype': const.IPv6},
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1,\
+ self.security_group() as sg2:
+ sg1_id = sg1['security_group']['id']
+ sg2_id = sg2['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '24',
+ '25', remote_group_id=sg2['security_group']['id'])
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id,
+ sg2_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ res2 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg2_id])
+ ports_rest2 = self.deserialize(self.fmt, res2)
+ port_id2 = ports_rest2['port']['id']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg2_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg2_id},
{'direction': u'ingress',
+ 'source_ip_prefix': u'10.0.0.3/32',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
- 'remote_group_id': sg2_id}
- ]},
- 'sg_member_ips': {sg2_id: {
- 'IPv4': set([u'10.0.0.3']),
- 'IPv6': set(),
- }}
- }
- self.assertEqual(expected['security_groups'],
- ports_rpc['security_groups'])
- self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'],
- ports_rpc['sg_member_ips'][sg2_id]['IPv4'])
- self._delete('ports', port_id1)
- self._delete('ports', port_id2)
+ 'remote_group_id': sg2_id,
+ 'security_group_id': sg1_id},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ self._delete('ports', port_id2)
+
+ def test_security_group_info_for_devices_ipv4_source_group(self):
+
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1,\
+ self.security_group() as sg2:
+ sg1_id = sg1['security_group']['id']
+ sg2_id = sg2['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '24',
+ '25', remote_group_id=sg2['security_group']['id'])
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ res2 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg2_id])
+ ports_rest2 = self.deserialize(self.fmt, res2)
+ port_id2 = ports_rest2['port']['id']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_info_for_devices(
+ ctx, devices=devices)
+ expected = {
+ 'security_groups': {sg1_id: [
+ {'direction': 'egress', 'ethertype': const.IPv4},
+ {'direction': 'egress', 'ethertype': const.IPv6},
+ {'direction': u'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
+ 'port_range_max': 25, 'port_range_min': 24,
+ 'remote_group_id': sg2_id}
+ ]},
+ 'sg_member_ips': {sg2_id: {
+ 'IPv4': set([u'10.0.0.3']),
+ 'IPv6': set(),
+ }}
+ }
+ self.assertEqual(expected['security_groups'],
+ ports_rpc['security_groups'])
+ self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'],
+ ports_rpc['sg_member_ips'][sg2_id]['IPv4'])
+ self._delete('ports', port_id1)
+ self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP[const.IPv6]
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rule2 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_UDP, '23',
- '23', fake_prefix,
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule'],
- rule2['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
-
- dhcp_port = self._create_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
- 'ip_address': FAKE_IP['IPv6_DHCP']}],
- device_owner=const.DEVICE_OWNER_DHCP,
- security_groups=[sg1_id])
- dhcp_rest = self.deserialize(self.fmt, dhcp_port)
- dhcp_mac = dhcp_rest['port']['mac_address']
- dhcp_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
- const.IPV6_LLA_PREFIX,
- dhcp_mac))
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- source_port, dest_port, ethertype = sg_db_rpc.DHCP_RULE_PORT[6]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_UDP,
- 'ethertype': const.IPv6,
- 'port_range_max': 23,
- 'security_group_id': sg1_id,
- 'port_range_min': 23,
- 'source_ip_prefix': fake_prefix},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': fake_gateway,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- {'direction': 'ingress',
- 'ethertype': ethertype,
- 'port_range_max': dest_port,
- 'port_range_min': dest_port,
- 'protocol': const.PROTO_NAME_UDP,
- 'source_ip_prefix': dhcp_lla_ip,
- 'source_port_range_max': source_port,
- 'source_port_range_min': source_port}
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
-
- def test_security_group_info_for_devices_only_ipv6_rule(self):
- with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group()) as (subnet_v4,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22', remote_group_id=sg1_id,
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- res1 = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
-
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_info_for_devices(
- ctx, devices=devices)
- expected = {
- 'security_groups': {sg1_id: [
- {'direction': 'egress', 'ethertype': const.IPv4},
- {'direction': 'egress', 'ethertype': const.IPv6},
- {'direction': u'ingress',
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rule2 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_UDP, '23',
+ '23', fake_prefix,
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule'],
+ rule2['security_group_rule']]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+ dhcp_port = self._create_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': FAKE_IP['IPv6_DHCP']}],
+ device_owner=const.DEVICE_OWNER_DHCP,
+ security_groups=[sg1_id])
+ dhcp_rest = self.deserialize(self.fmt, dhcp_port)
+ dhcp_mac = dhcp_rest['port']['mac_address']
+ dhcp_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ dhcp_mac))
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ source_port, dest_port, ethertype = sg_db_rpc.DHCP_RULE_PORT[6]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
- 'port_range_max': 22, 'port_range_min': 22,
- 'remote_group_id': sg1_id}
- ]},
- 'sg_member_ips': {sg1_id: {
- 'IPv6': set(),
- }}
- }
- self.assertEqual(expected['security_groups'],
- ports_rpc['security_groups'])
- self.assertEqual(expected['sg_member_ips'][sg1_id]['IPv6'],
- ports_rpc['sg_member_ips'][sg1_id]['IPv6'])
- self._delete('ports', port_id1)
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 23,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 23,
+ 'source_ip_prefix': fake_prefix},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ {'direction': 'ingress',
+ 'ethertype': ethertype,
+ 'port_range_max': dest_port,
+ 'port_range_min': dest_port,
+ 'protocol': const.PROTO_NAME_UDP,
+ 'source_ip_prefix': dhcp_lla_ip,
+ 'source_port_range_max': source_port,
+ 'source_port_range_min': source_port}
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+
+ def test_security_group_info_for_devices_only_ipv6_rule(self):
+ with self.network() as n,\
+ self.subnet(n),\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22', remote_group_id=sg1_id,
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ res1 = self._create_port(
+ self.fmt, n['network']['id'],
+ security_groups=[sg1_id])
+ ports_rest1 = self.deserialize(self.fmt, res1)
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_info_for_devices(
+ ctx, devices=devices)
+ expected = {
+ 'security_groups': {sg1_id: [
+ {'direction': 'egress', 'ethertype': const.IPv4},
+ {'direction': 'egress', 'ethertype': const.IPv6},
+ {'direction': u'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22, 'port_range_min': 22,
+ 'remote_group_id': sg1_id}
+ ]},
+ 'sg_member_ips': {sg1_id: {
+ 'IPv6': set(),
+ }}
+ }
+ self.assertEqual(expected['security_groups'],
+ ports_rpc['security_groups'])
+ self.assertEqual(expected['sg_member_ips'][sg1_id]['IPv6'],
+ ports_rpc['sg_member_ips'][sg1_id]['IPv6'])
+ self._delete('ports', port_id1)
def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_GLOBAL']
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6,
- ipv6_ra_mode=const.IPV6_SLAAC),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- # Create gateway port
- gateway_res = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
- 'ip_address': fake_gateway}],
- device_owner='network:router_interface')
- gateway_mac = gateway_res['port']['mac_address']
- gateway_port_id = gateway_res['port']['id']
- gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
- const.IPV6_LLA_PREFIX,
- gateway_mac))
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': gateway_lla_ip,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
- # Note(xuhanp): remove gateway port's fixed_ips or gateway port
- # deletion will be prevented.
- data = {'port': {'fixed_ips': []}}
- req = self.new_update_request('ports', data, gateway_port_id)
- self.deserialize(self.fmt, req.get_response(self.api))
- self._delete('ports', gateway_port_id)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ # Create gateway port
+ gateway_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': fake_gateway}],
+ device_owner='network:router_interface')
+ gateway_mac = gateway_res['port']['mac_address']
+ gateway_port_id = gateway_res['port']['id']
+ gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ gateway_mac))
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': gateway_lla_ip,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ # Note(xuhanp): remove gateway port's fixed_ips or gateway port
+ # deletion will be prevented.
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, gateway_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ self._delete('ports', gateway_port_id)
def test_security_group_rule_for_device_ipv6_multi_router_interfaces(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_GLOBAL']
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6,
- ipv6_ra_mode=const.IPV6_SLAAC),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- # Create gateway port
- gateway_res = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
- 'ip_address': fake_gateway}],
- device_owner='network:router_interface')
- gateway_mac = gateway_res['port']['mac_address']
- gateway_port_id = gateway_res['port']['id']
- gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
- const.IPV6_LLA_PREFIX,
- gateway_mac))
- # Create another router interface port
- interface_res = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- device_owner='network:router_interface')
- interface_port_id = interface_res['port']['id']
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': gateway_lla_ip,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
- data = {'port': {'fixed_ips': []}}
- req = self.new_update_request('ports', data, gateway_port_id)
- self.deserialize(self.fmt, req.get_response(self.api))
- req = self.new_update_request('ports', data, interface_port_id)
- self.deserialize(self.fmt, req.get_response(self.api))
- self._delete('ports', gateway_port_id)
- self._delete('ports', interface_port_id)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ # Create gateway port
+ gateway_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': fake_gateway}],
+ device_owner='network:router_interface')
+ gateway_mac = gateway_res['port']['mac_address']
+ gateway_port_id = gateway_res['port']['id']
+ gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ gateway_mac))
+ # Create another router interface port
+ interface_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ device_owner='network:router_interface')
+ interface_port_id = interface_res['port']['id']
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': gateway_lla_ip,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, gateway_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ req = self.new_update_request('ports', data, interface_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ self._delete('ports', gateway_port_id)
+ self._delete('ports', interface_port_id)
def test_security_group_ra_rules_for_devices_ipv6_dvr(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_GLOBAL']
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6,
- ipv6_ra_mode=const.IPV6_SLAAC),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- # Create DVR router interface port
- gateway_res = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
- 'ip_address': fake_gateway}],
- device_owner=const.DEVICE_OWNER_DVR_INTERFACE)
- gateway_mac = gateway_res['port']['mac_address']
- gateway_port_id = gateway_res['port']['id']
- gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
- const.IPV6_LLA_PREFIX,
- gateway_mac))
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': gateway_lla_ip,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
- # Note(xuhanp): remove gateway port's fixed_ips or gateway port
- # deletion will be prevented.
- data = {'port': {'fixed_ips': []}}
- req = self.new_update_request('ports', data, gateway_port_id)
- self.deserialize(self.fmt, req.get_response(self.api))
- self._delete('ports', gateway_port_id)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ # Create DVR router interface port
+ gateway_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': fake_gateway}],
+ device_owner=const.DEVICE_OWNER_DVR_INTERFACE)
+ gateway_mac = gateway_res['port']['mac_address']
+ gateway_port_id = gateway_res['port']['id']
+ gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ gateway_mac))
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': gateway_lla_ip,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ # Note(xuhanp): remove gateway port's fixed_ips or gateway port
+ # deletion will be prevented.
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, gateway_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ self._delete('ports', gateway_port_id)
def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_LLA']
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6,
- ipv6_ra_mode=const.IPV6_SLAAC),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': fake_gateway,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
def test_security_group_ra_rules_for_devices_ipv6_no_gateway_port(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=None,
- cidr=fake_prefix,
- ip_version=6,
- ipv6_ra_mode=const.IPV6_SLAAC),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=None, cidr=fake_prefix,
+ ip_version=6, ipv6_ra_mode=const.IPV6_SLAAC
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_egress(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP[const.IPv6]
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6),
- self.security_group()) as (subnet_v6,
- sg1):
- sg1_id = sg1['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'egress', const.PROTO_NAME_TCP, '22',
- '22',
- ethertype=const.IPv6)
- rule2 = self._build_security_group_rule(
- sg1_id,
- 'egress', const.PROTO_NAME_UDP, '23',
- '23', fake_prefix,
- ethertype=const.IPv6)
- rules = {
- 'security_group_rules': [rule1['security_group_rule'],
- rule2['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
-
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'egress',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'egress',
- 'protocol': const.PROTO_NAME_UDP,
- 'ethertype': const.IPv6,
- 'port_range_max': 23,
- 'security_group_id': sg1_id,
- 'port_range_min': 23,
- 'dest_ip_prefix': fake_prefix},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': fake_gateway,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6
+ ) as subnet_v6,\
+ self.security_group() as sg1:
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'egress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rule2 = self._build_security_group_rule(
+ sg1_id,
+ 'egress', const.PROTO_NAME_UDP, '23',
+ '23', fake_prefix,
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule'],
+ rule2['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 23,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 23,
+ 'dest_ip_prefix': fake_prefix},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_source_group(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP[const.IPv6]
- with self.network() as n:
- with contextlib.nested(self.subnet(n,
- gateway_ip=fake_gateway,
- cidr=fake_prefix,
- ip_version=6),
- self.security_group(),
- self.security_group()) as (subnet_v6,
- sg1,
- sg2):
- sg1_id = sg1['security_group']['id']
- sg2_id = sg2['security_group']['id']
- rule1 = self._build_security_group_rule(
- sg1_id,
- 'ingress', const.PROTO_NAME_TCP, '24',
- '25',
- ethertype=const.IPv6,
- remote_group_id=sg2['security_group']['id'])
- rules = {
- 'security_group_rules': [rule1['security_group_rule']]}
- self._make_security_group_rule(self.fmt, rules)
-
- ports_rest1 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg1_id,
- sg2_id])
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
-
- ports_rest2 = self._make_port(
- self.fmt, n['network']['id'],
- fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
- security_groups=[sg2_id])
- port_id2 = ports_rest2['port']['id']
-
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': const.IPv4,
- 'security_group_id': sg2_id},
- {'direction': 'egress', 'ethertype': const.IPv6,
- 'security_group_id': sg2_id},
- {'direction': 'ingress',
- 'source_ip_prefix': '2001:db8::2/128',
- 'protocol': const.PROTO_NAME_TCP,
- 'ethertype': const.IPv6,
- 'port_range_max': 25, 'port_range_min': 24,
- 'remote_group_id': sg2_id,
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': const.PROTO_NAME_ICMP_V6,
- 'ethertype': const.IPv6,
- 'source_ip_prefix': fake_gateway,
- 'source_port_range_min': const.ICMPV6_TYPE_RA},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self._delete('ports', port_id1)
- self._delete('ports', port_id2)
+ with self.network() as n,\
+ self.subnet(n, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6
+ ) as subnet_v6,\
+ self.security_group() as sg1,\
+ self.security_group() as sg2:
+ sg1_id = sg1['security_group']['id']
+ sg2_id = sg2['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '24',
+ '25',
+ ethertype=const.IPv6,
+ remote_group_id=sg2['security_group']['id'])
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id,
+ sg2_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+
+ ports_rest2 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg2_id])
+ port_id2 = ports_rest2['port']['id']
+
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg2_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg2_id},
+ {'direction': 'ingress',
+ 'source_ip_prefix': '2001:db8::2/128',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 25, 'port_range_min': 24,
+ 'remote_group_id': sg2_id,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP_V6,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ self._delete('ports', port_id2)
class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):