--- /dev/null
+# Copyright (c) 2015 Thales Services SAS
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import fixtures
+
+from neutron.agent.linux import ip_lib
+from neutron.tests.common import net_helpers
+
+
+class FakeMachine(fixtures.Fixture):
+ """Create a fake machine.
+
+ :ivar bridge: bridge on which the fake machine is bound
+ :ivar ip_cidr: fake machine ip_cidr
+ :type ip_cidr: str
+ :ivar ip: fake machine ip
+ :type ip: str
+ :ivar gateway_ip: fake machine gateway ip
+ :type gateway_ip: str
+
+ :ivar namespace: namespace emulating the machine
+ :type namespace: str
+ :ivar port: port binding the namespace to the bridge
+ :type port: IPDevice
+ """
+
+ def __init__(self, bridge, ip_cidr, gateway_ip=None):
+ super(FakeMachine, self).__init__()
+ self.bridge = bridge
+ self.ip_cidr = ip_cidr
+ self.ip = self.ip_cidr.partition('/')[0]
+ self.gateway_ip = gateway_ip
+
+ def setUp(self):
+ super(FakeMachine, self).setUp()
+ self.namespace = self.useFixture(
+ net_helpers.NamespaceFixture()).name
+
+ self.port = self.useFixture(
+ net_helpers.PortFixture.get(self.bridge, self.namespace)).port
+ self.port.addr.add(self.ip_cidr)
+
+ if self.gateway_ip:
+ net_helpers.set_namespace_gateway(self.port, self.gateway_ip)
+
+ def execute(self, *args, **kwargs):
+ ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
+ return ns_ip_wrapper.netns.execute(*args, **kwargs)
+
+
+class PeerMachines(fixtures.Fixture):
+ """Create 'amount' peered machines on an ip_cidr.
+
+ :ivar bridge: bridge on which peer machines are bound
+ :ivar ip_cidr: ip_cidr on which peer machines have ips
+ :type ip_cidr: str
+ :ivar machines: fake machines
+ :type machines: FakeMachine list
+ """
+
+ AMOUNT = 2
+ CIDR = '192.168.0.1/24'
+
+ def __init__(self, bridge, ip_cidr=None, gateway_ip=None):
+ super(PeerMachines, self).__init__()
+ self.bridge = bridge
+ self.ip_cidr = ip_cidr or self.CIDR
+ self.gateway_ip = gateway_ip
+
+ def setUp(self):
+ super(PeerMachines, self).setUp()
+ self.machines = []
+
+ for index in range(self.AMOUNT):
+ ip_cidr = net_helpers.increment_ip_cidr(self.ip_cidr, index)
+ self.machines.append(
+ self.useFixture(
+ FakeMachine(self.bridge, ip_cidr, self.gateway_ip)))
super(VethFixture, self).setUp()
ip_wrapper = ip_lib.IPWrapper()
- def _create_veth(name0):
- name1 = name0.replace(VETH0_PREFIX, VETH1_PREFIX)
- return ip_wrapper.add_veth(name0, name1)
+ self.ports = common_base.create_resource(
+ VETH0_PREFIX,
+ lambda name: ip_wrapper.add_veth(name, self.get_peer_name(name)))
- self.ports = common_base.create_resource(VETH0_PREFIX, _create_veth)
self.addCleanup(self.destroy)
def destroy(self):
# when a namespace owning a veth endpoint is deleted.
pass
+ @staticmethod
+ def get_peer_name(name):
+ if name.startswith(VETH0_PREFIX):
+ return name.replace(VETH0_PREFIX, VETH1_PREFIX)
+ elif name.startswith(VETH1_PREFIX):
+ return name.replace(VETH1_PREFIX, VETH0_PREFIX)
+ else:
+ tools.fail('%s is not a valid VethFixture veth endpoint' % name)
+
@six.add_metaclass(abc.ABCMeta)
class PortFixture(fixtures.Fixture):
if not self.bridge:
self.bridge = self.useFixture(self._create_bridge_fixture()).bridge
+ @classmethod
+ def get(cls, bridge, namespace=None):
+ """Deduce PortFixture class from bridge type and instantiate it."""
+ if isinstance(bridge, ovs_lib.OVSBridge):
+ return OVSPortFixture(bridge, namespace)
+ if isinstance(bridge, bridge_lib.BridgeDevice):
+ return LinuxBridgePortFixture(bridge, namespace)
+ if isinstance(bridge, VethBridge):
+ return VethPortFixture(bridge, namespace)
+ tools.fail('Unexpected bridge type: %s' % type(bridge))
+
class OVSBridgeFixture(fixtures.Fixture):
"""Create an OVS bridge.
import testscenarios
+from neutron.agent.linux import ip_lib
from neutron.tests import base as tests_base
+from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
- def _create_namespace(self, prefix=net_helpers.NS_PREFIX):
- return self.useFixture(net_helpers.NamespaceFixture(prefix)).ip_wrapper
-
def create_veth(self):
return self.useFixture(net_helpers.VethFixture()).ports
self.config(group='OVS', ovsdb_interface=self.ovsdb_interface)
-class BaseIPVethTestCase(BaseLinuxTestCase):
- SRC_ADDRESS = '192.168.0.1'
- DST_ADDRESS = '192.168.0.2'
-
- @staticmethod
- def _set_ip_up(device, cidr):
- device.addr.add(cidr)
- device.link.set_up()
+class BaseIPVethTestCase(functional_base.BaseSudoTestCase):
def prepare_veth_pairs(self):
-
- src_addr = self.SRC_ADDRESS
- dst_addr = self.DST_ADDRESS
-
- src_veth, dst_veth = self.create_veth()
- src_ns = self._create_namespace()
- dst_ns = self._create_namespace()
- src_ns.add_device_to_namespace(src_veth)
- dst_ns.add_device_to_namespace(dst_veth)
-
- self._set_ip_up(src_veth, '%s/24' % src_addr)
- self._set_ip_up(dst_veth, '%s/24' % dst_addr)
-
- return src_ns, dst_ns
+ bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
+ machines = self.useFixture(
+ machine_fixtures.PeerMachines(bridge)).machines
+ self.SRC_ADDRESS = machines[0].ip
+ self.DST_ADDRESS = machines[1].ip
+ return [ip_lib.IPWrapper(m.namespace) for m in machines]
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_firewall
from neutron.agent import securitygroups_rpc as sg_cfg
+from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
-from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
+from neutron.tests.functional import base
from oslo_config import cfg
-class IptablesFirewallTestCase(base.BaseIPVethTestCase):
+class IptablesFirewallTestCase(base.BaseSudoTestCase):
MAC_REAL = "fa:16:3e:9a:2f:49"
MAC_SPOOFED = "fa:16:3e:9a:2f:48"
FAKE_SECURITY_GROUP_ID = "fake_sg_id"
def _set_src_mac(self, mac):
- self.src_port.link.set_down()
- self.src_port.link.set_address(mac)
- self.src_port.link.set_up()
+ self.client.port.link.set_down()
+ self.client.port.link.set_address(mac)
+ self.client.port.link.set_up()
def setUp(self):
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
super(IptablesFirewallTestCase, self).setUp()
- bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
-
- # FIXME(cbrandily): temporary, will be replaced by fake machines
- self.src_ip_wrapper = self.useFixture(
- net_helpers.NamespaceFixture()).ip_wrapper
- src_port_fixture = self.useFixture(
- net_helpers.LinuxBridgePortFixture(
- bridge, self.src_ip_wrapper.namespace))
- self.src_port = src_port_fixture.port
- self._set_ip_up(self.src_port, '%s/24' % self.SRC_ADDRESS)
-
- self.dst_ip_wrapper = self.useFixture(
- net_helpers.NamespaceFixture()).ip_wrapper
- self.dst_port = self.useFixture(
- net_helpers.LinuxBridgePortFixture(
- bridge, self.dst_ip_wrapper.namespace)).port
- self._set_ip_up(self.dst_port, '%s/24' % self.DST_ADDRESS)
+ bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
+ self.client, self.server = self.useFixture(
+ machine_fixtures.PeerMachines(bridge)).machines
self.firewall = iptables_firewall.IptablesFirewallDriver(
namespace=bridge.namespace)
self._set_src_mac(self.MAC_REAL)
+ client_br_port_name = net_helpers.VethFixture.get_peer_name(
+ self.client.port.name)
self.src_port_desc = {'admin_state_up': True,
- 'device': src_port_fixture.br_port.name,
+ 'device': client_br_port_name,
'device_owner': 'compute:None',
- 'fixed_ips': [self.SRC_ADDRESS],
+ 'fixed_ips': [self.client.ip],
'mac_address': self.MAC_REAL,
'port_security_enabled': True,
'security_groups': [self.FAKE_SECURITY_GROUP_ID],
# setup firewall on bridge and send packet from src_veth and observe
# if sent packet can be observed on dst_veth
def test_port_sec_within_firewall(self):
- pinger = helpers.Pinger(self.src_ip_wrapper)
+ client_ip_wrapper = ip_lib.IPWrapper(self.client.namespace)
+ pinger = helpers.Pinger(client_ip_wrapper)
# update the sg_group to make ping pass
sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
self.FAKE_SECURITY_GROUP_ID,
sg_rules)
self.firewall.prepare_port_filter(self.src_port_desc)
- pinger.assert_ping(self.DST_ADDRESS)
+ pinger.assert_ping(self.server.ip)
# modify the src_veth's MAC and test again
self._set_src_mac(self.MAC_SPOOFED)
- pinger.assert_no_ping(self.DST_ADDRESS)
+ pinger.assert_no_ping(self.server.ip)
# update the port's port_security_enabled value and test again
self.src_port_desc['port_security_enabled'] = False
self.firewall.update_port_filter(self.src_port_desc)
- pinger.assert_ping(self.DST_ADDRESS)
+ pinger.assert_ping(self.server.ip)
from neutron.common import constants as l3_constants
from neutron.common import utils as common_utils
from neutron.openstack.common import uuidutils
+from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
router_ip_cidr = self._port_first_ip_cidr(router.internal_ports[0])
router_ip = router_ip_cidr.partition('/')[0]
- src_ip_cidr = net_helpers.increment_ip_cidr(router_ip_cidr)
- dst_ip_cidr = net_helpers.increment_ip_cidr(src_ip_cidr)
- dst_ip = dst_ip_cidr.partition('/')[0]
+ br_int = get_ovs_bridge(self.agent.conf.ovs_integration_bridge)
+ src_machine, dst_machine = self.useFixture(
+ machine_fixtures.PeerMachines(
+ br_int,
+ net_helpers.increment_ip_cidr(router_ip_cidr),
+ router_ip)).machines
+
dst_fip = '19.4.4.10'
router.router[l3_constants.FLOATINGIP_KEY] = []
- self._add_fip(router, dst_fip, fixed_address=dst_ip)
+ self._add_fip(router, dst_fip, fixed_address=dst_machine.ip)
router.process(self.agent)
- br_int = get_ovs_bridge(self.agent.conf.ovs_integration_bridge)
-
- # FIXME(cbrandily): temporary, will be replaced by fake machines
- src_ns = self._create_namespace(prefix='test-src-')
- src_port = self.useFixture(
- net_helpers.OVSPortFixture(br_int, src_ns.namespace)).port
- src_port.addr.add(src_ip_cidr)
- net_helpers.set_namespace_gateway(src_port, router_ip)
- dst_ns = self._create_namespace(prefix='test-dst-')
- dst_port = self.useFixture(
- net_helpers.OVSPortFixture(br_int, dst_ns.namespace)).port
- dst_port.addr.add(dst_ip_cidr)
- net_helpers.set_namespace_gateway(dst_port, router_ip)
-
- protocol_port = helpers.get_free_namespace_port(dst_ns)
+ protocol_port = helpers.get_free_namespace_port(dst_machine.namespace)
# client sends to fip
- netcat = helpers.NetcatTester(src_ns, dst_ns, dst_ip,
- protocol_port,
- client_address=dst_fip,
- run_as_root=True,
- udp=False)
+ netcat = helpers.NetcatTester(
+ ip_lib.IPWrapper(src_machine.namespace),
+ ip_lib.IPWrapper(dst_machine.namespace),
+ dst_machine.ip, protocol_port, client_address=dst_fip,
+ run_as_root=True, udp=False)
self.addCleanup(netcat.stop_processes)
self.assertTrue(netcat.test_connectivity())
self._create_metadata_fake_server(webob.exc.HTTPOk.code)
# Create and configure client namespace
- client_ns = self._create_namespace()
router_ip_cidr = self._port_first_ip_cidr(router.internal_ports[0])
- ip_cidr = net_helpers.increment_ip_cidr(router_ip_cidr)
br_int = get_ovs_bridge(self.agent.conf.ovs_integration_bridge)
- # FIXME(cbrandily): temporary, will be replaced by a fake machine
- port = self.useFixture(
- net_helpers.OVSPortFixture(br_int, client_ns.namespace)).port
- port.addr.add(ip_cidr)
- net_helpers.set_namespace_gateway(port,
- router_ip_cidr.partition('/')[0])
+ machine = self.useFixture(
+ machine_fixtures.FakeMachine(
+ br_int,
+ net_helpers.increment_ip_cidr(router_ip_cidr),
+ router_ip_cidr.partition('/')[0]))
# Query metadata proxy
url = 'http://%(host)s:%(port)s' % {'host': dhcp.METADATA_DEFAULT_IP,
'port': dhcp.METADATA_PORT}
cmd = 'curl', '--max-time', METADATA_REQUEST_TIMEOUT, '-D-', url
try:
- raw_headers = client_ns.netns.execute(cmd)
+ raw_headers = machine.execute(cmd)
except RuntimeError:
self.fail('metadata proxy unreachable on %s before timeout' % url)