]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add a functional test for iptables_manager.binary_name
authorYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Thu, 11 Sep 2014 04:28:55 +0000 (13:28 +0900)
committerYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Mon, 2 Mar 2015 07:19:50 +0000 (16:19 +0900)
Related-Bug: #1367075
Change-Id: Iaa940b3a04302c6eb3674a2840d54df8b62c1926

neutron/tests/functional/agent/linux/bin/__init__.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/bin/ipt_binname.py [new file with mode: 0755]
neutron/tests/functional/agent/linux/test_iptables.py

diff --git a/neutron/tests/functional/agent/linux/bin/__init__.py b/neutron/tests/functional/agent/linux/bin/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/functional/agent/linux/bin/ipt_binname.py b/neutron/tests/functional/agent/linux/bin/ipt_binname.py
new file mode 100755 (executable)
index 0000000..79bd9be
--- /dev/null
@@ -0,0 +1,37 @@
+#! /usr/bin/env python
+
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Copyright (C) 2014 YAMAMOTO Takashi <yamamoto at valinux co jp>
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from __future__ import print_function
+import sys
+
+import eventlet
+
+
+def print_binary_name():
+    # NOTE(yamamoto): Don't move this import to module-level.
+    # The aim is to test importing from eventlet non-main thread.
+    # See Bug #1367075 for details.
+    from neutron.agent.linux import iptables_manager
+
+    print(iptables_manager.binary_name)
+
+if __name__ == "__main__":
+    if 'spawn' in sys.argv:
+        eventlet.spawn(print_binary_name).wait()
+    else:
+        print_binary_name()
index eb9c828a5f5dcace3750afa53faccab8fa77c963..e38058971272d9f110d65c7077b52c0ca4924a3c 100644 (file)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import os.path
+
 import testtools
 
 from neutron.agent.linux import iptables_manager
-from neutron.tests.functional.agent.linux import base
+from neutron.agent.linux import utils
+from neutron.tests import base
+from neutron.tests.functional.agent.linux import base as linux_base
+from neutron.tests.functional.agent.linux.bin import ipt_binname
 from neutron.tests.functional.agent.linux import helpers
 
 
-class IptablesManagerTestCase(base.BaseIPVethTestCase):
+class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
     DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
                               'egress': 'OUTPUT'}
     PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
@@ -85,25 +90,28 @@ class IptablesManagerTestCase(base.BaseIPVethTestCase):
     def test_icmp(self):
         pinger = helpers.Pinger(self.client_ns)
         pinger.assert_ping(self.DST_ADDRESS)
-        self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
+        self.server_fw.ipv4['filter'].add_rule('INPUT',
+                                               linux_base.ICMP_BLOCK_RULE)
         self.server_fw.apply()
         pinger.assert_no_ping(self.DST_ADDRESS)
         self.server_fw.ipv4['filter'].remove_rule('INPUT',
-                                                 base.ICMP_BLOCK_RULE)
+                                                  linux_base.ICMP_BLOCK_RULE)
         self.server_fw.apply()
         pinger.assert_ping(self.DST_ADDRESS)
 
     def test_mangle_icmp(self):
         pinger = helpers.Pinger(self.client_ns)
         pinger.assert_ping(self.DST_ADDRESS)
-        self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
-        self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
+        self.server_fw.ipv4['mangle'].add_rule('INPUT',
+                                               linux_base.ICMP_MARK_RULE)
+        self.server_fw.ipv4['filter'].add_rule('INPUT',
+                                               linux_base.MARKED_BLOCK_RULE)
         self.server_fw.apply()
         pinger.assert_no_ping(self.DST_ADDRESS)
         self.server_fw.ipv4['mangle'].remove_rule('INPUT',
-                                                  base.ICMP_MARK_RULE)
+                                                  linux_base.ICMP_MARK_RULE)
         self.server_fw.ipv4['filter'].remove_rule('INPUT',
-                                                  base.MARKED_BLOCK_RULE)
+                                                  linux_base.MARKED_BLOCK_RULE)
         self.server_fw.apply()
         pinger.assert_ping(self.DST_ADDRESS)
 
@@ -130,3 +138,24 @@ class IptablesManagerTestCase(base.BaseIPVethTestCase):
 
     def test_udp_output(self):
         self._test_with_nc(self.client_fw, 'egress', port=None, udp=True)
+
+
+class IptablesManagerNonRootTestCase(base.BaseTestCase):
+    @staticmethod
+    def _normalize_module_name(name):
+        for suf in ['.pyc', '.pyo']:
+            if name.endswith(suf):
+                return name[:-len(suf)] + '.py'
+        return name
+
+    def _test_binary_name(self, module, *extra_options):
+        executable = self._normalize_module_name(module.__file__)
+        expected = os.path.basename(executable)[:16]
+        observed = utils.execute([executable] + list(extra_options)).rstrip()
+        self.assertEqual(expected, observed)
+
+    def test_binary_name(self):
+        self._test_binary_name(ipt_binname)
+
+    def test_binary_name_eventlet_spawn(self):
+        self._test_binary_name(ipt_binname, 'spawn')