This is step one in moving towards testr.
Change-Id: Ieac7fbe34ddc9ca05812faca76b68ff7d37ca708
"""
import functools
-import unittest
+import os
import uuid
+import fixtures
import mox
-import nose.plugins.skip
from oslo.config import cfg
import stubout
+import testtools
from cinder import flags
from cinder.openstack.common import log as logging
@functools.wraps(func)
def _skipper(*args, **kw):
"""Wrapped skipper function."""
- raise nose.SkipTest(self.message)
+ raise testtools.TestCase.skipException(self.message)
return _skipper
def _skipper(*args, **kw):
"""Wrapped skipper function."""
if self.condition:
- raise nose.SkipTest(self.message)
+ raise testtools.TestCase.skipException(self.message)
func(*args, **kw)
return _skipper
def _skipper(*args, **kw):
"""Wrapped skipper function."""
if not self.condition:
- raise nose.SkipTest(self.message)
+ raise testtools.TestCase.skipException(self.message)
func(*args, **kw)
return _skipper
def _skipper(*args, **kw):
"""Wrapped skipper function."""
if FLAGS.fake_tests:
- raise unittest.SkipTest('Test cannot be run in fake mode')
+ raise testtools.TestCase.skipException(
+ 'Test cannot be run in fake mode')
else:
return func(*args, **kw)
return _skipper
pass
-class TestCase(unittest.TestCase):
+class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
+ test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
+ try:
+ test_timeout = int(test_timeout)
+ except ValueError:
+ # If timeout value is invalid do not set a timeout.
+ test_timeout = 0
+ if test_timeout > 0:
+ self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+ self.useFixture(fixtures.NestedTempfile())
+ self.useFixture(fixtures.TempHomeDir())
+
+ if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
+ os.environ.get('OS_STDOUT_CAPTURE') == '1'):
+ stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+ if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
+ os.environ.get('OS_STDERR_CAPTURE') == '1'):
+ stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+ self.log_fixture = self.useFixture(fixtures.FakeLogger())
+
fake_flags.set_defaults(FLAGS)
flags.parse_args([], default_config_files=[])
# because it screws with our generators
self.mox = mox.Mox()
self.stubs = stubout.StubOutForTesting()
+ self.addCleanup(self.mox.UnsetStubs)
+ self.addCleanup(self.stubs.UnsetAll)
+ self.addCleanup(self.stubs.SmartUnsetAll)
+ self.addCleanup(self.mox.VerifyAll)
self.injected = []
self._services = []
FLAGS.set_override('fatal_exception_format_errors', True)
+ self.addCleanup(FLAGS.reset)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
- try:
- self.mox.UnsetStubs()
- self.stubs.UnsetAll()
- self.stubs.SmartUnsetAll()
- self.mox.VerifyAll()
- super(TestCase, self).tearDown()
- finally:
- # Reset any overridden flags
- FLAGS.reset()
-
- # Stop any timers
- for x in self.injected:
- try:
- x.stop()
- except AssertionError:
- pass
-
- # Kill any services
- for x in self._services:
- try:
- x.kill()
- except Exception:
- pass
-
- # Delete attributes that don't start with _ so they don't pin
- # memory around unnecessarily for the duration of the test
- # suite
- for key in [k for k in self.__dict__.keys() if k[0] != '_']:
- del self.__dict__[key]
+
+ # Stop any timers
+ for x in self.injected:
+ try:
+ x.stop()
+ except AssertionError:
+ pass
+
+ # Kill any services
+ for x in self._services:
+ try:
+ x.kill()
+ except Exception:
+ pass
+
+ # Delete attributes that don't start with _ so they don't pin
+ # memory around unnecessarily for the duration of the test
+ # suite
+ for key in [k for k in self.__dict__.keys() if k[0] != '_']:
+ del self.__dict__[key]
+ super(TestCase, self).tearDown()
def flags(self, **kw):
"""Override flag variables for a test."""
self.flags(lock_path=self.tempdir)
self.volume_api = volume_api.API()
- def tearDown(self):
- shutil.rmtree(self.tempdir)
-
def test_reset_status_as_admin(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
def tearDown(self):
# restore original HTTPConnection object
httplib.HTTPConnection = self.oldHTTPConnection
+ super(WsgiLimiterProxyTest, self).tearDown()
class LimitsViewBuilderTest(test.TestCase):
def tearDown(self):
# restore original HTTPConnection object
httplib.HTTPConnection = self.oldHTTPConnection
+ super(WsgiLimiterProxyTest, self).tearDown()
class LimitsViewBuilderTest(test.TestCase):
# under the License.
import time
-import unittest
from cinder.openstack.common import log as logging
from cinder import service
found_volume = self.api.get_volume(created_volume_id)
self.assertEqual(created_volume_id, found_volume['id'])
self.assertEqual(found_volume['display_name'], 'vol-one')
-
-if __name__ == "__main__":
- unittest.main()
ONE_GB_IN_BYTES = 1024 * 1024 * 1024
def setUp(self):
+ super(GlusterFsDriverTestCase, self).setUp()
self._mox = mox_lib.Mox()
self._configuration = mox_lib.MockObject(conf.Configuration)
self._configuration.append_config_values(mox_lib.IgnoreArg())
def tearDown(self):
self._mox.UnsetStubs()
self.stubs.UnsetAll()
+ super(GlusterFsDriverTestCase, self).tearDown()
def stub_out_not_replaying(self, obj, attr_name):
attr_to_replace = getattr(obj, attr_name)
"""Test case for NetApp specific NFS clone driver."""
def setUp(self):
+ super(NetappNfsDriverTestCase, self).setUp()
self._mox = mox.Mox()
self._driver = netapp_nfs.NetAppNFSDriver(
configuration=create_configuration())
-
- def tearDown(self):
- self._mox.UnsetStubs()
+ self.addCleanup(self._mox.UnsetStubs)
def test_check_for_setup_error(self):
mox = self._mox
"""Test case for NetApp C Mode specific NFS clone driver"""
def setUp(self):
+ super(NetappCmodeNfsDriverTestCase, self).setUp()
self._mox = mox.Mox()
self._custom_setup()
+ self.addCleanup(self._mox.UnsetStubs)
def _custom_setup(self):
self._driver = netapp_nfs.NetAppCmodeNfsDriver(
configuration=create_configuration())
- def tearDown(self):
- self._mox.UnsetStubs()
-
def test_check_for_setup_error(self):
mox = self._mox
drv = self._driver
TEST_FILE_NAME = 'test.txt'
def setUp(self):
+ super(RemoteFsDriverTestCase, self).setUp()
self._driver = nfs.RemoteFsDriver()
self._mox = mox_lib.Mox()
- pass
-
- def tearDown(self):
- self._mox.UnsetStubs()
+ self.addCleanup(self._mox.UnsetStubs)
def test_create_sparsed_file(self):
(mox, drv) = self._mox, self._driver
ONE_GB_IN_BYTES = 1024 * 1024 * 1024
def setUp(self):
+ super(NfsDriverTestCase, self).setUp()
self._mox = mox_lib.Mox()
self.stubs = stubout.StubOutForTesting()
self.configuration = mox_lib.MockObject(conf.Configuration)
self.configuration.nfs_sparsed_volumes = True
self._driver = nfs.NfsDriver(configuration=self.configuration)
self._driver.shares = {}
-
- def tearDown(self):
- self._mox.UnsetStubs()
- self.stubs.UnsetAll()
+ self.addCleanup(self.stubs.UnsetAll)
+ self.addCleanup(self._mox.UnsetStubs)
def stub_out_not_replaying(self, obj, attr_name):
attr_to_replace = getattr(obj, attr_name)
import random
import re
import socket
-import unittest
from cinder import context
from cinder import exception
self.assertAlmostEqual(stats['free_capacity_gb'], 3287.5)
-# The test case does not rely on Openstack runtime,
-# so it should inherit from unittest.TestCase.
-class CLIResponseTestCase(unittest.TestCase):
+class CLIResponseTestCase(test.TestCase):
def test_empty(self):
self.assertEqual(0, len(storwize_svc.CLIResponse('')))
self.assertEqual(0, len(storwize_svc.CLIResponse(('', 'stderr'))))
self.assertEqual(list(resp.select('port_id', 'port_status')),
[('500507680210C744', 'active'),
('500507680240C744', 'inactive')])
-
-if __name__ == '__main__':
- unittest.main()
import os.path
import ssl
import tempfile
-import unittest
import urllib2
from oslo.config import cfg
)
-class TestLoaderNormalFilesystem(unittest.TestCase):
+class TestLoaderNormalFilesystem(test.TestCase):
"""Loader tests with normal filesystem (unmodified os.path module)."""
_paste_config = """
"""
def setUp(self):
+ super(TestLoaderNormalFilesystem, self).setUp()
self.config = tempfile.NamedTemporaryFile(mode="w+t")
self.config.write(self._paste_config.lstrip())
self.config.seek(0)
self.config.flush()
self.loader = cinder.wsgi.Loader(self.config.name)
+ self.addCleanup(self.config.close)
def test_config_found(self):
self.assertEquals(self.config.name, self.loader.config_path)
url_parser = self.loader.load_app("test_app")
self.assertEquals("/tmp", url_parser.directory)
- def tearDown(self):
- self.config.close()
-
-class TestWSGIServer(unittest.TestCase):
+class TestWSGIServer(test.TestCase):
"""WSGI server tests."""
def _ipv6_configured():
try:
import contextlib
import StringIO
-import unittest
import mock
import mox
from cinder.db import api as db_api
from cinder import exception
+from cinder import test
from cinder.volume import configuration as conf
from cinder.volume import driver as parent_driver
from cinder.volume.drivers.xenapi import lib
return driver.XenAPINFSDriver(configuration=configuration)
-class DriverTestCase(unittest.TestCase):
+class DriverTestCase(test.TestCase):
def assert_flag(self, flagname):
self.assertTrue(hasattr(driver.FLAGS, flagname))
self.assertEquals('xensm', stats['storage_protocol'])
-class ToolsTest(unittest.TestCase):
+class ToolsTest(test.TestCase):
@mock.patch('cinder.volume.drivers.xenapi.tools._stripped_first_line_of')
def test_get_this_vm_uuid(self, mock_read_first_line):
mock_read_first_line.return_value = 'someuuid'
cover-erase = true
cover-inclusive = true
verbosity=2
-detailed-errors=1
coverage>=3.6
distribute>=0.6.24
+fixtures>=0.3.12
hp3parclient>=1.0.0
mock>=0.8.0
mox>=0.5.3
openstack.nose_plugin>=0.7
psycopg2
sphinx>=1.1.2
+testtools>=0.9.29