# License for the specific language governing permissions and limitations
# under the License.
-"""Base Test Case for all Unit Tests"""
+"""Base test case for tests that do not rely on Tempest.
+
+To change behavoir that is common to all tests, please target
+the neutron.tests.sub_base module instead.
+
+If a test needs to import a dependency like Tempest, see
+neutron.tests.sub_base for a base test class that can be used without
+errors due to duplicate configuration definitions.
+"""
-import contextlib
import logging as std_logging
-import os
import os.path
import sys
-import traceback
-import eventlet.timeout
import fixtures
import mock
from oslo.config import cfg
from oslo.messaging import conffixture as messaging_conffixture
-from oslo.utils import strutils
-import testtools
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.tests import fake_notifier
-from neutron.tests import post_mortem_debug
+from neutron.tests import sub_base
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
-LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+LOG_FORMAT = sub_base.LOG_FORMAT
ROOTDIR = os.path.dirname(__file__)
ETCDIR = os.path.join(ROOTDIR, 'etc')
return []
-def bool_from_env(key, strict=False, default=False):
- value = os.environ.get(key)
- return strutils.bool_from_string(value, strict=strict, default=default)
+bool_from_env = sub_base.bool_from_env
-class BaseTestCase(testtools.TestCase):
+class BaseTestCase(sub_base.SubBaseTestCase):
@staticmethod
def config_parse(conf=None, args=None):
def setUp(self):
super(BaseTestCase, self).setUp()
- # Configure this first to ensure pm debugging support for setUp()
- debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
- if debugger:
- self.addOnException(post_mortem_debug.get_exception_handler(
- debugger))
-
- if bool_from_env('OS_DEBUG'):
- _level = std_logging.DEBUG
- else:
- _level = std_logging.INFO
- capture_logs = bool_from_env('OS_LOG_CAPTURE')
- if not capture_logs:
- std_logging.basicConfig(format=LOG_FORMAT, level=_level)
- self.log_fixture = self.useFixture(
- fixtures.FakeLogger(
- format=LOG_FORMAT,
- level=_level,
- nuke_handlers=capture_logs,
- ))
-
# suppress all but errors here
+ capture_logs = bool_from_env('OS_LOG_CAPTURE')
self.useFixture(
fixtures.FakeLogger(
name='neutron.api.extensions',
nuke_handlers=capture_logs,
))
- test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
- if test_timeout == -1:
- test_timeout = 0
- if test_timeout > 0:
- self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
-
- # If someone does use tempfile directly, ensure that it's cleaned up
- self.useFixture(fixtures.NestedTempfile())
- self.useFixture(fixtures.TempHomeDir())
-
self.temp_dir = self.useFixture(fixtures.TempDir()).path
cfg.CONF.set_override('state_path', self.temp_dir)
- self.addCleanup(mock.patch.stopall)
self.addCleanup(CONF.reset)
- if bool_from_env('OS_STDOUT_CAPTURE'):
- stdout = self.useFixture(fixtures.StringStream('stdout')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
- if bool_from_env('OS_STDERR_CAPTURE'):
- stderr = self.useFixture(fixtures.StringStream('stderr')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
raise self.skipException('XML Testing Skipped in Py26')
self.setup_config()
- self.addOnException(self.check_for_systemexit)
def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
- def check_for_systemexit(self, exc_info):
- if isinstance(exc_info[1], SystemExit):
- self.fail("A SystemExit was raised during the test. %s"
- % traceback.format_exception(*exc_info))
-
def setup_config(self):
"""Tests that need a non-default config can override this method."""
self.config_parse()
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
-
- @contextlib.contextmanager
- def assert_max_execution_time(self, max_execution_time=5):
- with eventlet.timeout.Timeout(max_execution_time, False):
- yield
- return
- self.fail('Execution of this test timed out')
-
- def assertOrderedEqual(self, expected, actual):
- expect_val = self.sort_dict_lists(expected)
- actual_val = self.sort_dict_lists(actual)
- self.assertEqual(expect_val, actual_val)
-
- def sort_dict_lists(self, dic):
- for key, value in dic.iteritems():
- if isinstance(value, list):
- dic[key] = sorted(value)
- elif isinstance(value, dict):
- dic[key] = self.sort_dict_lists(value)
- return dic
-
- def assertDictSupersetOf(self, expected_subset, actual_superset):
- """Checks that actual dict contains the expected dict.
-
- After checking that the arguments are of the right type, this checks
- that each item in expected_subset is in, and matches, what is in
- actual_superset. Separate tests are done, so that detailed info can
- be reported upon failure.
- """
- if not isinstance(expected_subset, dict):
- self.fail("expected_subset (%s) is not an instance of dict" %
- type(expected_subset))
- if not isinstance(actual_superset, dict):
- self.fail("actual_superset (%s) is not an instance of dict" %
- type(actual_superset))
- for k, v in expected_subset.items():
- self.assertIn(k, actual_superset)
- self.assertEqual(v, actual_superset[k],
- "Key %(key)s expected: %(exp)r, actual %(act)r" %
- {'key': k, 'exp': v, 'act': actual_superset[k]})
--- /dev/null
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Base test case for all tests.
+
+To change behavoir only for tests that do not rely on Tempest, please
+target the neutron.tests.base module instead.
+
+There should be no non-test Neutron imports in this module to ensure
+that the functional API tests can import Tempest without triggering
+errors due to duplicate configuration definitions.
+"""
+
+import contextlib
+import logging as std_logging
+import os
+import os.path
+import traceback
+
+import eventlet.timeout
+import fixtures
+import mock
+from oslo.utils import strutils
+import testtools
+
+from neutron.tests import post_mortem_debug
+
+
+LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+
+
+def bool_from_env(key, strict=False, default=False):
+ value = os.environ.get(key)
+ return strutils.bool_from_string(value, strict=strict, default=default)
+
+
+class SubBaseTestCase(testtools.TestCase):
+
+ def setUp(self):
+ super(SubBaseTestCase, self).setUp()
+
+ # Configure this first to ensure pm debugging support for setUp()
+ debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
+ if debugger:
+ self.addOnException(post_mortem_debug.get_exception_handler(
+ debugger))
+
+ if bool_from_env('OS_DEBUG'):
+ _level = std_logging.DEBUG
+ else:
+ _level = std_logging.INFO
+ capture_logs = bool_from_env('OS_LOG_CAPTURE')
+ if not capture_logs:
+ std_logging.basicConfig(format=LOG_FORMAT, level=_level)
+ self.log_fixture = self.useFixture(
+ fixtures.FakeLogger(
+ format=LOG_FORMAT,
+ level=_level,
+ nuke_handlers=capture_logs,
+ ))
+
+ test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
+ if test_timeout == -1:
+ test_timeout = 0
+ if test_timeout > 0:
+ self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+ # If someone does use tempfile directly, ensure that it's cleaned up
+ self.useFixture(fixtures.NestedTempfile())
+ self.useFixture(fixtures.TempHomeDir())
+
+ self.addCleanup(mock.patch.stopall)
+
+ if bool_from_env('OS_STDOUT_CAPTURE'):
+ stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+ if bool_from_env('OS_STDERR_CAPTURE'):
+ stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+ self.addOnException(self.check_for_systemexit)
+
+ def check_for_systemexit(self, exc_info):
+ if isinstance(exc_info[1], SystemExit):
+ self.fail("A SystemExit was raised during the test. %s"
+ % traceback.format_exception(*exc_info))
+
+ @contextlib.contextmanager
+ def assert_max_execution_time(self, max_execution_time=5):
+ with eventlet.timeout.Timeout(max_execution_time, False):
+ yield
+ return
+ self.fail('Execution of this test timed out')
+
+ def assertOrderedEqual(self, expected, actual):
+ expect_val = self.sort_dict_lists(expected)
+ actual_val = self.sort_dict_lists(actual)
+ self.assertEqual(expect_val, actual_val)
+
+ def sort_dict_lists(self, dic):
+ for key, value in dic.iteritems():
+ if isinstance(value, list):
+ dic[key] = sorted(value)
+ elif isinstance(value, dict):
+ dic[key] = self.sort_dict_lists(value)
+ return dic
+
+ def assertDictSupersetOf(self, expected_subset, actual_superset):
+ """Checks that actual dict contains the expected dict.
+
+ After checking that the arguments are of the right type, this checks
+ that each item in expected_subset is in, and matches, what is in
+ actual_superset. Separate tests are done, so that detailed info can
+ be reported upon failure.
+ """
+ if not isinstance(expected_subset, dict):
+ self.fail("expected_subset (%s) is not an instance of dict" %
+ type(expected_subset))
+ if not isinstance(actual_superset, dict):
+ self.fail("actual_superset (%s) is not an instance of dict" %
+ type(actual_superset))
+ for k, v in expected_subset.items():
+ self.assertIn(k, actual_superset)
+ self.assertEqual(v, actual_superset[k],
+ "Key %(key)s expected: %(exp)r, actual %(act)r" %
+ {'key': k, 'exp': v, 'act': actual_superset[k]})