from neutron.agent.linux import ip_lib
from neutron.tests.common import net_helpers
+from neutron.tests import tools
+
+
+class Pinger(object):
+ def __init__(self, namespace, timeout=1, max_attempts=1):
+ self.namespace = namespace
+ self._timeout = timeout
+ self._max_attempts = max_attempts
+
+ def _ping_destination(self, dest_address):
+ ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
+ ns_ip_wrapper.netns.execute(['ping', '-c', self._max_attempts,
+ '-W', self._timeout, dest_address])
+
+ def assert_ping(self, dst_ip):
+ self._ping_destination(dst_ip)
+
+ def assert_no_ping(self, dst_ip):
+ try:
+ self._ping_destination(dst_ip)
+ tools.fail("destination ip %(dst_ip)s is replying to ping "
+ "from namespace %(ns)s, but it shouldn't" %
+ {'ns': self.namespace, 'dst_ip': dst_ip})
+ except RuntimeError:
+ pass
class FakeMachine(fixtures.Fixture):
def setUp(self):
super(FakeMachine, self).setUp()
- self.namespace = self.useFixture(
- net_helpers.NamespaceFixture()).name
+ ns_fixture = self.useFixture(
+ net_helpers.NamespaceFixture())
+ self.namespace = ns_fixture.name
self.port = self.useFixture(
net_helpers.PortFixture.get(self.bridge, self.namespace)).port
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
return ns_ip_wrapper.netns.execute(*args, **kwargs)
+ def assert_ping(self, dst_ip):
+ pinger = Pinger(self.namespace)
+ pinger.assert_ping(dst_ip)
+
+ def assert_no_ping(self, dst_ip):
+ pinger = Pinger(self.namespace)
+ pinger.assert_no_ping(dst_ip)
+
class PeerMachines(fixtures.Fixture):
"""Create 'amount' peered machines on an ip_cidr.
import testscenarios
-from neutron.agent.linux import ip_lib
from neutron.tests import base as tests_base
-from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
def setUp(self):
super(BaseOVSLinuxTestCase, self).setUp()
self.config(group='OVS', ovsdb_interface=self.ovsdb_interface)
-
-
-class BaseIPVethTestCase(functional_base.BaseSudoTestCase):
-
- def prepare_veth_pairs(self):
- bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
- machines = self.useFixture(
- machine_fixtures.PeerMachines(bridge)).machines
- self.SRC_ADDRESS = machines[0].ip
- self.DST_ADDRESS = machines[1].ip
- return [ip_lib.IPWrapper(m.namespace) for m in machines]
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
-from neutron.tests import tools
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
return random.choice(list(candidates - used))
-class Pinger(object):
- def __init__(self, namespace, timeout=1, max_attempts=1):
- self.namespace = namespace
- self._timeout = timeout
- self._max_attempts = max_attempts
-
- def _ping_destination(self, dest_address):
- self.namespace.netns.execute(['ping', '-c', self._max_attempts,
- '-W', self._timeout, dest_address])
-
- def assert_ping(self, dst_ip):
- self._ping_destination(dst_ip)
-
- def assert_no_ping(self, dst_ip):
- try:
- self._ping_destination(dst_ip)
- tools.fail("destination ip %(dst_ip)s is replying to ping"
- "from namespace %(ns)s, but it shouldn't" %
- {'ns': self.namespace.namespace, 'dst_ip': dst_ip})
- except RuntimeError:
- pass
-
-
class RootHelperProcess(subprocess.Popen):
def __init__(self, cmd, *args, **kwargs):
for arg in ('stdin', 'stdout', 'stderr'):
from neutron.agent.linux import ebtables_driver
from neutron.agent.linux import ip_lib
-from neutron.agent.linux import utils as linux_utils
-from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
+from neutron.tests.functional import base
NO_FILTER_APPLY = (
"COMMIT")
-class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
+class EbtablesLowLevelTestCase(base.BaseSudoTestCase):
def setUp(self):
super(EbtablesLowLevelTestCase, self).setUp()
- self.src_ns, self.dst_ns = self.prepare_veth_pairs()
- devs = [d for d in self.src_ns.get_devices() if d.name != "lo"]
- src_dev_name = devs[0].name
- self.ns = self.src_ns.namespace
- self.execute = linux_utils.execute
- self.pinger = helpers.Pinger(self.src_ns)
+
+ bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+ self.source, self.destination = self.useFixture(
+ machine_fixtures.PeerMachines(bridge)).machines
# Extract MAC and IP address of one of my interfaces
- self.mac = self.src_ns.device(src_dev_name).link.address
- addr = [a for a in
- self.src_ns.device(src_dev_name).addr.list()][0]['cidr']
- self.addr = addr.split("/")[0]
+ self.mac = self.source.port.link.address
+ self.addr = self.source.ip
# Pick one of the namespaces and setup a bridge for the local ethernet
# interface there, because ebtables only works on bridged interfaces.
- self.src_ns.netns.execute("brctl addbr mybridge".split())
- self.src_ns.netns.execute(("brctl addif mybridge %s" % src_dev_name).
- split())
+ self.source.execute(['brctl', 'addbr', 'mybridge'])
+ self.source.execute(
+ ['brctl', 'addif', 'mybridge', self.source.port.name])
# Take the IP addrss off one of the interfaces and apply it to the
# bridge interface instead.
- dev_source = ip_lib.IPDevice(src_dev_name, self.src_ns.namespace)
- dev_mybridge = ip_lib.IPDevice("mybridge", self.src_ns.namespace)
- dev_source.addr.delete(addr)
+ self.source.port.addr.delete(self.source.ip_cidr)
+ dev_mybridge = ip_lib.IPDevice("mybridge", self.source.namespace)
dev_mybridge.link.set_up()
- dev_mybridge.addr.add(addr)
+ dev_mybridge.addr.add(self.source.ip_cidr)
def _test_basic_port_filter_wrong_mac(self):
# Setup filter with wrong IP/MAC address pair. Basic filters only allow
mac_ip_pair = dict(mac_addr="11:11:11:22:22:22", ip_addr=self.addr)
filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
- self.execute,
- self.ns)
- self.pinger.assert_no_ping(self.DST_ADDRESS)
+ self.source.execute)
+ self.source.assert_no_ping(self.destination.ip)
# Assure that ping will work once we unfilter the instance
ebtables_driver.ebtables_restore(NO_FILTER_APPLY,
- self.execute,
- self.ns)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.source.execute)
+ self.source.assert_ping(self.destination.ip)
def _test_basic_port_filter_correct_mac(self):
# Use the correct IP/MAC address pair for this one.
filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
- self.execute,
- self.ns)
+ self.source.execute)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.source.assert_ping(self.destination.ip)
def test_ebtables_filtering(self):
# Cannot parallelize those tests. Therefore need to call them
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import ipset_manager
from neutron.agent.linux import iptables_manager
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.functional import base as functional_base
IPSET_SET = 'test-set'
IPSET_ETHERTYPE = 'IPv4'
UNRELATED_IP = '1.1.1.1'
-class IpsetBase(base.BaseIPVethTestCase):
+class IpsetBase(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpsetBase, self).setUp()
- self.src_ns, self.dst_ns = self.prepare_veth_pairs()
- self.ipset = self._create_ipset_manager_and_set(self.dst_ns,
- IPSET_SET)
+ bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+ self.source, self.destination = self.useFixture(
+ machine_fixtures.PeerMachines(bridge)).machines
+
+ self.ipset = self._create_ipset_manager_and_set(
+ ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
self.dst_iptables = iptables_manager.IptablesManager(
- namespace=self.dst_ns.namespace)
+ namespace=self.destination.namespace)
self._add_iptables_ipset_rules(self.dst_iptables)
- self.pinger = helpers.Pinger(self.src_ns)
def _create_ipset_manager_and_set(self, dst_ns, set_name):
ipset = ipset_manager.IpsetManager(
class IpsetManagerTestCase(IpsetBase):
def test_add_member_allows_ping(self):
- self.pinger.assert_no_ping(self.DST_ADDRESS)
- self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.source.assert_no_ping(self.destination.ip)
+ self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+ self.source.assert_ping(self.destination.ip)
def test_del_member_denies_ping(self):
- self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+ self.source.assert_ping(self.destination.ip)
- self.ipset._del_member_from_set(IPSET_SET, self.SRC_ADDRESS)
- self.pinger.assert_no_ping(self.DST_ADDRESS)
+ self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
+ self.source.assert_no_ping(self.destination.ip)
def test_refresh_ipset_allows_ping(self):
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
- self.pinger.assert_no_ping(self.DST_ADDRESS)
+ self.source.assert_no_ping(self.destination.ip)
- self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.SRC_ADDRESS],
+ self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
IPSET_ETHERTYPE)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.source.assert_ping(self.destination.ip)
- self.ipset._refresh_set(IPSET_SET, [self.SRC_ADDRESS, UNRELATED_IP],
+ self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
IPSET_ETHERTYPE)
- self.pinger.assert_ping(self.DST_ADDRESS)
+ self.source.assert_ping(self.destination.ip)
def test_destroy_ipset_set(self):
self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
import testtools
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import utils
from neutron.tests import base
+from neutron.tests.common import machine_fixtures
+from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base as linux_base
from neutron.tests.functional.agent.linux.bin import ipt_binname
from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.functional import base as functional_base
-class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
+class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
'egress': 'OUTPUT'}
PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
- self.client_ns, self.server_ns = self.prepare_veth_pairs()
+
+ bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+ self.client, self.server = self.useFixture(
+ machine_fixtures.PeerMachines(bridge)).machines
+
self.client_fw, self.server_fw = self.create_firewalls()
# The port is used in isolated namespace that precludes possibility of
# port conflicts
- self.port = helpers.get_free_namespace_port(self.server_ns.namespace)
+ self.port = helpers.get_free_namespace_port(self.server.namespace)
def create_firewalls(self):
client_iptables = iptables_manager.IptablesManager(
- namespace=self.client_ns.namespace)
+ namespace=self.client.namespace)
server_iptables = iptables_manager.IptablesManager(
- namespace=self.server_ns.namespace)
+ namespace=self.server.namespace)
return client_iptables, server_iptables
return chain, rule
def _test_with_nc(self, fw_manager, direction, port, udp):
- netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
- self.DST_ADDRESS, self.port,
- run_as_root=True, udp=udp)
+ netcat = helpers.NetcatTester(
+ ip_lib.IPWrapper(self.client.namespace),
+ ip_lib.IPWrapper(self.server.namespace),
+ self.server.ip, self.port, run_as_root=True, udp=udp)
self.addCleanup(netcat.stop_processes)
protocol = 'tcp'
if udp:
protocol = 'udp'
self.assertTrue(netcat.test_connectivity())
self.filter_add_rule(
- fw_manager, self.DST_ADDRESS, direction, protocol, port)
+ fw_manager, self.server.ip, direction, protocol, port)
with testtools.ExpectedException(RuntimeError):
netcat.test_connectivity()
self.filter_remove_rule(
- fw_manager, self.DST_ADDRESS, direction, protocol, port)
+ fw_manager, self.server.ip, direction, protocol, port)
self.assertTrue(netcat.test_connectivity(True))
def test_icmp(self):
- pinger = helpers.Pinger(self.client_ns)
- pinger.assert_ping(self.DST_ADDRESS)
+ self.client.assert_ping(self.server.ip)
self.server_fw.ipv4['filter'].add_rule('INPUT',
linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
- pinger.assert_no_ping(self.DST_ADDRESS)
+ self.client.assert_no_ping(self.server.ip)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
- pinger.assert_ping(self.DST_ADDRESS)
+ self.client.assert_ping(self.server.ip)
def test_mangle_icmp(self):
- pinger = helpers.Pinger(self.client_ns)
- pinger.assert_ping(self.DST_ADDRESS)
+ self.client.assert_ping(self.server.ip)
self.server_fw.ipv4['mangle'].add_rule('INPUT',
linux_base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].add_rule('INPUT',
linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
- pinger.assert_no_ping(self.DST_ADDRESS)
+ self.client.assert_no_ping(self.server.ip)
self.server_fw.ipv4['mangle'].remove_rule('INPUT',
linux_base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
- pinger.assert_ping(self.DST_ADDRESS)
+ self.client.assert_ping(self.server.ip)
def test_tcp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False)
# License for the specific language governing permissions and limitations
# under the License.
-from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_firewall
from neutron.agent import securitygroups_rpc as sg_cfg
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
-from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base
from oslo_config import cfg
# setup firewall on bridge and send packet from src_veth and observe
# if sent packet can be observed on dst_veth
def test_port_sec_within_firewall(self):
- client_ip_wrapper = ip_lib.IPWrapper(self.client.namespace)
- pinger = helpers.Pinger(client_ip_wrapper)
# update the sg_group to make ping pass
sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
self.FAKE_SECURITY_GROUP_ID,
sg_rules)
self.firewall.prepare_port_filter(self.src_port_desc)
- pinger.assert_ping(self.server.ip)
+ self.client.assert_ping(self.server.ip)
# modify the src_veth's MAC and test again
self._set_src_mac(self.MAC_SPOOFED)
- pinger.assert_no_ping(self.server.ip)
+ self.client.assert_no_ping(self.server.ip)
# update the port's port_security_enabled value and test again
self.src_port_desc['port_security_enabled'] = False
self.firewall.update_port_filter(self.src_port_desc)
- pinger.assert_ping(self.server.ip)
+ self.client.assert_ping(self.server.ip)
from neutron.cmd.sanity import checks
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
+from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
-from neutron.tests.functional.agent.linux import base
-from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional.agent import test_ovs_lib
+from neutron.tests.functional import base
class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
- base.BaseIPVethTestCase):
+ base.BaseSudoTestCase):
def setUp(self):
if not checks.arp_header_match_supported():
self.dst_addr = '192.168.0.2'
self.src_ns = self._create_namespace()
self.dst_ns = self._create_namespace()
- self.pinger = helpers.Pinger(self.src_ns, max_attempts=2)
+ self.pinger = machine_fixtures.Pinger(
+ self.src_ns.namespace, max_attempts=2)
self.src_p = self.useFixture(
net_helpers.OVSPortFixture(self.br, self.src_ns.namespace)).port
self.dst_p = self.useFixture(