--- /dev/null
+#! /usr/bin/env python
+
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Copyright (C) 2014 YAMAMOTO Takashi <yamamoto at valinux co jp>
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import print_function
+import sys
+
+import eventlet
+
+
+def print_binary_name():
+ # NOTE(yamamoto): Don't move this import to module-level.
+ # The aim is to test importing from eventlet non-main thread.
+ # See Bug #1367075 for details.
+ from neutron.agent.linux import iptables_manager
+
+ print(iptables_manager.binary_name)
+
+if __name__ == "__main__":
+ if 'spawn' in sys.argv:
+ eventlet.spawn(print_binary_name).wait()
+ else:
+ print_binary_name()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import os.path
+
import testtools
from neutron.agent.linux import iptables_manager
-from neutron.tests.functional.agent.linux import base
+from neutron.agent.linux import utils
+from neutron.tests import base
+from neutron.tests.functional.agent.linux import base as linux_base
+from neutron.tests.functional.agent.linux.bin import ipt_binname
from neutron.tests.functional.agent.linux import helpers
-class IptablesManagerTestCase(base.BaseIPVethTestCase):
+class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
'egress': 'OUTPUT'}
PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
def test_icmp(self):
pinger = helpers.Pinger(self.client_ns)
pinger.assert_ping(self.DST_ADDRESS)
- self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
+ self.server_fw.ipv4['filter'].add_rule('INPUT',
+ linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_no_ping(self.DST_ADDRESS)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
- base.ICMP_BLOCK_RULE)
+ linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_ping(self.DST_ADDRESS)
def test_mangle_icmp(self):
pinger = helpers.Pinger(self.client_ns)
pinger.assert_ping(self.DST_ADDRESS)
- self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
- self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
+ self.server_fw.ipv4['mangle'].add_rule('INPUT',
+ linux_base.ICMP_MARK_RULE)
+ self.server_fw.ipv4['filter'].add_rule('INPUT',
+ linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_no_ping(self.DST_ADDRESS)
self.server_fw.ipv4['mangle'].remove_rule('INPUT',
- base.ICMP_MARK_RULE)
+ linux_base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
- base.MARKED_BLOCK_RULE)
+ linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_ping(self.DST_ADDRESS)
def test_udp_output(self):
self._test_with_nc(self.client_fw, 'egress', port=None, udp=True)
+
+
+class IptablesManagerNonRootTestCase(base.BaseTestCase):
+ @staticmethod
+ def _normalize_module_name(name):
+ for suf in ['.pyc', '.pyo']:
+ if name.endswith(suf):
+ return name[:-len(suf)] + '.py'
+ return name
+
+ def _test_binary_name(self, module, *extra_options):
+ executable = self._normalize_module_name(module.__file__)
+ expected = os.path.basename(executable)[:16]
+ observed = utils.execute([executable] + list(extra_options)).rstrip()
+ self.assertEqual(expected, observed)
+
+ def test_binary_name(self):
+ self._test_binary_name(ipt_binname)
+
+ def test_binary_name_eventlet_spawn(self):
+ self._test_binary_name(ipt_binname, 'spawn')