from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as const
+from neutron.common import ipv6_utils as ipv6
from neutron import context
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.openstack.common.rpc import proxy
from neutron.tests import base
from neutron.tests.unit import test_extension_security_group as test_sg
-from neutron.tests.unit import test_iptables_firewall as test_fw
+
+
+FAKE_PREFIX = {const.IPv4: '10.0.0.0/24',
+ const.IPv6: '2001:0db8::/64'}
+FAKE_IP = {const.IPv4: '10.0.0.1',
+ const.IPv6: 'fe80::1',
+ 'IPv6_GLOBAL': '2001:0db8::1',
+ 'IPv6_LLA': 'fe80::123'}
class FakeSGCallback(sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
self.rpc = FakeSGCallback()
def test_security_group_rules_for_devices_ipv4_ingress(self):
- fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
+ fake_prefix = FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
if ('allowed-address-pairs'
not in plugin_obj.supported_extension_aliases):
self.skipTest("Test depeneds on allowed-address-pairs extension")
- fake_prefix = test_fw.FAKE_PREFIX['IPv4']
+ fake_prefix = FAKE_PREFIX['IPv4']
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv4_egress(self):
- fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
+ fake_prefix = FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self):
- fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
cidr=fake_prefix,
ip_version=6),
self.security_group()) as (subnet_v6,
{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv6,
- 'port_range_max': 23, 'security_group_id': sg1_id,
+ 'port_range_max': 23,
+ 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+
+ def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self):
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP['IPv6_GLOBAL']
+ with self.network() as n:
+ with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
+ cidr=fake_prefix,
+ ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC),
+ self.security_group()) as (subnet_v6,
+ sg1):
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ # Create gateway port
+ gateway_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': fake_gateway}],
+ device_owner='network:router_interface')
+ gateway_mac = gateway_res['port']['mac_address']
+ gateway_port_id = gateway_res['port']['id']
+ gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ gateway_mac))
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': gateway_lla_ip,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ # Note(xuhanp): remove gateway port's fixed_ips or gateway port
+ # deletion will be prevented.
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, gateway_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ self._delete('ports', gateway_port_id)
+
+ def test_security_group_rule_for_device_ipv6_multi_router_interfaces(self):
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP['IPv6_GLOBAL']
+ with self.network() as n:
+ with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
+ cidr=fake_prefix,
+ ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC),
+ self.security_group()) as (subnet_v6,
+ sg1):
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ # Create gateway port
+ gateway_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': fake_gateway}],
+ device_owner='network:router_interface')
+ gateway_mac = gateway_res['port']['mac_address']
+ gateway_port_id = gateway_res['port']['id']
+ gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ gateway_mac))
+ # Create another router interface port
+ interface_res = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ device_owner='network:router_interface')
+ interface_port_id = interface_res['port']['id']
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': gateway_lla_ip,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, gateway_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ req = self.new_update_request('ports', data, interface_port_id)
+ self.deserialize(self.fmt, req.get_response(self.api))
+ self._delete('ports', gateway_port_id)
+ self._delete('ports', interface_port_id)
+
+ def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self):
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP['IPv6_LLA']
+ with self.network() as n:
+ with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
+ cidr=fake_prefix,
+ ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC),
+ self.security_group()) as (subnet_v6,
+ sg1):
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
+ ]
+ self.assertEqual(port_rpc['security_group_rules'],
+ expected)
+ self._delete('ports', port_id1)
+
+ def test_security_group_ra_rules_for_devices_ipv6_no_gateway_port(self):
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ with self.network() as n:
+ with nested(self.subnet(n,
+ gateway_ip=None,
+ cidr=fake_prefix,
+ ip_version=6,
+ ipv6_ra_mode=const.IPV6_SLAAC),
+ self.security_group()) as (subnet_v6,
+ sg1):
+ sg1_id = sg1['security_group']['id']
+ rule1 = self._build_security_group_rule(
+ sg1_id,
+ 'ingress', const.PROTO_NAME_TCP, '22',
+ '22',
+ ethertype=const.IPv6)
+ rules = {
+ 'security_group_rules': [rule1['security_group_rule']]}
+ self._make_security_group_rule(self.fmt, rules)
+
+ ports_rest1 = self._make_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
+ security_groups=[sg1_id])
+ port_id1 = ports_rest1['port']['id']
+ self.rpc.devices = {port_id1: ports_rest1['port']}
+ devices = [port_id1, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+ port_rpc = ports_rpc[port_id1]
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': const.IPv6,
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 22,
+ 'security_group_id': sg1_id,
+ 'port_range_min': 22},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_egress(self):
- fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
cidr=fake_prefix,
ip_version=6),
self.security_group()) as (subnet_v6,
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self._make_security_group_rule(self.fmt, rules)
- res1 = self._create_port(
+ ports_rest1 = self._make_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg1_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
port_id1 = ports_rest1['port']['id']
self.rpc.devices = {port_id1: ports_rest1['port']}
devices = [port_id1, 'no_exist_device']
'security_group_id': sg1_id,
'port_range_min': 23,
'dest_ip_prefix': fake_prefix},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_source_group(self):
- fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
+ fake_prefix = FAKE_PREFIX[const.IPv6]
+ fake_gateway = FAKE_IP[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
+ gateway_ip=fake_gateway,
cidr=fake_prefix,
ip_version=6),
self.security_group(),
remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
- res = self._create_security_group_rule(self.fmt, rules)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self._make_security_group_rule(self.fmt, rules)
- res1 = self._create_port(
+ ports_rest1 = self._make_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg1_id,
sg2_id])
- ports_rest1 = self.deserialize(self.fmt, res1)
port_id1 = ports_rest1['port']['id']
self.rpc.devices = {port_id1: ports_rest1['port']}
devices = [port_id1, 'no_exist_device']
- res2 = self._create_port(
+ ports_rest2 = self._make_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg2_id])
- ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
ctx = context.get_admin_context()
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': 'ingress',
- 'source_ip_prefix': 'fe80::3/128',
+ 'source_ip_prefix': '2001:db8::2/128',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv6,
+ 'source_ip_prefix': fake_gateway,
+ 'source_port_range_min': const.ICMPV6_TYPE_RA},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 130 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 131 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 132 -j RETURN
-[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 134 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 135 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 136 -j RETURN
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 130 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 131 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 132 -j RETURN
-[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 134 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 135 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmpv6 --icmpv6-type 136 -j RETURN
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 130 -j RETURN
[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 131 -j RETURN
[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 132 -j RETURN
-[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 134 -j RETURN
[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 135 -j RETURN
[0:0] -A %(bn)s-i_port2 -p icmpv6 --icmpv6-type 136 -j RETURN
[0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP