import ast
import os
-import shutil
-import tempfile
import webob
from oslo.config import cfg
def setUp(self):
super(AdminActionsTest, self).setUp()
- self.tempdir = tempfile.mkdtemp()
- self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')
- self.flags(lock_path=self.tempdir)
self.volume_api = volume_api.API()
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
- def tearDown(self):
- shutil.rmtree(self.tempdir)
- super(AdminActionsTest, self).tearDown()
-
def test_reset_status_as_admin(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
self.context.project_id = 'fake'
self.context.user_id = 'fake'
- def tearDown(self):
- super(BackupsAPITestCase, self).tearDown()
-
@staticmethod
def _create_backup(volume_id=1,
display_name='test_backup',
#reset notifier drivers left over from other api/contrib tests
notifier_api._reset_drivers()
test_notifier.NOTIFICATIONS = []
-
- def tearDown(self):
- notifier_api._reset_drivers()
- super(QoSSpecManageApiTest, self).tearDown()
+ self.addCleanup(notifier_api._reset_drivers)
def test_index(self):
self.stubs.Set(qos_specs, 'get_all_specs',
self.context = context.get_admin_context()
self.controller = services.ServiceController()
- def tearDown(self):
- super(ServicesTest, self).tearDown()
-
def test_services_list(self):
req = FakeRequest()
res_dict = self.controller.index(req)
"""to reset notifier drivers left over from other api/contrib tests"""
notifier_api._reset_drivers()
test_notifier.NOTIFICATIONS = []
-
- def tearDown(self):
- notifier_api._reset_drivers()
- super(VolumeTypesExtraSpecsTest, self).tearDown()
+ self.addCleanup(notifier_api._reset_drivers)
def test_index(self):
self.stubs.Set(cinder.db, 'volume_type_extra_specs_get',
"""to reset notifier drivers left over from other api/contrib tests"""
notifier_api._reset_drivers()
test_notifier.NOTIFICATIONS = []
-
- def tearDown(self):
- notifier_api._reset_drivers()
- super(VolumeTypesManageApiTest, self).tearDown()
+ self.addCleanup(notifier_api._reset_drivers)
def test_volume_types_delete(self):
self.stubs.Set(volume_types, 'get_volume_type',
for _meth in self._methods:
self.api_patchers[_meth] = mock.patch('cinder.volume.API.' + _meth)
self.api_patchers[_meth].start()
+ self.addCleanup(self.api_patchers[_meth].stop)
self.api_patchers[_meth].return_value = True
vol = {'id': 'fake', 'host': 'fake', 'status': 'available', 'size': 1,
'migration_status': None, 'volume_type_id': 'fake'}
self.get_patcher = mock.patch('cinder.volume.API.get')
self.mock_volume_get = self.get_patcher.start()
+ self.addCleanup(self.get_patcher.stop)
self.mock_volume_get.return_value = vol
self.update_patcher = mock.patch('cinder.volume.API.update')
self.mock_volume_update = self.update_patcher.start()
+ self.addCleanup(self.update_patcher.stop)
self.mock_volume_update.return_value = vol
self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')
- def tearDown(self):
- for patcher in self.api_patchers:
- self.api_patchers[patcher].stop()
- self.update_patcher.stop()
- self.get_patcher.stop()
- super(VolumeActionsTest, self).tearDown()
-
def test_simple_api_actions(self):
app = fakes.wsgi_app()
for _action in self._actions:
name = path.split('.')[-1]
self.retype_patchers[name] = mock.patch(path)
self.retype_mocks[name] = self.retype_patchers[name].start()
+ self.addCleanup(self.retype_patchers[name].stop)
self.retype_mocks['get_volume_type'].side_effect = get_vol_type
self.retype_mocks['get_volume_type_by_name'].side_effect = get_vol_type
super(VolumeRetypeActionsTest, self).setUp()
- def tearDown(self):
- for name, patcher in self.retype_patchers.iteritems():
- patcher.stop()
- super(VolumeRetypeActionsTest, self).tearDown()
-
def _retype_volume_exec(self, expected_status, new_type='foo'):
req = webob.Request.blank('/v2/fake/volumes/1/action')
req.method = 'POST'
self.ctxt = context.RequestContext('fake', 'fake')
self.volume_id = self._create_volume(self.ctxt)
-
- def tearDown(self):
- db.volume_destroy(self.ctxt.elevated(), self.volume_id)
- super(VolumeEncryptionMetadataTest, self).tearDown()
+ self.addCleanup(db.volume_destroy, self.ctxt.elevated(),
+ self.volume_id)
def test_index(self):
self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
def setUp(self):
super(VolumeTransferAPITestCase, self).setUp()
- def tearDown(self):
- super(VolumeTransferAPITestCase, self).tearDown()
-
@staticmethod
def _create_transfer(volume_id=1,
display_name='test_transfer'):
"""to reset notifier drivers left over from other api/contrib tests"""
notifier_api._reset_drivers()
test_notifier.NOTIFICATIONS = []
-
- def tearDown(self):
- notifier_api._reset_drivers()
- super(VolumeTypeEncryptionTest, self).tearDown()
+ self.addCleanup(notifier_api._reset_drivers)
def _get_response(self, volume_type, admin=True,
url='/v2/fake/types/%s/encryption',
"""
super(WsgiLimiterProxyTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
- self.oldHTTPConnection = (
+ oldHTTPConnection = (
wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
+ self.addCleanup(self._restore, oldHTTPConnection)
+
+ def _restore(self, oldHTTPConnection):
+ # restore original HTTPConnection object
+ httplib.HTTPConnection = oldHTTPConnection
def test_200(self):
"""Successful request test."""
self.assertEqual((delay, error), expected)
- def tearDown(self):
- # restore original HTTPConnection object
- httplib.HTTPConnection = self.oldHTTPConnection
- super(WsgiLimiterProxyTest, self).tearDown()
-
class LimitsViewBuilderTest(test.TestCase):
def setUp(self):