]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix base test class for functional api testing
authorMaru Newby <marun@redhat.com>
Sat, 8 Nov 2014 17:08:33 +0000 (17:08 +0000)
committerMaru Newby <marun@redhat.com>
Tue, 2 Dec 2014 15:45:41 +0000 (15:45 +0000)
To support in-tree functional api testing, the test class common
across all types of testing can't import modules that define
configuration that Tempest also defines because duplicate
configuration definitions result in runtime errors.  A previous change
(I44251db399cd73390a9d1931a7f253662002ba10) moved conflicting imports
to a separate module, but subsequent additions have added new conflicting
imports.

This change creates a new test class that is safe to use for all types
of testing (neutron.tests.sub_base.SubBaseTestCase), ensures that the
existing existing base test class (neutron.tests.base.BaseTestCase)
extends it, and documents their respective responsibilities.  This
will hopefully make it less likely that the functional api job will be
broken by changes to the base test class.

Implements: bp retargetable-functional-testing

Change-Id: Ifa270536481fcb19c476c9c62d89e6c5cae36ca1

neutron/tests/base.py
neutron/tests/sub_base.py [new file with mode: 0644]

index 9e8ef18cb786fd483722f8f1685f041725143579..d48983bf50316a93a40b96ef6aa25c185ed36c34 100644 (file)
 # License for the specific language governing permissions and limitations
 # under the License.
 
-"""Base Test Case for all Unit Tests"""
+"""Base test case for tests that do not rely on Tempest.
+
+To change behavoir that is common to all tests, please target
+the neutron.tests.sub_base module instead.
+
+If a test needs to import a dependency like Tempest, see
+neutron.tests.sub_base for a base test class that can be used without
+errors due to duplicate configuration definitions.
+"""
 
-import contextlib
 import logging as std_logging
-import os
 import os.path
 import sys
-import traceback
 
-import eventlet.timeout
 import fixtures
 import mock
 from oslo.config import cfg
 from oslo.messaging import conffixture as messaging_conffixture
-from oslo.utils import strutils
-import testtools
 
 from neutron.common import config
 from neutron.common import rpc as n_rpc
 from neutron.tests import fake_notifier
-from neutron.tests import post_mortem_debug
+from neutron.tests import sub_base
 
 
 CONF = cfg.CONF
 CONF.import_opt('state_path', 'neutron.common.config')
-LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+LOG_FORMAT = sub_base.LOG_FORMAT
 
 ROOTDIR = os.path.dirname(__file__)
 ETCDIR = os.path.join(ROOTDIR, 'etc')
@@ -56,12 +58,10 @@ def fake_consume_in_threads(self):
     return []
 
 
-def bool_from_env(key, strict=False, default=False):
-    value = os.environ.get(key)
-    return strutils.bool_from_string(value, strict=strict, default=default)
+bool_from_env = sub_base.bool_from_env
 
 
-class BaseTestCase(testtools.TestCase):
+class BaseTestCase(sub_base.SubBaseTestCase):
 
     @staticmethod
     def config_parse(conf=None, args=None):
@@ -77,27 +77,8 @@ class BaseTestCase(testtools.TestCase):
     def setUp(self):
         super(BaseTestCase, self).setUp()
 
-        # Configure this first to ensure pm debugging support for setUp()
-        debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
-        if debugger:
-            self.addOnException(post_mortem_debug.get_exception_handler(
-                debugger))
-
-        if bool_from_env('OS_DEBUG'):
-            _level = std_logging.DEBUG
-        else:
-            _level = std_logging.INFO
-        capture_logs = bool_from_env('OS_LOG_CAPTURE')
-        if not capture_logs:
-            std_logging.basicConfig(format=LOG_FORMAT, level=_level)
-        self.log_fixture = self.useFixture(
-            fixtures.FakeLogger(
-                format=LOG_FORMAT,
-                level=_level,
-                nuke_handlers=capture_logs,
-            ))
-
         # suppress all but errors here
+        capture_logs = bool_from_env('OS_LOG_CAPTURE')
         self.useFixture(
             fixtures.FakeLogger(
                 name='neutron.api.extensions',
@@ -106,28 +87,11 @@ class BaseTestCase(testtools.TestCase):
                 nuke_handlers=capture_logs,
             ))
 
-        test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
-        if test_timeout == -1:
-            test_timeout = 0
-        if test_timeout > 0:
-            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
-
-        # If someone does use tempfile directly, ensure that it's cleaned up
-        self.useFixture(fixtures.NestedTempfile())
-        self.useFixture(fixtures.TempHomeDir())
-
         self.temp_dir = self.useFixture(fixtures.TempDir()).path
         cfg.CONF.set_override('state_path', self.temp_dir)
 
-        self.addCleanup(mock.patch.stopall)
         self.addCleanup(CONF.reset)
 
-        if bool_from_env('OS_STDOUT_CAPTURE'):
-            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
-            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
-        if bool_from_env('OS_STDERR_CAPTURE'):
-            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
-            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
         self.useFixture(fixtures.MonkeyPatch(
             'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
             fake_use_fatal_exceptions))
@@ -138,7 +102,6 @@ class BaseTestCase(testtools.TestCase):
             raise self.skipException('XML Testing Skipped in Py26')
 
         self.setup_config()
-        self.addOnException(self.check_for_systemexit)
 
     def setup_rpc_mocks(self):
         # don't actually start RPC listeners when testing
@@ -165,11 +128,6 @@ class BaseTestCase(testtools.TestCase):
         self.addCleanup(n_rpc.cleanup)
         n_rpc.init(CONF)
 
-    def check_for_systemexit(self, exc_info):
-        if isinstance(exc_info[1], SystemExit):
-            self.fail("A SystemExit was raised during the test. %s"
-                      % traceback.format_exception(*exc_info))
-
     def setup_config(self):
         """Tests that need a non-default config can override this method."""
         self.config_parse()
@@ -189,43 +147,3 @@ class BaseTestCase(testtools.TestCase):
         group = kw.pop('group', None)
         for k, v in kw.iteritems():
             CONF.set_override(k, v, group)
