This patch sets up framework for functional test of iptables firewall.
Tests will come as another patches.
Partially Implements: blueprint ml2-ovs-portsecurity
Change-Id: Ia0af50c8942681559bf9801515298127d30f9e34
--- /dev/null
+# Copyright 2015 Intel Corporation.
+# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
+# <isaku.yamahata at gmail com>
+# All Rights Reserved.
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import utils
+
+
+class BridgeDevice(ip_lib.IPDevice):
+ def _brctl(self, cmd, log_fail_as_error=True):
+ cmd = ['brctl'] + cmd
+ if self.namespace:
+ cmd = ['ip', 'netns', 'exec', self.namespace] + cmd
+ return utils.execute(cmd, run_as_root=True,
+ log_fail_as_error=log_fail_as_error)
+
+ @classmethod
+ def addbr(cls, name, namespace=None):
+ bridge = cls(name, namespace)
+ bridge._brctl(['addbr', bridge.name])
+ return bridge
+
+ def delbr(self):
+ return self._brctl(['delbr', self.name])
+
+ def addif(self, interface):
+ return self._brctl(['addif', self.name, interface])
+
+ def delif(self, interface):
+ return self._brctl(['delif', self.name, interface])
IPTABLES_DIRECTION = {INGRESS_DIRECTION: 'physdev-out',
EGRESS_DIRECTION: 'physdev-in'}
- def __init__(self):
+ def __init__(self, namespace=None):
self.iptables = iptables_manager.IptablesManager(
- use_ipv6=ipv6_utils.is_enabled())
+ use_ipv6=ipv6_utils.is_enabled(),
+ namespace=namespace)
# TODO(majopela, shihanzhang): refactor out ipset to a separate
# driver composed over this one
- self.ipset = ipset_manager.IpsetManager()
+ self.ipset = ipset_manager.IpsetManager(namespace=namespace)
# list of port which has security group
self.filtered_ports = {}
self._add_fallback_chain_v4v6()
import netaddr
import testscenarios
+from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
max_length=n_const.DEVICE_NAME_MAX_LEN)
+def get_rand_bridge_name():
+ return get_rand_name(prefix=BR_PREFIX,
+ max_length=n_const.DEVICE_NAME_MAX_LEN)
+
+
class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
self._set_ip_up(dst_veth, '%s/24' % dst_addr)
return src_ns, dst_ns
+
+
+class BaseBridgeTestCase(BaseIPVethTestCase):
+ def create_veth_pairs(self, dst_namespace):
+ src_ns = self._create_namespace()
+ src_veth = get_rand_veth_name()
+ dst_veth = get_rand_veth_name()
+
+ return src_ns.add_veth(src_veth, dst_veth, dst_namespace)
+
+ def create_bridge(self, br_ns=None):
+ br_ns = br_ns or self._create_namespace()
+ br_name = get_rand_bridge_name()
+ bridge = bridge_lib.BridgeDevice.addbr(br_name, br_ns.namespace)
+ self.addCleanup(bridge.delbr)
+ bridge.link.set_up()
+ self.addCleanup(bridge.link.set_down)
+ return bridge
--- /dev/null
+# Copyright 2015 Intel Corporation.
+# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
+# <isaku.yamahata at gmail com>
+# All Rights Reserved.
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import iptables_firewall
+from neutron.tests.functional.agent.linux import base
+
+
+class IptablesFirewallTestCase(base.BaseBridgeTestCase):
+ def setUp(self):
+ super(IptablesFirewallTestCase, self).setUp()
+ self.bridge = self.create_bridge()
+
+ self.src_veth, self.src_br_veth = self.create_veth_pairs(
+ self.bridge.namespace)
+ self.bridge.addif(self.src_br_veth.name)
+ self._set_ip_up(self.src_veth, '%s/24' % self.SRC_ADDRESS)
+ self.src_br_veth.link.set_up()
+
+ self.dst_veth, self.dst_br_veth = self.create_veth_pairs(
+ self.bridge.namespace)
+ self.bridge.addif(self.dst_br_veth.name)
+ self._set_ip_up(self.dst_veth, '%s/24' % self.DST_ADDRESS)
+ self.dst_br_veth.link.set_up()
+
+ self.firewall = iptables_firewall.IptablesFirewallDriver(
+ namespace=self.bridge.namespace)
+
+ # TODO(yamahata): add tests...
+ # setup firewall on bridge and send packet from src_veth and observe
+ # if sent packet can be observed on dst_veth
+ def test_firewall(self):
+ pass
--- /dev/null
+# Copyright 2015 Intel Corporation.
+# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
+# <isaku.yamahata at gmail com>
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.linux import bridge_lib
+from neutron.agent.linux import utils
+from neutron.tests import base
+
+
+class BridgeLibTest(base.BaseTestCase):
+ """A test suite to exercise the bridge libraries """
+ _NAMESPACE = 'test-namespace'
+ _BR_NAME = 'test-br'
+ _IF_NAME = 'test-if'
+
+ def setUp(self):
+ super(BridgeLibTest, self).setUp()
+ self.execute = mock.patch.object(
+ utils, "execute", spec=utils.execute).start()
+
+ def _verify_bridge_mock(self, cmd, namespace=None):
+ if namespace is not None:
+ cmd = ['ip', 'netns', 'exec', namespace] + cmd
+ self.execute.assert_called_once_with(cmd, run_as_root=True,
+ log_fail_as_error=True)
+ self.execute.reset_mock()
+
+ def _test_br(self, namespace=None):
+ br = bridge_lib.BridgeDevice.addbr(self._BR_NAME, namespace)
+ self._verify_bridge_mock(['brctl', 'addbr', self._BR_NAME], namespace)
+
+ br.addif(self._IF_NAME)
+ self._verify_bridge_mock(
+ ['brctl', 'addif', self._BR_NAME, self._IF_NAME], namespace)
+
+ br.delif(self._IF_NAME)
+ self._verify_bridge_mock(
+ ['brctl', 'delif', self._BR_NAME, self._IF_NAME], namespace)
+
+ br.delbr()
+ self._verify_bridge_mock(['brctl', 'delbr', self._BR_NAME], namespace)
+
+ def test_addbr_with_namespace(self):
+ self._test_br(self._NAMESPACE)
+
+ def test_addbr_without_namespace(self):
+ self._test_br()