# License for the specific language governing permissions and limitations
# under the License.
#
-
import fixtures
-import netaddr
from neutron.agent.linux import ip_lib
from neutron.tests.common import net_helpers
-from neutron.tests import tools
-
-
-class Pinger(object):
- def __init__(self, namespace, timeout=1, max_attempts=1):
- self.namespace = namespace
- self._timeout = timeout
- self._max_attempts = max_attempts
-
- def _ping_destination(self, dest_address):
- ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
- ipversion = netaddr.IPAddress(dest_address).version
- ping_command = 'ping' if ipversion == 4 else 'ping6'
- ns_ip_wrapper.netns.execute([ping_command, '-c', self._max_attempts,
- '-W', self._timeout, dest_address])
-
- def assert_ping(self, dst_ip):
- self._ping_destination(dst_ip)
-
- def assert_no_ping(self, dst_ip):
- try:
- self._ping_destination(dst_ip)
- tools.fail("destination ip %(dst_ip)s is replying to ping "
- "from namespace %(ns)s, but it shouldn't" %
- {'ns': self.namespace, 'dst_ip': dst_ip})
- except RuntimeError:
- pass
class FakeMachine(fixtures.Fixture):
return ns_ip_wrapper.netns.execute(*args, **kwargs)
def assert_ping(self, dst_ip):
- pinger = Pinger(self.namespace)
- pinger.assert_ping(dst_ip)
+ net_helpers.assert_ping(self.namespace, dst_ip)
def assert_no_ping(self, dst_ip):
- pinger = Pinger(self.namespace)
- pinger.assert_no_ping(dst_ip)
+ net_helpers.assert_no_ping(self.namespace, dst_ip)
class PeerMachines(fixtures.Fixture):
port_dev.route.add_gateway(gateway_ip)
+def assert_ping(src_namespace, dst_ip, timeout=1, count=1):
+ ipversion = netaddr.IPAddress(dst_ip).version
+ ping_command = 'ping' if ipversion == 4 else 'ping6'
+ ns_ip_wrapper = ip_lib.IPWrapper(src_namespace)
+ ns_ip_wrapper.netns.execute([ping_command, '-c', count, '-W', timeout,
+ dst_ip])
+
+
+def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
+ try:
+ assert_ping(src_namespace, dst_ip, timeout, count)
+ except RuntimeError:
+ pass
+ else:
+ tools.fail("destination ip %(destination)s is replying to ping from "
+ "namespace %(ns)s, but it shouldn't" %
+ {'ns': src_namespace, 'destination': dst_ip})
+
+
class NamespaceFixture(fixtures.Fixture):
"""Create a namespace.
from neutron.cmd.sanity import checks
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
from neutron.plugins.openvswitch.common import constants
-from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent import test_ovs_lib
from neutron.tests.functional import base
net_helpers.NamespaceFixture()).name
self.dst_namespace = self.useFixture(
net_helpers.NamespaceFixture()).name
- self.pinger = machine_fixtures.Pinger(
- self.src_namespace, max_attempts=2)
self.src_p = self.useFixture(
net_helpers.OVSPortFixture(self.br, self.src_namespace)).port
self.dst_p = self.useFixture(
self._setup_arp_spoof_for_port(self.dst_p.name, [self.dst_addr])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
- self.pinger.assert_ping(self.dst_addr)
+ net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
def test_arp_spoof_doesnt_block_ipv6(self):
self.src_addr = '2000::1'
# make sure the IPv6 addresses are ready before pinging
self.src_p.addr.wait_until_address_ready(self.src_addr)
self.dst_p.addr.wait_until_address_ready(self.dst_addr)
- self.pinger.assert_ping(self.dst_addr)
+ net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
def test_arp_spoof_blocks_response(self):
# this will prevent the destination from responding to the ARP
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
- self.pinger.assert_no_ping(self.dst_addr)
+ net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
def test_arp_spoof_blocks_request(self):
# this will prevent the source from sending an ARP
self.dst_addr])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
- self.pinger.assert_ping(self.dst_addr)
+ net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
def test_arp_spoof_disable_port_security(self):
# block first and then disable port security to make sure old rules
psec=False)
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
- self.pinger.assert_ping(self.dst_addr)
+ net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
def _setup_arp_spoof_for_port(self, port, addrs, psec=True):
of_port_map = self.br.get_vif_port_to_ofport_map()