-
-    @contextlib.contextmanager
-    def assert_max_execution_time(self, max_execution_time=5):
-        with eventlet.timeout.Timeout(max_execution_time, False):
-            yield
-            return
-        self.fail('Execution of this test timed out')
-
-    def assertOrderedEqual(self, expected, actual):
-        expect_val = self.sort_dict_lists(expected)
-        actual_val = self.sort_dict_lists(actual)
-        self.assertEqual(expect_val, actual_val)
-
-    def sort_dict_lists(self, dic):
-        for key, value in dic.iteritems():
-            if isinstance(value, list):
-                dic[key] = sorted(value)
-            elif isinstance(value, dict):
-                dic[key] = self.sort_dict_lists(value)
-        return dic
-
-    def assertDictSupersetOf(self, expected_subset, actual_superset):
-        """Checks that actual dict contains the expected dict.
-
-        After checking that the arguments are of the right type, this checks
-        that each item in expected_subset is in, and matches, what is in
-        actual_superset. Separate tests are done, so that detailed info can
-        be reported upon failure.
-        """
-        if not isinstance(expected_subset, dict):
-            self.fail("expected_subset (%s) is not an instance of dict" %
-                      type(expected_subset))
-        if not isinstance(actual_superset, dict):
-            self.fail("actual_superset (%s) is not an instance of dict" %
-                      type(actual_superset))
-        for k, v in expected_subset.items():
-            self.assertIn(k, actual_superset)
-            self.assertEqual(v, actual_superset[k],
-                             "Key %(key)s expected: %(exp)r, actual %(act)r" %
-                             {'key': k, 'exp': v, 'act': actual_superset[k]})
diff --git a/neutron/tests/sub_base.py b/neutron/tests/sub_base.py
new file mode 100644 (file)
index 0000000..38a896a
--- /dev/null
@@ -0,0 +1,138 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Base test case for all tests.
+
+To change behavoir only for tests that do not rely on Tempest, please
+target the neutron.tests.base module instead.
+
+There should be no non-test Neutron imports in this module to ensure
+that the functional API tests can import Tempest without triggering
+errors due to duplicate configuration definitions.
+"""
+
+import contextlib
+import logging as std_logging
+import os
+import os.path
+import traceback
+
+import eventlet.timeout
+import fixtures
+import mock
+from oslo.utils import strutils
+import testtools
+
+from neutron.tests import post_mortem_debug
+
+
+LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+
+
+def bool_from_env(key, strict=False, default=False):
+    value = os.environ.get(key)
+    return strutils.bool_from_string(value, strict=strict, default=default)
+
+
+class SubBaseTestCase(testtools.TestCase):
+
+    def setUp(self):
+        super(SubBaseTestCase, self).setUp()
+
+        # Configure this first to ensure pm debugging support for setUp()
+        debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
+        if debugger:
+            self.addOnException(post_mortem_debug.get_exception_handler(
+                debugger))
+
+        if bool_from_env('OS_DEBUG'):
+            _level = std_logging.DEBUG
+        else:
+            _level = std_logging.INFO
+        capture_logs = bool_from_env('OS_LOG_CAPTURE')
+        if not capture_logs:
+            std_logging.basicConfig(format=LOG_FORMAT, level=_level)
+        self.log_fixture = self.useFixture(
+            fixtures.FakeLogger(
+                format=LOG_FORMAT,
+                level=_level,
+                nuke_handlers=capture_logs,
+            ))
+
+        test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
+        if test_timeout == -1:
+            test_timeout = 0
+        if test_timeout > 0:
+            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+        # If someone does use tempfile directly, ensure that it's cleaned up
+        self.useFixture(fixtures.NestedTempfile())
+        self.useFixture(fixtures.TempHomeDir())
+
+        self.addCleanup(mock.patch.stopall)
+
+        if bool_from_env('OS_STDOUT_CAPTURE'):
+            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+        if bool_from_env('OS_STDERR_CAPTURE'):
+            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+        self.addOnException(self.check_for_systemexit)
+
+    def check_for_systemexit(self, exc_info):
+        if isinstance(exc_info[1], SystemExit):
+            self.fail("A SystemExit was raised during the test. %s"
+                      % traceback.format_exception(*exc_info))
+
+    @contextlib.contextmanager
+    def assert_max_execution_time(self, max_execution_time=5):
+        with eventlet.timeout.Timeout(max_execution_time, False):
+            yield
+            return
+        self.fail('Execution of this test timed out')
+
+    def assertOrderedEqual(self, expected, actual):
+        expect_val = self.sort_dict_lists(expected)
+        actual_val = self.sort_dict_lists(actual)
+        self.assertEqual(expect_val, actual_val)
+
+    def sort_dict_lists(self, dic):
+        for key, value in dic.iteritems():
+            if isinstance(value, list):
+                dic[key] = sorted(value)
+            elif isinstance(value, dict):
+                dic[key] = self.sort_dict_lists(value)
+        return dic
+
+    def assertDictSupersetOf(self, expected_subset, actual_superset):
+        """Checks that actual dict contains the expected dict.
+
+        After checking that the arguments are of the right type, this checks
+        that each item in expected_subset is in, and matches, what is in
+        actual_superset. Separate tests are done, so that detailed info can
+        be reported upon failure.
+        """
+        if not isinstance(expected_subset, dict):
+            self.fail("expected_subset (%s) is not an instance of dict" %
+                      type(expected_subset))
+        if not isinstance(actual_superset, dict):
+            self.fail("actual_superset (%s) is not an instance of dict" %
+                      type(actual_superset))
+        for k, v in expected_subset.items():
+            self.assertIn(k, actual_superset)
+            self.assertEqual(v, actual_superset[k],
+                             "Key %(key)s expected: %(exp)r, actual %(act)r" %
+                             {'key': k, 'exp': v, 'act': actual_superset[k]})