From: Maru Newby Date: Sat, 8 Nov 2014 17:08:33 +0000 (+0000) Subject: Fix base test class for functional api testing X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=744d517fce352eb6e5e43754833f1a4f41e11617;p=openstack-build%2Fneutron-build.git Fix base test class for functional api testing To support in-tree functional api testing, the test class common across all types of testing can't import modules that define configuration that Tempest also defines because duplicate configuration definitions result in runtime errors. A previous change (I44251db399cd73390a9d1931a7f253662002ba10) moved conflicting imports to a separate module, but subsequent additions have added new conflicting imports. This change creates a new test class that is safe to use for all types of testing (neutron.tests.sub_base.SubBaseTestCase), ensures that the existing existing base test class (neutron.tests.base.BaseTestCase) extends it, and documents their respective responsibilities. This will hopefully make it less likely that the functional api job will be broken by changes to the base test class. Implements: bp retargetable-functional-testing Change-Id: Ifa270536481fcb19c476c9c62d89e6c5cae36ca1 --- diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 9e8ef18cb..d48983bf5 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -13,32 +13,34 @@ # License for the specific language governing permissions and limitations # under the License. -"""Base Test Case for all Unit Tests""" +"""Base test case for tests that do not rely on Tempest. + +To change behavoir that is common to all tests, please target +the neutron.tests.sub_base module instead. + +If a test needs to import a dependency like Tempest, see +neutron.tests.sub_base for a base test class that can be used without +errors due to duplicate configuration definitions. +""" -import contextlib import logging as std_logging -import os import os.path import sys -import traceback -import eventlet.timeout import fixtures import mock from oslo.config import cfg from oslo.messaging import conffixture as messaging_conffixture -from oslo.utils import strutils -import testtools from neutron.common import config from neutron.common import rpc as n_rpc from neutron.tests import fake_notifier -from neutron.tests import post_mortem_debug +from neutron.tests import sub_base CONF = cfg.CONF CONF.import_opt('state_path', 'neutron.common.config') -LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" +LOG_FORMAT = sub_base.LOG_FORMAT ROOTDIR = os.path.dirname(__file__) ETCDIR = os.path.join(ROOTDIR, 'etc') @@ -56,12 +58,10 @@ def fake_consume_in_threads(self): return [] -def bool_from_env(key, strict=False, default=False): - value = os.environ.get(key) - return strutils.bool_from_string(value, strict=strict, default=default) +bool_from_env = sub_base.bool_from_env -class BaseTestCase(testtools.TestCase): +class BaseTestCase(sub_base.SubBaseTestCase): @staticmethod def config_parse(conf=None, args=None): @@ -77,27 +77,8 @@ class BaseTestCase(testtools.TestCase): def setUp(self): super(BaseTestCase, self).setUp() - # Configure this first to ensure pm debugging support for setUp() - debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER') - if debugger: - self.addOnException(post_mortem_debug.get_exception_handler( - debugger)) - - if bool_from_env('OS_DEBUG'): - _level = std_logging.DEBUG - else: - _level = std_logging.INFO - capture_logs = bool_from_env('OS_LOG_CAPTURE') - if not capture_logs: - std_logging.basicConfig(format=LOG_FORMAT, level=_level) - self.log_fixture = self.useFixture( - fixtures.FakeLogger( - format=LOG_FORMAT, - level=_level, - nuke_handlers=capture_logs, - )) - # suppress all but errors here + capture_logs = bool_from_env('OS_LOG_CAPTURE') self.useFixture( fixtures.FakeLogger( name='neutron.api.extensions', @@ -106,28 +87,11 @@ class BaseTestCase(testtools.TestCase): nuke_handlers=capture_logs, )) - test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0)) - if test_timeout == -1: - test_timeout = 0 - if test_timeout > 0: - self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) - - # If someone does use tempfile directly, ensure that it's cleaned up - self.useFixture(fixtures.NestedTempfile()) - self.useFixture(fixtures.TempHomeDir()) - self.temp_dir = self.useFixture(fixtures.TempDir()).path cfg.CONF.set_override('state_path', self.temp_dir) - self.addCleanup(mock.patch.stopall) self.addCleanup(CONF.reset) - if bool_from_env('OS_STDOUT_CAPTURE'): - stdout = self.useFixture(fixtures.StringStream('stdout')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) - if bool_from_env('OS_STDERR_CAPTURE'): - stderr = self.useFixture(fixtures.StringStream('stderr')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) self.useFixture(fixtures.MonkeyPatch( 'neutron.common.exceptions.NeutronException.use_fatal_exceptions', fake_use_fatal_exceptions)) @@ -138,7 +102,6 @@ class BaseTestCase(testtools.TestCase): raise self.skipException('XML Testing Skipped in Py26') self.setup_config() - self.addOnException(self.check_for_systemexit) def setup_rpc_mocks(self): # don't actually start RPC listeners when testing @@ -165,11 +128,6 @@ class BaseTestCase(testtools.TestCase): self.addCleanup(n_rpc.cleanup) n_rpc.init(CONF) - def check_for_systemexit(self, exc_info): - if isinstance(exc_info[1], SystemExit): - self.fail("A SystemExit was raised during the test. %s" - % traceback.format_exception(*exc_info)) - def setup_config(self): """Tests that need a non-default config can override this method.""" self.config_parse() @@ -189,43 +147,3 @@ class BaseTestCase(testtools.TestCase): group = kw.pop('group', None) for k, v in kw.iteritems(): CONF.set_override(k, v, group) - - @contextlib.contextmanager - def assert_max_execution_time(self, max_execution_time=5): - with eventlet.timeout.Timeout(max_execution_time, False): - yield - return - self.fail('Execution of this test timed out') - - def assertOrderedEqual(self, expected, actual): - expect_val = self.sort_dict_lists(expected) - actual_val = self.sort_dict_lists(actual) - self.assertEqual(expect_val, actual_val) - - def sort_dict_lists(self, dic): - for key, value in dic.iteritems(): - if isinstance(value, list): - dic[key] = sorted(value) - elif isinstance(value, dict): - dic[key] = self.sort_dict_lists(value) - return dic - - def assertDictSupersetOf(self, expected_subset, actual_superset): - """Checks that actual dict contains the expected dict. - - After checking that the arguments are of the right type, this checks - that each item in expected_subset is in, and matches, what is in - actual_superset. Separate tests are done, so that detailed info can - be reported upon failure. - """ - if not isinstance(expected_subset, dict): - self.fail("expected_subset (%s) is not an instance of dict" % - type(expected_subset)) - if not isinstance(actual_superset, dict): - self.fail("actual_superset (%s) is not an instance of dict" % - type(actual_superset)) - for k, v in expected_subset.items(): - self.assertIn(k, actual_superset) - self.assertEqual(v, actual_superset[k], - "Key %(key)s expected: %(exp)r, actual %(act)r" % - {'key': k, 'exp': v, 'act': actual_superset[k]}) diff --git a/neutron/tests/sub_base.py b/neutron/tests/sub_base.py new file mode 100644 index 000000000..38a896a2b --- /dev/null +++ b/neutron/tests/sub_base.py @@ -0,0 +1,138 @@ +# Copyright 2014 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Base test case for all tests. + +To change behavoir only for tests that do not rely on Tempest, please +target the neutron.tests.base module instead. + +There should be no non-test Neutron imports in this module to ensure +that the functional API tests can import Tempest without triggering +errors due to duplicate configuration definitions. +""" + +import contextlib +import logging as std_logging +import os +import os.path +import traceback + +import eventlet.timeout +import fixtures +import mock +from oslo.utils import strutils +import testtools + +from neutron.tests import post_mortem_debug + + +LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" + + +def bool_from_env(key, strict=False, default=False): + value = os.environ.get(key) + return strutils.bool_from_string(value, strict=strict, default=default) + + +class SubBaseTestCase(testtools.TestCase): + + def setUp(self): + super(SubBaseTestCase, self).setUp() + + # Configure this first to ensure pm debugging support for setUp() + debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER') + if debugger: + self.addOnException(post_mortem_debug.get_exception_handler( + debugger)) + + if bool_from_env('OS_DEBUG'): + _level = std_logging.DEBUG + else: + _level = std_logging.INFO + capture_logs = bool_from_env('OS_LOG_CAPTURE') + if not capture_logs: + std_logging.basicConfig(format=LOG_FORMAT, level=_level) + self.log_fixture = self.useFixture( + fixtures.FakeLogger( + format=LOG_FORMAT, + level=_level, + nuke_handlers=capture_logs, + )) + + test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0)) + if test_timeout == -1: + test_timeout = 0 + if test_timeout > 0: + self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) + + # If someone does use tempfile directly, ensure that it's cleaned up + self.useFixture(fixtures.NestedTempfile()) + self.useFixture(fixtures.TempHomeDir()) + + self.addCleanup(mock.patch.stopall) + + if bool_from_env('OS_STDOUT_CAPTURE'): + stdout = self.useFixture(fixtures.StringStream('stdout')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) + if bool_from_env('OS_STDERR_CAPTURE'): + stderr = self.useFixture(fixtures.StringStream('stderr')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) + + self.addOnException(self.check_for_systemexit) + + def check_for_systemexit(self, exc_info): + if isinstance(exc_info[1], SystemExit): + self.fail("A SystemExit was raised during the test. %s" + % traceback.format_exception(*exc_info)) + + @contextlib.contextmanager + def assert_max_execution_time(self, max_execution_time=5): + with eventlet.timeout.Timeout(max_execution_time, False): + yield + return + self.fail('Execution of this test timed out') + + def assertOrderedEqual(self, expected, actual): + expect_val = self.sort_dict_lists(expected) + actual_val = self.sort_dict_lists(actual) + self.assertEqual(expect_val, actual_val) + + def sort_dict_lists(self, dic): + for key, value in dic.iteritems(): + if isinstance(value, list): + dic[key] = sorted(value) + elif isinstance(value, dict): + dic[key] = self.sort_dict_lists(value) + return dic + + def assertDictSupersetOf(self, expected_subset, actual_superset): + """Checks that actual dict contains the expected dict. + + After checking that the arguments are of the right type, this checks + that each item in expected_subset is in, and matches, what is in + actual_superset. Separate tests are done, so that detailed info can + be reported upon failure. + """ + if not isinstance(expected_subset, dict): + self.fail("expected_subset (%s) is not an instance of dict" % + type(expected_subset)) + if not isinstance(actual_superset, dict): + self.fail("actual_superset (%s) is not an instance of dict" % + type(actual_superset)) + for k, v in expected_subset.items(): + self.assertIn(k, actual_superset) + self.assertEqual(v, actual_superset[k], + "Key %(key)s expected: %(exp)r, actual %(act)r" % + {'key': k, 'exp': v, 'act': actual_superset[k]})