addresses += [p['ip_address']
for p in port_details['allowed_address_pairs']]
- # allow ARP replies as long as they match addresses that actually
+ # allow ARPs as long as they match addresses that actually
# belong to the port.
for ip in addresses:
if netaddr.IPNetwork(ip).version != 4:
continue
- bridge.add_flow(
- table=constants.ARP_SPOOF_TABLE, priority=2,
- proto='arp', arp_op=constants.ARP_REPLY, arp_spa=ip,
- in_port=vif.ofport, actions="NORMAL")
+ bridge.add_flow(table=constants.ARP_SPOOF_TABLE, priority=2,
+ proto='arp', arp_spa=ip, in_port=vif.ofport,
+ actions="NORMAL")
- # drop any ARP replies in this table that aren't explicitly allowed
- bridge.add_flow(
- table=constants.ARP_SPOOF_TABLE, priority=1, proto='arp',
- arp_op=constants.ARP_REPLY, actions="DROP")
+ # drop any ARPs in this table that aren't explicitly allowed
+ bridge.add_flow(table=constants.ARP_SPOOF_TABLE, priority=1,
+ proto='arp', actions="DROP")
# Now that the rules are ready, direct ARP traffic from the port into
# the anti-spoof table.
# on ARP headers will just process traffic normally.
bridge.add_flow(table=constants.LOCAL_SWITCHING,
priority=10, proto='arp', in_port=vif.ofport,
- arp_op=constants.ARP_REPLY,
actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE))
def port_unbound(self, vif_id, net_uuid=None):
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.agent.linux import ip_lib
from neutron.cmd.sanity import checks
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional.agent import test_ovs_lib
+from neutron.tests import tools
class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
pinger = helpers.Pinger(self.src_ns)
pinger.assert_no_ping(self.dst_addr)
+ def test_arp_spoof_blocks_request(self):
+ # this will prevent the source from sending an ARP
+ # request with its own address
+ self._setup_arp_spoof_for_port(self.src_p.name, ['192.168.0.3'])
+ self.src_p.addr.add('%s/24' % self.src_addr)
+ self.dst_p.addr.add('%s/24' % self.dst_addr)
+ ns_ip_wrapper = ip_lib.IPWrapper(self.src_ns)
+ try:
+ ns_ip_wrapper.netns.execute(['arping', '-I', self.src_p.name,
+ '-c1', self.dst_addr])
+ tools.fail("arping should have failed. The arp request should "
+ "have been blocked.")
+ except RuntimeError:
+ pass
+
def test_arp_spoof_allowed_address_pairs(self):
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3',
self.dst_addr])
# make sure redirect into spoof table is installed
int_br.add_flow.assert_any_call(
table=constants.LOCAL_SWITCHING, in_port=vif.ofport,
- arp_op=constants.ARP_REPLY, proto='arp', actions=mock.ANY,
- priority=10)
+ proto='arp', actions=mock.ANY, priority=10)
# make sure drop rule for replies is installed
int_br.add_flow.assert_any_call(
table=constants.ARP_SPOOF_TABLE,
- proto='arp', arp_op=constants.ARP_REPLY, actions='DROP',
- priority=mock.ANY)
+ proto='arp', actions='DROP', priority=mock.ANY)
def test_arp_spoofing_fixed_and_allowed_addresses(self):
vif = FakeVif()
'192.168.44.103/32'):
int_br.add_flow.assert_any_call(
table=constants.ARP_SPOOF_TABLE, in_port=vif.ofport,
- proto='arp', arp_op=constants.ARP_REPLY, actions='NORMAL',
- arp_spa=addr, priority=mock.ANY)
+ proto='arp', actions='NORMAL', arp_spa=addr, priority=mock.ANY)
def test__get_ofport_moves(self):
previous = {'port1': 1, 'port2': 2}