From 17765114292217d109c15b220be57fea6c9eed4a Mon Sep 17 00:00:00 2001 From: sridhargaddam Date: Tue, 14 Jul 2015 16:18:06 +0000 Subject: [PATCH] Add IPv6 Address Resolution protection Similar to IPv4 arp protection support, this patch adds the necessary OVS rules to prevent ports attached to agent from sending any icmpv6 neighbor advertisement messages that contain an IPv6 address not belonging to the port. For details please refer to "Figure 3. Attack against IPv6 Address Resolution" http://www.cisco.com/web/about/security/intelligence/ipv6_first_hop.html DocImpact SecurityImpact Closes-Bug: #1491690 Change-Id: I1f8311f1b9ae1be02afde3e9078e49c6da373a88 --- neutron/cmd/sanity/checks.py | 11 ++++ neutron/cmd/sanity_check.py | 13 ++++ neutron/common/constants.py | 3 + .../agent/openflow/native/br_int.py | 33 +++++++++++ .../agent/openflow/ovs_ofctl/br_int.py | 22 ++++++- .../openvswitch/agent/ovs_neutron_agent.py | 41 +++++++++---- .../tests/functional/agent/test_ovs_flows.py | 27 +++++++++ .../tests/functional/sanity/test_sanity.py | 3 + .../agent/openflow/native/test_br_int.py | 59 +++++++++++++++++++ .../agent/openflow/ovs_ofctl/test_br_int.py | 26 ++++++++ .../agent/test_ovs_neutron_agent.py | 32 ++++++++++ 11 files changed, 259 insertions(+), 11 deletions(-) diff --git a/neutron/cmd/sanity/checks.py b/neutron/cmd/sanity/checks.py index 819d00c23..39c97066e 100644 --- a/neutron/cmd/sanity/checks.py +++ b/neutron/cmd/sanity/checks.py @@ -134,6 +134,17 @@ def arp_header_match_supported(): actions="NORMAL") +def icmpv6_header_match_supported(): + return ofctl_arg_supported(cmd='add-flow', + table=ovs_const.ARP_SPOOF_TABLE, + priority=1, + dl_type=n_consts.ETHERTYPE_IPV6, + nw_proto=n_consts.PROTO_NUM_ICMP_V6, + icmp_type=n_consts.ICMPV6_TYPE_NA, + nd_target='fdf8:f53b:82e4::10', + actions="NORMAL") + + def vf_management_supported(): is_supported = True required_caps = ( diff --git a/neutron/cmd/sanity_check.py b/neutron/cmd/sanity_check.py index 0cf80a103..9eca181fa 100644 --- a/neutron/cmd/sanity_check.py +++ b/neutron/cmd/sanity_check.py @@ -165,6 +165,16 @@ def check_arp_header_match(): return result +def check_icmpv6_header_match(): + result = checks.icmpv6_header_match_supported() + if not result: + LOG.error(_LE('Check for Open vSwitch support of ICMPv6 header ' + 'matching failed. ICMPv6 Neighbor Advt spoofing (part ' + 'of arp spoofing) suppression will not work. A newer ' + 'version of OVS is required.')) + return result + + def check_vf_management(): result = checks.vf_management_supported() if not result: @@ -206,6 +216,8 @@ OPTS = [ help=_('Check for ARP responder support')), BoolOptCallback('arp_header_match', check_arp_header_match, help=_('Check for ARP header match support')), + BoolOptCallback('icmpv6_header_match', check_icmpv6_header_match, + help=_('Check for ICMPv6 header match support')), BoolOptCallback('vf_management', check_vf_management, help=_('Check for VF management support')), BoolOptCallback('read_netns', check_read_netns, @@ -247,6 +259,7 @@ def enable_tests_from_config(): cfg.CONF.set_override('arp_responder', True) if cfg.CONF.AGENT.prevent_arp_spoofing: cfg.CONF.set_override('arp_header_match', True) + cfg.CONF.set_override('icmpv6_header_match', True) if cfg.CONF.ml2_sriov.agent_required: cfg.CONF.set_override('vf_management', True) if not cfg.CONF.AGENT.use_helper_for_ns_read: diff --git a/neutron/common/constants.py b/neutron/common/constants.py index 9a4ada150..f15dd3641 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -112,6 +112,8 @@ L3_DISTRIBUTED_EXT_ALIAS = 'dvr' L3_HA_MODE_EXT_ALIAS = 'l3-ha' SUBNET_ALLOCATION_EXT_ALIAS = 'subnet_allocation' +ETHERTYPE_IPV6 = 0x86DD + # Protocol names and numbers for Security Groups/Firewalls PROTO_NAME_TCP = 'tcp' PROTO_NAME_ICMP = 'icmp' @@ -130,6 +132,7 @@ PROTO_NUM_UDP = 17 # Neighbor Advertisement (136) ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136] ICMPV6_TYPE_RA = 134 +ICMPV6_TYPE_NA = 136 DHCPV6_STATEFUL = 'dhcpv6-stateful' DHCPV6_STATELESS = 'dhcpv6-stateless' diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py index 76eaf8601..ce4b79028 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py @@ -21,6 +21,8 @@ from oslo_log import log as logging from ryu.lib.packet import ether_types +from ryu.lib.packet import icmpv6 +from ryu.lib.packet import in_proto from neutron.i18n import _LE from neutron.plugins.common import constants as p_const @@ -146,6 +148,34 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): return ofpp.OFPMatch(in_port=port, eth_type=ether_types.ETH_TYPE_ARP) + @staticmethod + def _icmpv6_reply_match(ofp, ofpp, port): + return ofpp.OFPMatch(in_port=port, + eth_type=ether_types.ETH_TYPE_IPV6, + ip_proto=in_proto.IPPROTO_ICMPV6, + icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT) + + def install_icmpv6_na_spoofing_protection(self, port, ip_addresses): + # Allow neighbor advertisements as long as they match addresses + # that actually belong to the port. + for ip in ip_addresses: + masked_ip = self._cidr_to_ryu(ip) + self.install_normal( + table_id=constants.ARP_SPOOF_TABLE, priority=2, + eth_type=ether_types.ETH_TYPE_IPV6, + ip_proto=in_proto.IPPROTO_ICMPV6, + icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT, + ipv6_nd_target=masked_ip, in_port=port) + + # Now that the rules are ready, direct icmpv6 neighbor advertisement + # traffic from the port into the anti-spoof table. + (_dp, ofp, ofpp) = self._get_dp() + match = self._icmpv6_reply_match(ofp, ofpp, port=port) + self.install_goto(table_id=constants.LOCAL_SWITCHING, + priority=10, + match=match, + dest_table_id=constants.ARP_SPOOF_TABLE) + def install_arp_spoofing_protection(self, port, ip_addresses): # allow ARP replies as long as they match addresses that actually # belong to the port. @@ -171,6 +201,9 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): def delete_arp_spoofing_protection(self, port): (_dp, ofp, ofpp) = self._get_dp() match = self._arp_reply_match(ofp, ofpp, port=port) + self.delete_flows(table_id=constants.LOCAL_SWITCHING, + match=match) + match = self._icmpv6_reply_match(ofp, ofpp, port=port) self.delete_flows(table_id=constants.LOCAL_SWITCHING, match=match) self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py index 952513e71..ef232cd0d 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py @@ -18,7 +18,7 @@ * references ** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic """ - +from neutron.common import constants as const from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \ @@ -110,6 +110,23 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): self.delete_flows(table_id=constants.LOCAL_SWITCHING, in_port=port, eth_src=mac) + def install_icmpv6_na_spoofing_protection(self, port, ip_addresses): + # Allow neighbor advertisements as long as they match addresses + # that actually belong to the port. + for ip in ip_addresses: + self.install_normal( + table_id=constants.ARP_SPOOF_TABLE, priority=2, + dl_type=const.ETHERTYPE_IPV6, nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA, nd_target=ip, in_port=port) + + # Now that the rules are ready, direct icmpv6 neighbor advertisement + # traffic from the port into the anti-spoof table. + self.add_flow(table=constants.LOCAL_SWITCHING, + priority=10, dl_type=const.ETHERTYPE_IPV6, + nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA, in_port=port, + actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE)) + def install_arp_spoofing_protection(self, port, ip_addresses): # allow ARPs as long as they match addresses that actually # belong to the port. @@ -129,5 +146,8 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): def delete_arp_spoofing_protection(self, port): self.delete_flows(table_id=constants.LOCAL_SWITCHING, in_port=port, proto='arp') + self.delete_flows(table_id=constants.LOCAL_SWITCHING, + in_port=port, nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA) self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, in_port=port) diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 76f0b1dbf..636da4925 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -40,6 +40,7 @@ from neutron.api.rpc.handlers import dvr_rpc from neutron.common import config from neutron.common import constants as n_const from neutron.common import exceptions +from neutron.common import ipv6_utils as ipv6 from neutron.common import topics from neutron.common import utils as n_utils from neutron import context @@ -96,6 +97,10 @@ class OVSPluginApi(agent_rpc.PluginApi): pass +def has_zero_prefixlen_address(ip_addresses): + return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses) + + class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, l2population_rpc.L2populationRpcCallBackTunnelMixin, dvr_rpc.DVRAgentRpcCallbackMixin): @@ -867,19 +872,35 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, return # collect all of the addresses and cidrs that belong to the port addresses = {f['ip_address'] for f in port_details['fixed_ips']} + mac_addresses = {vif.vif_mac} if port_details.get('allowed_address_pairs'): addresses |= {p['ip_address'] for p in port_details['allowed_address_pairs']} - - addresses = {ip for ip in addresses - if netaddr.IPNetwork(ip).version == 4} - if any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in addresses): - # don't try to install protection because a /0 prefix allows any - # address anyway and the ARP_SPA can only match on /1 or more. - return - - bridge.install_arp_spoofing_protection(port=vif.ofport, - ip_addresses=addresses) + mac_addresses |= {p['mac_address'] + for p in port_details['allowed_address_pairs'] + if p.get('mac_address')} + + ipv6_addresses = {ip for ip in addresses + if netaddr.IPNetwork(ip).version == 6} + # Allow neighbor advertisements for LLA address. + ipv6_addresses |= {str(ipv6.get_ipv6_addr_by_EUI64( + n_const.IPV6_LLA_PREFIX, mac)) + for mac in mac_addresses} + if not has_zero_prefixlen_address(ipv6_addresses): + # Install protection only when prefix is not zero because a /0 + # prefix allows any address anyway and the nd_target can only + # match on /1 or more. + bridge.install_icmpv6_na_spoofing_protection(port=vif.ofport, + ip_addresses=ipv6_addresses) + + ipv4_addresses = {ip for ip in addresses + if netaddr.IPNetwork(ip).version == 4} + if not has_zero_prefixlen_address(ipv4_addresses): + # Install protection only when prefix is not zero because a /0 + # prefix allows any address anyway and the ARP_SPA can only + # match on /1 or more. + bridge.install_arp_spoofing_protection(port=vif.ofport, + ip_addresses=ipv4_addresses) def port_unbound(self, vif_id, net_uuid=None): '''Unbind port. diff --git a/neutron/tests/functional/agent/test_ovs_flows.py b/neutron/tests/functional/agent/test_ovs_flows.py index e0ddbb710..94ebb6429 100644 --- a/neutron/tests/functional/agent/test_ovs_flows.py +++ b/neutron/tests/functional/agent/test_ovs_flows.py @@ -162,6 +162,21 @@ class _ARPSpoofTestCase(object): self.dst_p.addr.add('%s/24' % self.dst_addr) net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_blocks_icmpv6_neigh_advt(self): + self.src_addr = '2000::1' + self.dst_addr = '2000::2' + # this will prevent the destination from responding (i.e., icmpv6 + # neighbour advertisement) to the icmpv6 neighbour solicitation + # request for it's own address (2000::2) as spoofing rules added + # below only allow '2000::3'. + self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3']) + self.src_p.addr.add('%s/64' % self.src_addr) + self.dst_p.addr.add('%s/64' % self.dst_addr) + # make sure the IPv6 addresses are ready before pinging + self.src_p.addr.wait_until_address_ready(self.src_addr) + self.dst_p.addr.wait_until_address_ready(self.dst_addr) + net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_blocks_request(self): # this will prevent the source from sending an ARP # request with its own address @@ -184,6 +199,18 @@ class _ARPSpoofTestCase(object): self.dst_p.addr.add('%s/24' % self.dst_addr) net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_icmpv6_neigh_advt_allowed_address_pairs(self): + self.src_addr = '2000::1' + self.dst_addr = '2000::2' + self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3', + self.dst_addr]) + self.src_p.addr.add('%s/64' % self.src_addr) + self.dst_p.addr.add('%s/64' % self.dst_addr) + # make sure the IPv6 addresses are ready before pinging + self.src_p.addr.wait_until_address_ready(self.src_addr) + self.dst_p.addr.wait_until_address_ready(self.dst_addr) + net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_allowed_address_pairs_0cidr(self): self._setup_arp_spoof_for_port(self.dst_p.name, ['9.9.9.9/0', '1.2.3.4']) diff --git a/neutron/tests/functional/sanity/test_sanity.py b/neutron/tests/functional/sanity/test_sanity.py index a47bb4e27..888469071 100644 --- a/neutron/tests/functional/sanity/test_sanity.py +++ b/neutron/tests/functional/sanity/test_sanity.py @@ -65,6 +65,9 @@ class SanityTestCaseRoot(functional_base.BaseSudoTestCase): def test_arp_header_match_runs(self): checks.arp_header_match_supported() + def test_icmpv6_header_match_runs(self): + checks.icmpv6_header_match_supported() + def test_vf_management_runs(self): checks.vf_management_supported() diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py index fab1f247e..17a865a55 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py @@ -283,6 +283,60 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): ] self.assertEqual(expected, self.mock.mock_calls) + def test_install_icmpv6_na_spoofing_protection(self): + port = 8888 + ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128'] + self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses) + (dp, ofp, ofpp) = self._get_dp() + expected = [ + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [ + ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), + ]), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + ipv6_nd_target='2001:db8::1', + in_port=8888, + ), + priority=2, + table_id=24)), + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [ + ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), + ]), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + ipv6_nd_target='fdf8:f53b:82e4::1', + in_port=8888, + ), + priority=2, + table_id=24)), + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionGotoTable(table_id=24), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + in_port=8888, + ), + priority=10, + table_id=0)), + ] + self.assertEqual(expected, self.mock.mock_calls) + def test_install_arp_spoofing_protection(self): port = 8888 ip_addresses = ['192.0.2.1', '192.0.2.2/32'] @@ -339,6 +393,11 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): call.delete_flows(table_id=0, match=ofpp.OFPMatch( eth_type=self.ether_types.ETH_TYPE_ARP, in_port=8888)), + call.delete_flows(table_id=0, match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + in_port=8888, + ip_proto=self.in_proto.IPPROTO_ICMPV6)), call.delete_flows(table_id=24, in_port=port), ] self.assertEqual(expected, self.mock.mock_calls) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py index 9bb3c8f23..8c77e185c 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py @@ -16,6 +16,7 @@ import mock +from neutron.common import constants as const from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent.\ openflow.ovs_ofctl import ovs_bridge_test_base @@ -186,6 +187,29 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): ] self.assertEqual(expected, self.mock.mock_calls) + def test_install_icmpv6_na_spoofing_protection(self): + port = 8888 + ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128'] + self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses) + expected = [ + call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal', + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + nd_target='2001:db8::1', + priority=2, table=24, in_port=8888), + call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal', + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + nd_target='fdf8:f53b:82e4::1/128', + priority=2, table=24, in_port=8888), + call.add_flow(dl_type=const.ETHERTYPE_IPV6, + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + priority=10, table=0, in_port=8888, + actions='resubmit(,24)') + ] + self.assertEqual(expected, self.mock.mock_calls) + def test_install_arp_spoofing_protection(self): port = 8888 ip_addresses = ['192.0.2.1', '192.0.2.2/32'] @@ -207,6 +231,8 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): self.br.delete_arp_spoofing_protection(port) expected = [ call.delete_flows(table_id=0, in_port=8888, proto='arp'), + call.delete_flows(table_id=0, in_port=8888, icmp_type=136, + nw_proto=58), call.delete_flows(table_id=24, in_port=8888), ] self.assertEqual(expected, self.mock.mock_calls) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 1280b10aa..54a1b55c8 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -54,6 +54,7 @@ TEST_NETWORK_ID2 = 'net-id-2' class FakeVif(object): ofport = 99 port_name = 'name' + vif_mac = 'aa:bb:cc:11:22:33' class MockFixedIntervalLoopingCall(object): @@ -1387,6 +1388,18 @@ class TestOvsNeutronAgent(object): [mock.call(ip_addresses=set(), port=vif.ofport)], int_br.install_arp_spoofing_protection.mock_calls) + def test_arp_spoofing_basic_rule_setup_fixed_ipv6(self): + vif = FakeVif() + fake_details = {'fixed_ips': [{'ip_address': 'fdf8:f53b:82e4::1'}], + 'device_owner': 'nobody'} + self.agent.prevent_arp_spoofing = True + br = mock.create_autospec(self.agent.int_br) + self.agent.setup_arp_spoofing_protection(br, vif, fake_details) + self.assertEqual( + [mock.call(port=vif.ofport)], + br.delete_arp_spoofing_protection.mock_calls) + self.assertTrue(br.install_icmpv6_na_spoofing_protection.called) + def test_arp_spoofing_fixed_and_allowed_addresses(self): vif = FakeVif() fake_details = { @@ -1406,6 +1419,25 @@ class TestOvsNeutronAgent(object): [mock.call(port=vif.ofport, ip_addresses=addresses)], int_br.install_arp_spoofing_protection.mock_calls) + def test_arp_spoofing_fixed_and_allowed_addresses_ipv6(self): + vif = FakeVif() + fake_details = { + 'device_owner': 'nobody', + 'fixed_ips': [{'ip_address': '2001:db8::1'}, + {'ip_address': '2001:db8::2'}], + 'allowed_address_pairs': [{'ip_address': '2001:db8::200', + 'mac_address': 'aa:22:33:44:55:66'}] + } + self.agent.prevent_arp_spoofing = True + int_br = mock.create_autospec(self.agent.int_br) + self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details) + # make sure all addresses are allowed including ipv6 LLAs + addresses = {'2001:db8::1', '2001:db8::2', '2001:db8::200', + 'fe80::a822:33ff:fe44:5566', 'fe80::a8bb:ccff:fe11:2233'} + self.assertEqual( + [mock.call(port=vif.ofport, ip_addresses=addresses)], + int_br.install_icmpv6_na_spoofing_protection.mock_calls) + def test__get_ofport_moves(self): previous = {'port1': 1, 'port2': 2} current = {'port1': 5, 'port2': 2} -- 2.45.2