-# Copyright (c) 2014 Red Hat, Inc.
+# Copyright (c) 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
-IPSET_SET = 'test-set'
+MAX_IPSET_NAME_LENGTH = 28
IPSET_ETHERTYPE = 'IPv4'
-ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
UNRELATED_IP = '1.1.1.1'
super(IpsetBase, self).setUp()
self.src_ns, self.dst_ns = self.prepare_veth_pairs()
+ self.ipset_name = base.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
+ self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
+ % self.ipset_name)
self.ipset = self._create_ipset_manager_and_set(self.dst_ns,
- IPSET_SET)
-
+ self.ipset_name)
+ self.addCleanup(self.ipset._destroy, self.ipset_name)
self.dst_iptables = iptables_manager.IptablesManager(
namespace=self.dst_ns.namespace)
- self._add_iptables_ipset_rules(self.dst_iptables)
+ self._add_iptables_ipset_rules()
+ self.addCleanup(self._remove_iptables_ipset_rules)
self.pinger = helpers.Pinger(self.src_ns)
def _create_ipset_manager_and_set(self, dst_ns, set_name):
ipset._create_set(set_name, IPSET_ETHERTYPE)
return ipset
- @staticmethod
- def _remove_iptables_ipset_rules(iptables_manager):
- iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
- iptables_manager.apply()
+ def _remove_iptables_ipset_rules(self):
+ self.dst_iptables.ipv4['filter'].remove_rule(
+ 'INPUT', base.ICMP_BLOCK_RULE)
+ self.dst_iptables.ipv4['filter'].remove_rule(
+ 'INPUT', self.icmp_accept_rule)
+ self.dst_iptables.apply()
- @staticmethod
- def _add_iptables_ipset_rules(iptables_manager):
- iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
- iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
- iptables_manager.apply()
+ def _add_iptables_ipset_rules(self):
+ self.dst_iptables.ipv4['filter'].add_rule(
+ 'INPUT', self.icmp_accept_rule)
+ self.dst_iptables.ipv4['filter'].add_rule(
+ 'INPUT', base.ICMP_BLOCK_RULE)
+ self.dst_iptables.apply()
class IpsetManagerTestCase(IpsetBase):
def test_add_member_allows_ping(self):
self.pinger.assert_no_ping(self.DST_ADDRESS)
- self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
+ self.ipset._add_member_to_set(self.ipset_name, self.SRC_ADDRESS)
self.pinger.assert_ping(self.DST_ADDRESS)
def test_del_member_denies_ping(self):
- self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
+ self.ipset._add_member_to_set(self.ipset_name, self.SRC_ADDRESS)
self.pinger.assert_ping(self.DST_ADDRESS)
- self.ipset._del_member_from_set(IPSET_SET, self.SRC_ADDRESS)
+ self.ipset._del_member_from_set(self.ipset_name, self.SRC_ADDRESS)
self.pinger.assert_no_ping(self.DST_ADDRESS)
def test_refresh_ipset_allows_ping(self):
- self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [UNRELATED_IP], IPSET_ETHERTYPE)
self.pinger.assert_no_ping(self.DST_ADDRESS)
- self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.SRC_ADDRESS],
- IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [UNRELATED_IP, self.SRC_ADDRESS], IPSET_ETHERTYPE)
self.pinger.assert_ping(self.DST_ADDRESS)
- self.ipset._refresh_set(IPSET_SET, [self.SRC_ADDRESS, UNRELATED_IP],
- IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [self.SRC_ADDRESS, UNRELATED_IP], IPSET_ETHERTYPE)
self.pinger.assert_ping(self.DST_ADDRESS)
def test_destroy_ipset_set(self):
- self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
- self._remove_iptables_ipset_rules(self.dst_iptables)
- self.ipset._destroy(IPSET_SET)
+ self.assertRaises(RuntimeError, self.ipset._destroy, self.ipset_name)
+ self._remove_iptables_ipset_rules()
+ self.ipset._destroy(self.ipset_name)