return iptables_rule
def _port_arg(self, direction, protocol, port_range_min, port_range_max):
- if not (protocol in ['udp', 'tcp'] and port_range_min):
+ if (protocol not in ['udp', 'tcp', 'icmp', 'icmpv6']
+ or not port_range_min):
return []
- if port_range_min == port_range_max:
+ if protocol in ['icmp', 'icmpv6']:
+ # Note(xuhanp): port_range_min/port_range_max represent
+ # icmp type/code when protocal is icmp or icmpv6
+ # icmp code can be 0 so we cannot use "if port_range_max" here
+ if port_range_max is not None:
+ return ['--%s-type' % protocol,
+ '%s/%s' % (port_range_min, port_range_max)]
+ return ['--%s-type' % protocol, '%s' % port_range_min]
+ elif port_range_min == port_range_max:
return ['--%s' % direction, '%s' % (port_range_min,)]
else:
return ['-m', 'multiport',
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
+ def test_filter_ipv4_egress_icmp_type(self):
+ prefix = FAKE_PREFIX['IPv4']
+ rule = {'ethertype': 'IPv4',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 8,
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmp --icmp-type 8 -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
+ def test_filter_ipv4_egress_icmp_type_name(self):
+ prefix = FAKE_PREFIX['IPv4']
+ rule = {'ethertype': 'IPv4',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 'echo-request',
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmp --icmp-type echo-request -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
+ def test_filter_ipv4_egress_icmp_type_code(self):
+ prefix = FAKE_PREFIX['IPv4']
+ rule = {'ethertype': 'IPv4',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 8,
+ 'source_port_range_max': 0,
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmp --icmp-type 8/0 -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
def test_filter_ipv4_egress_tcp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
+ def test_filter_ipv6_egress_icmp_type(self):
+ prefix = FAKE_PREFIX['IPv6']
+ rule = {'ethertype': 'IPv6',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 8,
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmpv6 --icmpv6-type 8 -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
+ def test_filter_ipv6_egress_icmp_type_name(self):
+ prefix = FAKE_PREFIX['IPv6']
+ rule = {'ethertype': 'IPv6',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 'echo-request',
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmpv6 --icmpv6-type echo-request -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
+ def test_filter_ipv6_egress_icmp_type_code(self):
+ prefix = FAKE_PREFIX['IPv6']
+ rule = {'ethertype': 'IPv6',
+ 'direction': 'egress',
+ 'protocol': 'icmp',
+ 'source_port_range_min': 8,
+ 'source_port_range_max': 0,
+ 'source_ip_prefix': prefix}
+ egress = call.add_rule(
+ 'ofake_dev',
+ '-s %s -p icmpv6 --icmpv6-type 8/0 -j RETURN' % prefix)
+ ingress = None
+ self._test_prepare_port_filter(rule, ingress, egress)
+
def test_filter_ipv6_egress_tcp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',