actions="NORMAL")
+def icmpv6_header_match_supported():
+ return ofctl_arg_supported(cmd='add-flow',
+ table=ovs_const.ARP_SPOOF_TABLE,
+ priority=1,
+ dl_type=n_consts.ETHERTYPE_IPV6,
+ nw_proto=n_consts.PROTO_NUM_ICMP_V6,
+ icmp_type=n_consts.ICMPV6_TYPE_NA,
+ nd_target='fdf8:f53b:82e4::10',
+ actions="NORMAL")
+
+
def vf_management_supported():
is_supported = True
required_caps = (
return result
+def check_icmpv6_header_match():
+ result = checks.icmpv6_header_match_supported()
+ if not result:
+ LOG.error(_LE('Check for Open vSwitch support of ICMPv6 header '
+ 'matching failed. ICMPv6 Neighbor Advt spoofing (part '
+ 'of arp spoofing) suppression will not work. A newer '
+ 'version of OVS is required.'))
+ return result
+
+
def check_vf_management():
result = checks.vf_management_supported()
if not result:
help=_('Check for ARP responder support')),
BoolOptCallback('arp_header_match', check_arp_header_match,
help=_('Check for ARP header match support')),
+ BoolOptCallback('icmpv6_header_match', check_icmpv6_header_match,
+ help=_('Check for ICMPv6 header match support')),
BoolOptCallback('vf_management', check_vf_management,
help=_('Check for VF management support')),
BoolOptCallback('read_netns', check_read_netns,
cfg.CONF.set_override('arp_responder', True)
if cfg.CONF.AGENT.prevent_arp_spoofing:
cfg.CONF.set_override('arp_header_match', True)
+ cfg.CONF.set_override('icmpv6_header_match', True)
if cfg.CONF.ml2_sriov.agent_required:
cfg.CONF.set_override('vf_management', True)
if not cfg.CONF.AGENT.use_helper_for_ns_read:
L3_HA_MODE_EXT_ALIAS = 'l3-ha'
SUBNET_ALLOCATION_EXT_ALIAS = 'subnet_allocation'
+ETHERTYPE_IPV6 = 0x86DD
+
# Protocol names and numbers for Security Groups/Firewalls
PROTO_NAME_TCP = 'tcp'
PROTO_NAME_ICMP = 'icmp'
# Neighbor Advertisement (136)
ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136]
ICMPV6_TYPE_RA = 134
+ICMPV6_TYPE_NA = 136
DHCPV6_STATEFUL = 'dhcpv6-stateful'
DHCPV6_STATELESS = 'dhcpv6-stateless'
from oslo_log import log as logging
from ryu.lib.packet import ether_types
+from ryu.lib.packet import icmpv6
+from ryu.lib.packet import in_proto
from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const
return ofpp.OFPMatch(in_port=port,
eth_type=ether_types.ETH_TYPE_ARP)
+ @staticmethod
+ def _icmpv6_reply_match(ofp, ofpp, port):
+ return ofpp.OFPMatch(in_port=port,
+ eth_type=ether_types.ETH_TYPE_IPV6,
+ ip_proto=in_proto.IPPROTO_ICMPV6,
+ icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT)
+
+ def install_icmpv6_na_spoofing_protection(self, port, ip_addresses):
+ # Allow neighbor advertisements as long as they match addresses
+ # that actually belong to the port.
+ for ip in ip_addresses:
+ masked_ip = self._cidr_to_ryu(ip)
+ self.install_normal(
+ table_id=constants.ARP_SPOOF_TABLE, priority=2,
+ eth_type=ether_types.ETH_TYPE_IPV6,
+ ip_proto=in_proto.IPPROTO_ICMPV6,
+ icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT,
+ ipv6_nd_target=masked_ip, in_port=port)
+
+ # Now that the rules are ready, direct icmpv6 neighbor advertisement
+ # traffic from the port into the anti-spoof table.
+ (_dp, ofp, ofpp) = self._get_dp()
+ match = self._icmpv6_reply_match(ofp, ofpp, port=port)
+ self.install_goto(table_id=constants.LOCAL_SWITCHING,
+ priority=10,
+ match=match,
+ dest_table_id=constants.ARP_SPOOF_TABLE)
+
def install_arp_spoofing_protection(self, port, ip_addresses):
# allow ARP replies as long as they match addresses that actually
# belong to the port.
def delete_arp_spoofing_protection(self, port):
(_dp, ofp, ofpp) = self._get_dp()
match = self._arp_reply_match(ofp, ofpp, port=port)
+ self.delete_flows(table_id=constants.LOCAL_SWITCHING,
+ match=match)
+ match = self._icmpv6_reply_match(ofp, ofpp, port=port)
self.delete_flows(table_id=constants.LOCAL_SWITCHING,
match=match)
self.delete_flows(table_id=constants.ARP_SPOOF_TABLE,
* references
** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic
"""
-
+from neutron.common import constants as const
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
self.delete_flows(table_id=constants.LOCAL_SWITCHING,
in_port=port, eth_src=mac)
+ def install_icmpv6_na_spoofing_protection(self, port, ip_addresses):
+ # Allow neighbor advertisements as long as they match addresses
+ # that actually belong to the port.
+ for ip in ip_addresses:
+ self.install_normal(
+ table_id=constants.ARP_SPOOF_TABLE, priority=2,
+ dl_type=const.ETHERTYPE_IPV6, nw_proto=const.PROTO_NUM_ICMP_V6,
+ icmp_type=const.ICMPV6_TYPE_NA, nd_target=ip, in_port=port)
+
+ # Now that the rules are ready, direct icmpv6 neighbor advertisement
+ # traffic from the port into the anti-spoof table.
+ self.add_flow(table=constants.LOCAL_SWITCHING,
+ priority=10, dl_type=const.ETHERTYPE_IPV6,
+ nw_proto=const.PROTO_NUM_ICMP_V6,
+ icmp_type=const.ICMPV6_TYPE_NA, in_port=port,
+ actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE))
+
def install_arp_spoofing_protection(self, port, ip_addresses):
# allow ARPs as long as they match addresses that actually
# belong to the port.
def delete_arp_spoofing_protection(self, port):
self.delete_flows(table_id=constants.LOCAL_SWITCHING,
in_port=port, proto='arp')
+ self.delete_flows(table_id=constants.LOCAL_SWITCHING,
+ in_port=port, nw_proto=const.PROTO_NUM_ICMP_V6,
+ icmp_type=const.ICMPV6_TYPE_NA)
self.delete_flows(table_id=constants.ARP_SPOOF_TABLE,
in_port=port)
from neutron.common import config
from neutron.common import constants as n_const
from neutron.common import exceptions
+from neutron.common import ipv6_utils as ipv6
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
pass
+def has_zero_prefixlen_address(ip_addresses):
+ return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses)
+
+
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2population_rpc.L2populationRpcCallBackTunnelMixin,
dvr_rpc.DVRAgentRpcCallbackMixin):
return
# collect all of the addresses and cidrs that belong to the port
addresses = {f['ip_address'] for f in port_details['fixed_ips']}
+ mac_addresses = {vif.vif_mac}
if port_details.get('allowed_address_pairs'):
addresses |= {p['ip_address']
for p in port_details['allowed_address_pairs']}
-
- addresses = {ip for ip in addresses
- if netaddr.IPNetwork(ip).version == 4}
- if any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in addresses):
- # don't try to install protection because a /0 prefix allows any
- # address anyway and the ARP_SPA can only match on /1 or more.
- return
-
- bridge.install_arp_spoofing_protection(port=vif.ofport,
- ip_addresses=addresses)
+ mac_addresses |= {p['mac_address']
+ for p in port_details['allowed_address_pairs']
+ if p.get('mac_address')}
+
+ ipv6_addresses = {ip for ip in addresses
+ if netaddr.IPNetwork(ip).version == 6}
+ # Allow neighbor advertisements for LLA address.
+ ipv6_addresses |= {str(ipv6.get_ipv6_addr_by_EUI64(
+ n_const.IPV6_LLA_PREFIX, mac))
+ for mac in mac_addresses}
+ if not has_zero_prefixlen_address(ipv6_addresses):
+ # Install protection only when prefix is not zero because a /0
+ # prefix allows any address anyway and the nd_target can only
+ # match on /1 or more.
+ bridge.install_icmpv6_na_spoofing_protection(port=vif.ofport,
+ ip_addresses=ipv6_addresses)
+
+ ipv4_addresses = {ip for ip in addresses
+ if netaddr.IPNetwork(ip).version == 4}
+ if not has_zero_prefixlen_address(ipv4_addresses):
+ # Install protection only when prefix is not zero because a /0
+ # prefix allows any address anyway and the ARP_SPA can only
+ # match on /1 or more.
+ bridge.install_arp_spoofing_protection(port=vif.ofport,
+ ip_addresses=ipv4_addresses)
def port_unbound(self, vif_id, net_uuid=None):
'''Unbind port.
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
+ def test_arp_spoof_blocks_icmpv6_neigh_advt(self):
+ self.src_addr = '2000::1'
+ self.dst_addr = '2000::2'
+ # this will prevent the destination from responding (i.e., icmpv6
+ # neighbour advertisement) to the icmpv6 neighbour solicitation
+ # request for it's own address (2000::2) as spoofing rules added
+ # below only allow '2000::3'.
+ self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3'])
+ self.src_p.addr.add('%s/64' % self.src_addr)
+ self.dst_p.addr.add('%s/64' % self.dst_addr)
+ # make sure the IPv6 addresses are ready before pinging
+ self.src_p.addr.wait_until_address_ready(self.src_addr)
+ self.dst_p.addr.wait_until_address_ready(self.dst_addr)
+ net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
+
def test_arp_spoof_blocks_request(self):
# this will prevent the source from sending an ARP
# request with its own address
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
+ def test_arp_spoof_icmpv6_neigh_advt_allowed_address_pairs(self):
+ self.src_addr = '2000::1'
+ self.dst_addr = '2000::2'
+ self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3',
+ self.dst_addr])
+ self.src_p.addr.add('%s/64' % self.src_addr)
+ self.dst_p.addr.add('%s/64' % self.dst_addr)
+ # make sure the IPv6 addresses are ready before pinging
+ self.src_p.addr.wait_until_address_ready(self.src_addr)
+ self.dst_p.addr.wait_until_address_ready(self.dst_addr)
+ net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
+
def test_arp_spoof_allowed_address_pairs_0cidr(self):
self._setup_arp_spoof_for_port(self.dst_p.name, ['9.9.9.9/0',
'1.2.3.4'])
def test_arp_header_match_runs(self):
checks.arp_header_match_supported()
+ def test_icmpv6_header_match_runs(self):
+ checks.icmpv6_header_match_supported()
+
def test_vf_management_runs(self):
checks.vf_management_supported()
]
self.assertEqual(expected, self.mock.mock_calls)
+ def test_install_icmpv6_na_spoofing_protection(self):
+ port = 8888
+ ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128']
+ self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses)
+ (dp, ofp, ofpp) = self._get_dp()
+ expected = [
+ call._send_msg(ofpp.OFPFlowMod(dp,
+ cookie=0,
+ instructions=[
+ ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
+ ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
+ ]),
+ ],
+ match=ofpp.OFPMatch(
+ eth_type=self.ether_types.ETH_TYPE_IPV6,
+ icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
+ ip_proto=self.in_proto.IPPROTO_ICMPV6,
+ ipv6_nd_target='2001:db8::1',
+ in_port=8888,
+ ),
+ priority=2,
+ table_id=24)),
+ call._send_msg(ofpp.OFPFlowMod(dp,
+ cookie=0,
+ instructions=[
+ ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
+ ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
+ ]),
+ ],
+ match=ofpp.OFPMatch(
+ eth_type=self.ether_types.ETH_TYPE_IPV6,
+ icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
+ ip_proto=self.in_proto.IPPROTO_ICMPV6,
+ ipv6_nd_target='fdf8:f53b:82e4::1',
+ in_port=8888,
+ ),
+ priority=2,
+ table_id=24)),
+ call._send_msg(ofpp.OFPFlowMod(dp,
+ cookie=0,
+ instructions=[
+ ofpp.OFPInstructionGotoTable(table_id=24),
+ ],
+ match=ofpp.OFPMatch(
+ eth_type=self.ether_types.ETH_TYPE_IPV6,
+ icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
+ ip_proto=self.in_proto.IPPROTO_ICMPV6,
+ in_port=8888,
+ ),
+ priority=10,
+ table_id=0)),
+ ]
+ self.assertEqual(expected, self.mock.mock_calls)
+
def test_install_arp_spoofing_protection(self):
port = 8888
ip_addresses = ['192.0.2.1', '192.0.2.2/32']
call.delete_flows(table_id=0, match=ofpp.OFPMatch(
eth_type=self.ether_types.ETH_TYPE_ARP,
in_port=8888)),
+ call.delete_flows(table_id=0, match=ofpp.OFPMatch(
+ eth_type=self.ether_types.ETH_TYPE_IPV6,
+ icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
+ in_port=8888,
+ ip_proto=self.in_proto.IPPROTO_ICMPV6)),
call.delete_flows(table_id=24, in_port=port),
]
self.assertEqual(expected, self.mock.mock_calls)
import mock
+from neutron.common import constants as const
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent.\
openflow.ovs_ofctl import ovs_bridge_test_base
]
self.assertEqual(expected, self.mock.mock_calls)
+ def test_install_icmpv6_na_spoofing_protection(self):
+ port = 8888
+ ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128']
+ self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses)
+ expected = [
+ call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal',
+ icmp_type=const.ICMPV6_TYPE_NA,
+ nw_proto=const.PROTO_NUM_ICMP_V6,
+ nd_target='2001:db8::1',
+ priority=2, table=24, in_port=8888),
+ call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal',
+ icmp_type=const.ICMPV6_TYPE_NA,
+ nw_proto=const.PROTO_NUM_ICMP_V6,
+ nd_target='fdf8:f53b:82e4::1/128',
+ priority=2, table=24, in_port=8888),
+ call.add_flow(dl_type=const.ETHERTYPE_IPV6,
+ icmp_type=const.ICMPV6_TYPE_NA,
+ nw_proto=const.PROTO_NUM_ICMP_V6,
+ priority=10, table=0, in_port=8888,
+ actions='resubmit(,24)')
+ ]
+ self.assertEqual(expected, self.mock.mock_calls)
+
def test_install_arp_spoofing_protection(self):
port = 8888
ip_addresses = ['192.0.2.1', '192.0.2.2/32']
self.br.delete_arp_spoofing_protection(port)
expected = [
call.delete_flows(table_id=0, in_port=8888, proto='arp'),
+ call.delete_flows(table_id=0, in_port=8888, icmp_type=136,
+ nw_proto=58),
call.delete_flows(table_id=24, in_port=8888),
]
self.assertEqual(expected, self.mock.mock_calls)
class FakeVif(object):
ofport = 99
port_name = 'name'
+ vif_mac = 'aa:bb:cc:11:22:33'
class MockFixedIntervalLoopingCall(object):
[mock.call(ip_addresses=set(), port=vif.ofport)],
int_br.install_arp_spoofing_protection.mock_calls)
+ def test_arp_spoofing_basic_rule_setup_fixed_ipv6(self):
+ vif = FakeVif()
+ fake_details = {'fixed_ips': [{'ip_address': 'fdf8:f53b:82e4::1'}],
+ 'device_owner': 'nobody'}
+ self.agent.prevent_arp_spoofing = True
+ br = mock.create_autospec(self.agent.int_br)
+ self.agent.setup_arp_spoofing_protection(br, vif, fake_details)
+ self.assertEqual(
+ [mock.call(port=vif.ofport)],
+ br.delete_arp_spoofing_protection.mock_calls)
+ self.assertTrue(br.install_icmpv6_na_spoofing_protection.called)
+
def test_arp_spoofing_fixed_and_allowed_addresses(self):
vif = FakeVif()
fake_details = {
[mock.call(port=vif.ofport, ip_addresses=addresses)],
int_br.install_arp_spoofing_protection.mock_calls)
+ def test_arp_spoofing_fixed_and_allowed_addresses_ipv6(self):
+ vif = FakeVif()
+ fake_details = {
+ 'device_owner': 'nobody',
+ 'fixed_ips': [{'ip_address': '2001:db8::1'},
+ {'ip_address': '2001:db8::2'}],
+ 'allowed_address_pairs': [{'ip_address': '2001:db8::200',
+ 'mac_address': 'aa:22:33:44:55:66'}]
+ }
+ self.agent.prevent_arp_spoofing = True
+ int_br = mock.create_autospec(self.agent.int_br)
+ self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details)
+ # make sure all addresses are allowed including ipv6 LLAs
+ addresses = {'2001:db8::1', '2001:db8::2', '2001:db8::200',
+ 'fe80::a822:33ff:fe44:5566', 'fe80::a8bb:ccff:fe11:2233'}
+ self.assertEqual(
+ [mock.call(port=vif.ofport, ip_addresses=addresses)],
+ int_br.install_icmpv6_na_spoofing_protection.mock_calls)
+
def test__get_ofport_moves(self):
previous = {'port1': 1, 'port2': 2}
current = {'port1': 5, 'port2': 2}