fake_notifier.stub_notifier(self.stubs)
- CONF.set_override('fatal_exception_format_errors', True)
+ self.override_config('fatal_exception_format_errors', True)
# This will be cleaned up by the NestedTempfile fixture
- CONF.set_override('lock_path', tempfile.mkdtemp())
- CONF.set_override('policy_file',
- os.path.join(
- os.path.abspath(
- os.path.join(
- os.path.dirname(__file__),
- '..',
- )
- ),
- 'cinder/tests/policy.json'))
+ self.override_config('lock_path', tempfile.mkdtemp())
+ self.override_config('policy_file',
+ os.path.join(
+ os.path.abspath(
+ os.path.join(
+ os.path.dirname(__file__),
+ '..',
+ )
+ ),
+ 'cinder/tests/policy.json'))
def _common_cleanup(self):
"""Runs after each test method to tear down test environment."""
for key in [k for k in self.__dict__.keys() if k[0] != '_']:
del self.__dict__[key]
+ def override_config(self, name, override, group=None):
+ """Cleanly override CONF variables."""
+ CONF.set_override(name, override, group)
+ self.addCleanup(CONF.clear_override, name, group)
+
def flags(self, **kw):
"""Override CONF variables for a test."""
for k, v in kw.iteritems():
- CONF.set_override(k, v)
+ self.override_config(k, v)
def log_level(self, level):
"""Set logging level to the specified value."""
mock_copy_volume_to_image.side_effect = \
self.fake_rpc_copy_volume_to_image
- CONF.set_override('glance_core_properties', [])
+ self.override_config('glance_core_properties', [])
req = fakes.HTTPRequest.blank(
'/v2/tenant1/volumes/%s/action' % id)
def test_backup_manager_driver_name(self):
""""Test mapping between backup services and backup drivers."""
- cfg.CONF.set_override('backup_driver', "cinder.backup.services.swift")
+ self.override_config('backup_driver', "cinder.backup.services.swift")
backup_mgr = \
importutils.import_object(CONF.backup_manager)
self.assertEqual('cinder.backup.drivers.swift',
"""Test Case for backups."""
def setUp(self):
- CONF.set_override("backup_driver",
- "cinder.tests.backup.fake_service_with_verify")
+ self.override_config("backup_driver",
+ "cinder.tests.backup.fake_service_with_verify")
super(BackupTestCaseWithVerify, self).setUp()
def test_import_record_with_verify(self):
u'adminURL': u'http://example.com'}]
}]
self.ctxt.project_id = "12345678"
- CONF.set_override("backup_swift_url", "http://public.example.com/")
+ self.override_config("backup_swift_url", "http://public.example.com/")
backup = SwiftBackupDriver(self.ctxt)
self.assertEqual("%s%s" % (CONF.backup_swift_url,
self.ctxt.project_id),
backup.swift_url)
def test_backup_swift_info(self):
- CONF.set_override("swift_catalog_info", "dummy")
+ self.override_config("swift_catalog_info", "dummy")
self.assertRaises(exception.BackupDriverException,
SwiftBackupDriver,
self.ctxt)
def test_local_path(self):
"""local_path common use case."""
- CONF.set_override("glusterfs_mount_point_base",
- self.TEST_MNT_POINT_BASE)
+ self.override_config("glusterfs_mount_point_base",
+ self.TEST_MNT_POINT_BASE)
drv = self._driver
volume = DumbVolume()
mox.StubOutWithMock(brick.remotefs.remotefs.RemoteFsClient,
'get_mount_point')
- CONF.set_override("glusterfs_mount_point_base",
- self.TEST_MNT_POINT_BASE)
+ self.override_config("glusterfs_mount_point_base",
+ self.TEST_MNT_POINT_BASE)
brick.remotefs.remotefs.RemoteFsClient.\
get_mount_point(self.TEST_EXPORT1).AndReturn(hashed_path)
mox = self._mox
drv = self._driver
- CONF.set_override("glusterfs_shares_config",
- self.TEST_SHARES_CONFIG_FILE)
+ self.override_config("glusterfs_shares_config",
+ self.TEST_SHARES_CONFIG_FILE)
mox.StubOutWithMock(os.path, 'exists')
os.path.exists(self.TEST_SHARES_CONFIG_FILE).AndReturn(True)
mox = self._mox
drv = self._driver
- CONF.set_override("glusterfs_shares_config",
- self.TEST_SHARES_CONFIG_FILE)
+ self.override_config("glusterfs_shares_config",
+ self.TEST_SHARES_CONFIG_FILE)
self.stubs.Set(drv, '_load_shares_config',
self._fake_load_shares_config)
drv = self._driver
volume = self._simple_volume()
- CONF.set_override('glusterfs_sparsed_volumes', True)
+ self.override_config('glusterfs_sparsed_volumes', True)
mox.StubOutWithMock(drv, '_create_sparsed_file')
mox.StubOutWithMock(drv, '_set_rw_permissions_for_all')
def test_get_backing_chain_for_path(self):
(mox, drv) = self._mox, self._driver
- CONF.set_override('glusterfs_mount_point_base',
- self.TEST_MNT_POINT_BASE)
+ self.override_config('glusterfs_mount_point_base',
+ self.TEST_MNT_POINT_BASE)
volume = self._simple_volume()
vol_filename = volume['name']
"disk size: 0")
utils.is_blk_device(self.TEST_DEV_PATH).AndReturn(True)
- CONF.set_override('volume_copy_bps_limit', bps_limit)
+ self.override_config('volume_copy_bps_limit', bps_limit)
image_utils.create_temporary_file().AndReturn(self.TEST_DEV_PATH)
"disk_size: 196K (200704 bytes)"
if bps_limit:
- CONF.set_override('volume_copy_bps_limit', bps_limit)
+ self.override_config('volume_copy_bps_limit', bps_limit)
prefix = ('cgexec', '-g', 'blkio:test')
else:
prefix = ()
"cluster_size: 65536\n"\
"disk_size: 196K (200704 bytes)"
- CONF.set_override('volume_copy_bps_limit', bps_limit)
+ self.override_config('volume_copy_bps_limit', bps_limit)
prefix = ('cgexec', '-g', 'blkio:test')
cmd = prefix + ('qemu-img', 'convert', '-O', 'qcow2',
from mox import IgnoreArg
from mox import IsA
from mox import stubout
-from oslo.config import cfg
from cinder import context
from cinder import exception
drv = self._driver
volume = self._simple_volume()
- cfg.CONF.set_override('nfs_sparsed_volumes', True)
+ self.override_config('nfs_sparsed_volumes', True)
mox.StubOutWithMock(drv, '_create_sparsed_file')
mox.StubOutWithMock(drv, '_set_rw_permissions')
self.configuration.nfs_sparsed_volumes = False
volume = self._simple_volume()
- cfg.CONF.set_override('nfs_sparsed_volumes', False)
+ self.override_config('nfs_sparsed_volumes', False)
mox.StubOutWithMock(drv, '_create_regular_file')
mox.StubOutWithMock(drv, '_set_rw_permissions')
self.assertFalse(serv.model_disconnected)
def test_service_with_long_report_interval(self):
- CONF.set_override('service_down_time', 10)
- CONF.set_override('report_interval', 10)
+ self.override_config('service_down_time', 10)
+ self.override_config('report_interval', 10)
service.Service.create(binary="test_service",
manager="cinder.tests.test_service.FakeManager")
self.assertEqual(CONF.service_down_time, 25)
self.assertEqual(test_service.workers, processutils.get_worker_count())
def test_workers_set_good_user_setting(self):
- CONF.set_override('osapi_volume_workers', 8)
+ self.override_config('osapi_volume_workers', 8)
test_service = service.WSGIService("osapi_volume")
self.assertEqual(test_service.workers, 8)
def test_workers_set_zero_user_setting(self):
- CONF.set_override('osapi_volume_workers', 0)
+ self.override_config('osapi_volume_workers', 0)
test_service = service.WSGIService("osapi_volume")
# If a value less than 1 is used, defaults to number of procs available
self.assertEqual(test_service.workers, processutils.get_worker_count())
def test_workers_set_negative_user_setting(self):
- CONF.set_override('osapi_volume_workers', -1)
+ self.override_config('osapi_volume_workers', -1)
self.assertRaises(exception.InvalidInput,
service.WSGIService,
"osapi_volume")
fake_list_availability_zones)
# Test backwards compatibility, default_availability_zone not set
- CONF.set_override('storage_availability_zone', 'az2')
+ self.override_config('storage_availability_zone', 'az2')
volume = volume_api.create(self.context,
1,
'name',
'description')
self.assertEqual(volume['availability_zone'], 'az2')
- CONF.set_override('default_availability_zone', 'default-az')
+ self.override_config('default_availability_zone', 'default-az')
volume = volume_api.create(self.context,
1,
'name',