LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
"it has port security disabled"), vif)
return
+ if port_details['device_owner'].startswith('network:'):
+ # clear any previous entries related to this port
+ delete_arp_spoofing_protection([vif], current_rules)
+ LOG.debug("Skipping ARP spoofing rules for network owned port "
+ "'%s'.", vif)
+ return
# collect all of the addresses and cidrs that belong to the port
addresses = {f['ip_address'] for f in port_details['fixed_ips']}
if port_details.get('allowed_address_pairs'):
machine_fixtures.PeerMachines(bridge, amount=3)).machines
def _add_arp_protection(self, machine, addresses, extra_port_dict=None):
- port_dict = {'fixed_ips': [{'ip_address': a} for a in addresses]}
+ port_dict = {'fixed_ips': [{'ip_address': a} for a in addresses],
+ 'device_owner': 'nobody'}
if extra_port_dict:
port_dict.update(extra_port_dict)
name = net_helpers.VethFixture.get_peer_name(machine.port.name)
{'port_security_enabled': False})
arping(self.observer.namespace, self.source.ip)
+ def test_arp_protection_network_owner(self):
+ self._add_arp_protection(self.source, ['1.1.1.1'])
+ no_arping(self.observer.namespace, self.source.ip)
+ self._add_arp_protection(self.source, ['1.1.1.1'],
+ {'device_owner': 'network:router_gateway'})
+ arping(self.observer.namespace, self.source.ip)
+
def test_arp_protection_dead_reference_removal(self):
self._add_arp_protection(self.source, ['1.1.1.1'])
self._add_arp_protection(self.destination, ['2.2.2.2'])