class IpsetManager(object):
"""Wrapper for ipset."""
- def __init__(self, execute=None, root_helper=None):
+ def __init__(self, execute=None, root_helper=None, namespace=None):
self.execute = execute or linux_utils.execute
self.root_helper = root_helper
+ self.namespace = namespace
@utils.synchronized('ipset', external=True)
def create_ipset_chain(self, chain_name, ethertype):
def _apply(self, cmd, input=None):
input = '\n'.join(input) if input else None
- self.execute(cmd, root_helper=self.root_helper, process_input=input)
+ cmd_ns = []
+ if self.namespace:
+ cmd_ns.extend(['ip', 'netns', 'exec', self.namespace])
+ cmd_ns.extend(cmd)
+ self.execute(cmd_ns,
+ root_helper=self.root_helper,
+ process_input=input)
def _get_ipset_chain_type(self, ethertype):
return 'inet6' if ethertype == 'IPv6' else 'inet'
import random
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
+from neutron.openstack.common import uuidutils
+from neutron.tests.functional.agent.linux import pinger
from neutron.tests.functional import base as functional_base
BR_PREFIX = 'test-br'
+ICMP_BLOCK_RULE = '-p icmp -j DROP'
class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
br = self.create_resource(br_prefix, self.ovs.add_bridge)
self.addCleanup(br.destroy)
return br
+
+
+class BaseIPVethTestCase(BaseLinuxTestCase):
+ SRC_ADDRESS = '192.168.0.1'
+ DST_ADDRESS = '192.168.0.2'
+ BROADCAST_ADDRESS = '192.168.0.255'
+ SRC_VETH = 'source'
+ DST_VETH = 'destination'
+
+ def setUp(self):
+ super(BaseIPVethTestCase, self).setUp()
+ self.check_sudo_enabled()
+ self.pinger = pinger.Pinger(self)
+
+ @staticmethod
+ def _set_ip_up(device, cidr, broadcast, ip_version=4):
+ device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
+ device.link.set_up()
+
+ def _create_namespace(self):
+ ip_cmd = ip_lib.IPWrapper(self.root_helper)
+ name = "func-%s" % uuidutils.generate_uuid()
+ namespace = ip_cmd.ensure_namespace(name)
+ self.addCleanup(namespace.netns.delete, namespace.namespace)
+
+ return namespace
+
+ def prepare_veth_pairs(self, src_addr=None,
+ dst_addr=None,
+ broadcast_addr=None,
+ src_ns=None, dst_ns=None,
+ src_veth=None,
+ dst_veth=None):
+
+ src_addr = src_addr or self.SRC_ADDRESS
+ dst_addr = dst_addr or self.DST_ADDRESS
+ broadcast_addr = broadcast_addr or self.BROADCAST_ADDRESS
+ src_veth = src_veth or self.SRC_VETH
+ dst_veth = dst_veth or self.DST_VETH
+ src_ns = src_ns or self._create_namespace()
+ dst_ns = dst_ns or self._create_namespace()
+
+ src_veth, dst_veth = src_ns.add_veth(src_veth,
+ dst_veth,
+ dst_ns.namespace)
+
+ self._set_ip_up(src_veth, '%s/24' % src_addr, broadcast_addr)
+ self._set_ip_up(dst_veth, '%s/24' % dst_addr, broadcast_addr)
+
+ return src_ns, dst_ns
--- /dev/null
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class Pinger(object):
+ def __init__(self, testcase, timeout=1, max_attempts=1):
+ self.testcase = testcase
+ self._timeout = timeout
+ self._max_attempts = max_attempts
+
+ def _ping_destination(self, src_namespace, dest_address):
+ src_namespace.netns.execute(['ping', '-c', self._max_attempts,
+ '-W', self._timeout, dest_address])
+
+ def assert_ping_from_ns(self, src_ns, dst_ip):
+ try:
+ self._ping_destination(src_ns, dst_ip)
+ except RuntimeError:
+ self.testcase.fail("destination ip %(dst_ip)s is not replying "
+ "to ping from namespace %(src_ns)s" %
+ {'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
+
+ def assert_no_ping_from_ns(self, src_ns, dst_ip):
+ try:
+ self._ping_destination(src_ns, dst_ip)
+ self.testcase.fail("destination ip %(dst_ip)s is replying to ping"
+ "from namespace %(src_ns)s, but it shouldn't" %
+ {'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
+ except RuntimeError:
+ pass
--- /dev/null
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import ipset_manager
+from neutron.agent.linux import iptables_manager
+from neutron.tests.functional.agent.linux import base
+
+IPSET_CHAIN = 'test-chain'
+IPSET_ETHERTYPE = 'IPv4'
+ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_CHAIN
+UNRELATED_IP = '1.1.1.1'
+
+
+class IpsetBase(base.BaseIPVethTestCase):
+
+ def setUp(self):
+ super(IpsetBase, self).setUp()
+
+ self.src_ns, self.dst_ns = self.prepare_veth_pairs()
+ self.ipset = self._create_ipset_manager_and_chain(self.dst_ns,
+ IPSET_CHAIN)
+
+ self.dst_iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ namespace=self.dst_ns.namespace)
+
+ self._add_iptables_ipset_rules(self.dst_iptables)
+
+ def _create_ipset_manager_and_chain(self, dst_ns, chain_name):
+ ipset = ipset_manager.IpsetManager(
+ root_helper=self.root_helper,
+ namespace=dst_ns.namespace)
+
+ ipset.create_ipset_chain(chain_name, IPSET_ETHERTYPE)
+ return ipset
+
+ @staticmethod
+ def _remove_iptables_ipset_rules(iptables_manager):
+ iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
+ iptables_manager.apply()
+
+ @staticmethod
+ def _add_iptables_ipset_rules(iptables_manager):
+ iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
+ iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
+ iptables_manager.apply()
+
+
+class IpsetManagerTestCase(IpsetBase):
+
+ def test_add_member_allows_ping(self):
+ self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+ self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ def test_del_member_denies_ping(self):
+ self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ self.ipset.del_ipset_chain_member(IPSET_CHAIN, self.SRC_ADDRESS)
+ self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ def test_refresh_ipset_allows_ping(self):
+ self.ipset.refresh_ipset_chain_by_name(IPSET_CHAIN, [UNRELATED_IP],
+ IPSET_ETHERTYPE)
+ self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ self.ipset.refresh_ipset_chain_by_name(
+ IPSET_CHAIN, [UNRELATED_IP, self.SRC_ADDRESS], IPSET_ETHERTYPE)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ self.ipset.refresh_ipset_chain_by_name(
+ IPSET_CHAIN, [self.SRC_ADDRESS, UNRELATED_IP], IPSET_ETHERTYPE)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+
+ def test_destroy_ipset_chain(self):
+ self.assertRaises(RuntimeError,
+ self.ipset.destroy_ipset_chain_by_name, IPSET_CHAIN)
+ self._remove_iptables_ipset_rules(self.dst_iptables)
+ self.ipset.destroy_ipset_chain_by_name(IPSET_CHAIN)
# License for the specific language governing permissions and limitations
# under the License.
-from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
-from neutron.openstack.common import uuidutils
from neutron.tests.functional.agent.linux import base
-ICMP_BLOCK_RULE = '-p icmp -j DROP'
-SRC_VETH_NAME = 'source'
-DEST_VETH_NAME = 'destination'
+class IptablesManagerTestCase(base.BaseIPVethTestCase):
-class IpBase(base.BaseLinuxTestCase):
- SRC_ADDRESS = '192.168.0.1'
- DST_ADDRESS = '192.168.0.2'
-
- @staticmethod
- def _set_ip_up(device, cidr, broadcast='192.168.0.255', ip_version=4):
- device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
- device.link.set_up()
-
- @staticmethod
- def _ping_destination(src_namespace, dest_address, attempts=3):
- src_namespace.netns.execute(['ping', '-c', attempts, dest_address])
-
- def _create_namespace(self):
- ip_cmd = ip_lib.IPWrapper(self.root_helper)
- name = "func-%s" % uuidutils.generate_uuid()
- namespace = ip_cmd.ensure_namespace(name)
- self.addCleanup(namespace.netns.delete, namespace.namespace)
-
- return namespace
-
- def _prepare_veth_pairs(self):
- src_ns = self._create_namespace()
- dst_ns = self._create_namespace()
- src_veth, dst_veth = src_ns.add_veth(SRC_VETH_NAME,
- DEST_VETH_NAME,
- dst_ns.namespace)
- self._set_ip_up(src_veth, '%s/24' % self.SRC_ADDRESS)
- self._set_ip_up(dst_veth, '%s/24' % self.DST_ADDRESS)
-
- return src_ns, dst_ns
-
-
-class IptablesManagerTestCase(IpBase):
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
- self.check_sudo_enabled()
- self.src_ns, self.dst_ns = self._prepare_veth_pairs()
+ self.src_ns, self.dst_ns = self.prepare_veth_pairs()
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
namespace=self.dst_ns.namespace)
def test_icmp(self):
- self._ping_destination(self.src_ns, self.DST_ADDRESS)
- self.iptables.ipv4['filter'].add_rule('INPUT', ICMP_BLOCK_RULE)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+ self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
self.iptables.apply()
- self.assertRaises(RuntimeError, self._ping_destination, self.src_ns,
- self.DST_ADDRESS)
- self.iptables.ipv4['filter'].remove_rule('INPUT', ICMP_BLOCK_RULE)
+ self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
+ self.iptables.ipv4['filter'].remove_rule('INPUT',
+ base.ICMP_BLOCK_RULE)
self.iptables.apply()
- self._ping_destination(self.src_ns, self.DST_ADDRESS)
+ self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)