import six
from neutron.common import constants
+from neutron.tests import base
from neutron.tests.common import helpers as c_helpers
from neutron.tests.functional.agent.linux import helpers
-from neutron.tests import sub_base
-class ConfigDict(sub_base.AttributeDict):
+class ConfigDict(base.AttributeDict):
def update(self, other):
self.convert_to_attr_dict(other)
super(ConfigDict, self).update(other)
"""
for key, value in other.iteritems():
if isinstance(value, dict):
- if not isinstance(value, sub_base.AttributeDict):
- other[key] = sub_base.AttributeDict(value)
+ if not isinstance(value, base.AttributeDict):
+ other[key] = base.AttributeDict(value)
self.convert_to_attr_dict(value)
})
def _generate_host(self):
- return sub_base.get_rand_name(prefix='host-')
+ return base.get_rand_name(prefix='host-')
def _generate_state_path(self, temp_dir):
# Assume that temp_dir will be removed by the caller
def _generate_bridge_mappings(self):
return ('physnet1:%s' %
- sub_base.get_rand_name(
+ base.get_rand_name(
prefix='br-eth',
max_length=constants.DEVICE_NAME_MAX_LEN))
def _generate_integration_bridge(self):
- return sub_base.get_rand_name(prefix='br-int',
- max_length=constants.DEVICE_NAME_MAX_LEN)
+ return base.get_rand_name(prefix='br-int',
+ max_length=constants.DEVICE_NAME_MAX_LEN)
+++ /dev/null
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Base test case for all tests.
-
-To change behavoir only for tests that do not rely on Tempest, please
-target the neutron.tests.base module instead.
-
-There should be no non-test Neutron imports in this module to ensure
-that the functional API tests can import Tempest without triggering
-errors due to duplicate configuration definitions.
-"""
-
-import contextlib
-import logging as std_logging
-import os
-import os.path
-import random
-import traceback
-
-import eventlet.timeout
-import fixtures
-import mock
-from oslo_utils import strutils
-import testtools
-
-from neutron.tests import post_mortem_debug
-
-
-LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
-
-
-def get_rand_name(max_length=None, prefix='test'):
- name = prefix + str(random.randint(1, 0x7fffffff))
- return name[:max_length] if max_length is not None else name
-
-
-def bool_from_env(key, strict=False, default=False):
- value = os.environ.get(key)
- return strutils.bool_from_string(value, strict=strict, default=default)
-
-
-class AttributeDict(dict):
-
- """
- Provide attribute access (dict.key) to dictionary values.
- """
-
- def __getattr__(self, name):
- """Allow attribute access for all keys in the dict."""
- if name in self:
- return self[name]
- raise AttributeError(_("Unknown attribute '%s'.") % name)
-
-
-class SubBaseTestCase(testtools.TestCase):
-
- def setUp(self):
- super(SubBaseTestCase, self).setUp()
-
- # Configure this first to ensure pm debugging support for setUp()
- debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
- if debugger:
- self.addOnException(post_mortem_debug.get_exception_handler(
- debugger))
-
- if bool_from_env('OS_DEBUG'):
- _level = std_logging.DEBUG
- else:
- _level = std_logging.INFO
- capture_logs = bool_from_env('OS_LOG_CAPTURE')
- if not capture_logs:
- std_logging.basicConfig(format=LOG_FORMAT, level=_level)
- self.log_fixture = self.useFixture(
- fixtures.FakeLogger(
- format=LOG_FORMAT,
- level=_level,
- nuke_handlers=capture_logs,
- ))
-
- test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
- if test_timeout == -1:
- test_timeout = 0
- if test_timeout > 0:
- self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
-
- # If someone does use tempfile directly, ensure that it's cleaned up
- self.useFixture(fixtures.NestedTempfile())
- self.useFixture(fixtures.TempHomeDir())
-
- self.addCleanup(mock.patch.stopall)
-
- if bool_from_env('OS_STDOUT_CAPTURE'):
- stdout = self.useFixture(fixtures.StringStream('stdout')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
- if bool_from_env('OS_STDERR_CAPTURE'):
- stderr = self.useFixture(fixtures.StringStream('stderr')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
-
- self.addOnException(self.check_for_systemexit)
-
- def check_for_systemexit(self, exc_info):
- if isinstance(exc_info[1], SystemExit):
- self.fail("A SystemExit was raised during the test. %s"
- % traceback.format_exception(*exc_info))
-
- @contextlib.contextmanager
- def assert_max_execution_time(self, max_execution_time=5):
- with eventlet.timeout.Timeout(max_execution_time, False):
- yield
- return
- self.fail('Execution of this test timed out')
-
- def assertOrderedEqual(self, expected, actual):
- expect_val = self.sort_dict_lists(expected)
- actual_val = self.sort_dict_lists(actual)
- self.assertEqual(expect_val, actual_val)
-
- def sort_dict_lists(self, dic):
- for key, value in dic.iteritems():
- if isinstance(value, list):
- dic[key] = sorted(value)
- elif isinstance(value, dict):
- dic[key] = self.sort_dict_lists(value)
- return dic
-
- def assertDictSupersetOf(self, expected_subset, actual_superset):
- """Checks that actual dict contains the expected dict.
-
- After checking that the arguments are of the right type, this checks
- that each item in expected_subset is in, and matches, what is in
- actual_superset. Separate tests are done, so that detailed info can
- be reported upon failure.
- """
- if not isinstance(expected_subset, dict):
- self.fail("expected_subset (%s) is not an instance of dict" %
- type(expected_subset))
- if not isinstance(actual_superset, dict):
- self.fail("actual_superset (%s) is not an instance of dict" %
- type(actual_superset))
- for k, v in expected_subset.items():
- self.assertIn(k, actual_superset)
- self.assertEqual(v, actual_superset[k],
- "Key %(key)s expected: %(exp)r, actual %(act)r" %
- {'key': k, 'exp': v, 'act': actual_superset[k]})