from cinder.tests.unit.api import fakes
from cinder.tests.unit.brick import fake_lvm
from cinder.tests.unit import conf_fixture
+from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_driver
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
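# The fake_constants module imported above as `fake` is assumed to be a
# flat module of stable, pre-generated UUID strings, so tests share
# consistent identifiers instead of ad-hoc literals like 'fake' or '1234'.
# A minimal sketch of the shape this change relies on (the values below
# are illustrative placeholders, not the real constants):
#
#     user_id = '11111111-1111-1111-1111-111111111111'
#     project_id = '22222222-2222-2222-2222-222222222222'
#     volume_id = '33333333-3333-3333-3333-333333333333'
#     volume2_id = '44444444-4444-4444-4444-444444444444'
#     snapshot_id = '55555555-5555-5555-5555-555555555555'
#     volume_type_id = '66666666-6666-6666-6666-666666666666'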
metadata = metadata or {}
snap = objects.Snapshot(ctxt or context.get_admin_context())
snap.volume_size = size
- snap.user_id = 'fake'
- snap.project_id = 'fake'
+ snap.user_id = fake.user_id
+ snap.project_id = fake.project_id
snap.volume_id = volume_id
snap.status = "creating"
if metadata is not None:
self.volume = importutils.import_object(CONF.volume_manager)
self.configuration = mock.Mock(conf.Configuration)
self.context = context.get_admin_context()
- self.context.user_id = 'fake'
+ self.context.user_id = fake.user_id
# NOTE(mriedem): The id is hard-coded here for tracking race fail
# assertions with the notification code; it's part of an
# elastic-recheck query, so don't remove it or change it.
'volume_id': volume_id,
'volume_type': None,
'snapshot_id': None,
- 'user_id': 'fake',
+ 'user_id': fake.user_id,
'launched_at': 'DONTCARE',
'size': 1,
'replication_status': 'disabled',
self.assertIsNotNone(volume['encryption_key_id'])
def test_create_volume_with_provider_id(self):
- volume_params_with_provider_id = dict(provider_id='1111-aaaa',
+ volume_params_with_provider_id = dict(provider_id=fake.provider_id,
**self.volume_params)
volume = tests_utils.create_volume(self.context,
**volume_params_with_provider_id)
self.volume.create_volume(self.context, volume['id'])
- self.assertEqual('1111-aaaa', volume['provider_id'])
+ self.assertEqual(fake.provider_id, volume['provider_id'])
@mock.patch.object(keymgr, 'API', new=fake_keymgr.fake_api)
def test_create_delete_volume_with_encrypted_volume_type(self):
- db_vol_type = db.volume_type_create(self.context,
- {'id': 'type-id', 'name': 'LUKS'})
+ db_vol_type = db.volume_type_create(
+ self.context, {'id': fake.volume_type_id, 'name': 'LUKS'})
db.volume_type_encryption_create(
- self.context, 'type-id',
+ self.context, fake.volume_type_id,
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER})
volume = self.volume_api.create(self.context,
'volume_get_all_by_project') as by_project:
with mock.patch.object(volume_api.db,
'volume_get_all') as get_all:
- db_volume = {'volume_type_id': 'fake_type_id',
+ db_volume = {'volume_type_id': fake.volume_type_id,
'name': 'fake_name',
'host': 'fake_host',
- 'id': 'fake_volume_id'}
+ 'id': fake.volume_id}
volume = fake_volume.fake_db_volume(**db_volume)
by_project.return_value = [volume]
biz_type = db.volume_type_get_by_name(context.get_admin_context(),
'biz')
- snapshot = {'id': 1234,
+ snapshot = {'id': fake.snapshot_id,
'status': 'available',
'volume_size': 10,
'volume_type_id': biz_type['id']}
biz_type = db.volume_type_get_by_name(context.get_admin_context(),
'biz')
- source_vol = {'id': 1234,
+ source_vol = {'id': fake.volume_id,
'status': 'available',
'volume_size': 10,
'volume_type': biz_type,
'id': '34e54c31-3bc8-5c1d-9fff-2225bcce4b59',
'description': None}
- source_vol = {'id': 1234,
+ source_vol = {'id': fake.volume_id,
'status': 'available',
'volume_size': 10,
'volume_type': biz_type,
'id': '34e54c31-3bc8-5c1d-9fff-2225bcce4b59',
'description': None}
- source_vol = {'id': 1234,
+ source_vol = {'id': fake.volume_id,
'status': 'available',
'volume_size': 10,
'volume_type': biz_type,
'volume_type_id': biz_type['id']}
- snapshot = {'id': 1234,
+ snapshot = {'id': fake.snapshot_id,
'status': 'available',
'volume_size': 10,
'volume_type_id': biz_type['id']}
def test_create_volume_from_snapshot_fail_bad_size(self):
"""Test volume can't be created from snapshot with bad volume size."""
volume_api = cinder.volume.api.API()
- snapshot = {'id': 1234,
+ snapshot = {'id': fake.snapshot_id,
'status': 'available',
'volume_size': 10}
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
1,
'name',
'description',
- snapshot='fake_id',
- image_id='fake_id',
- source_volume='fake_id')
+ snapshot=fake.snapshot_id,
+ image_id=fake.image_id,
+ source_volume=fake.volume_id)
@mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
'_get_target_chap_auth')
mock_get_target):
"""Make sure initialize_connection returns correct information."""
_fake_admin_meta = {'fake-key': 'fake-value'}
- _fake_volume = {'volume_type_id': 'fake_type_id',
+ _fake_volume = {'volume_type_id': fake.volume_type_id,
'name': 'fake_name',
'host': 'fake_host',
- 'id': 'fake_volume_id',
+ 'id': fake.volume_id,
'volume_admin_metadata': _fake_admin_meta}
_mock_volume_get.return_value = _fake_volume
# initialize_connection() passes qos_specs that are designated to
# be consumed by the front-end, or by both front-end and back-end
conn_info = self.volume.initialize_connection(self.context,
- 'fake_volume_id',
+ fake.volume_id,
connector)
self.assertDictMatch(qos_specs_expected,
conn_info['data']['qos_specs'])
qos_values.update({'consumer': 'both'})
conn_info = self.volume.initialize_connection(self.context,
- 'fake_volume_id',
+ fake.volume_id,
connector)
self.assertDictMatch(qos_specs_expected,
conn_info['data']['qos_specs'])
qos_values.update({'consumer': 'back-end'})
type_qos.return_value = dict(qos_specs=qos_values)
conn_info = self.volume.initialize_connection(self.context,
- 'fake_volume_id',
+ fake.volume_id,
connector)
self.assertIsNone(conn_info['data']['qos_specs'])
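# Taken together, the three calls above assert the consumer mapping this
# test depends on: qos_specs with consumer 'front-end' or 'both' show up
# in conn_info['data']['qos_specs'], while 'back-end' specs are not
# exposed to the connector (qos_specs is None).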
_mock_create_export):
"""Test exception path for create_export failure."""
_fake_admin_meta = {'fake-key': 'fake-value'}
- _fake_volume = {'volume_type_id': 'fake_type_id',
+ _fake_volume = {'volume_type_id': fake.volume_type_id,
'name': 'fake_name',
'host': 'fake_host',
- 'id': 'fake_volume_id',
+ 'id': fake.volume_id,
'volume_admin_metadata': _fake_admin_meta}
_mock_volume_get.return_value = _fake_volume
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.initialize_connection,
self.context,
- 'fake_volume_id',
+ fake.volume_id,
connector)
@mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
fake_volume = {'volume_type_id': None,
'name': 'fake_name',
'host': 'fake_host',
- 'id': 'fake_volume_id',
+ 'id': fake.volume_id,
'volume_admin_metadata': fake_admin_meta,
'encryption_key_id': ('d371e7bb-7392-4c27-'
'ac0b-ebd9f5d16078')}
'display_name': None,
'snapshot_id': snapshot_id,
'status': 'creating',
- 'tenant_id': 'fake',
- 'user_id': 'fake',
+ 'tenant_id': fake.project_id,
+ 'user_id': fake.user_id,
'volume_id': volume['id'],
'volume_size': 1,
'availability_zone': 'nova',
"""Test volume migration done by driver."""
# stub out driver and rpc functions
self.stubs.Set(self.volume.driver, 'migrate_volume',
- lambda x, y, z, new_type_id=None: (True,
- {'user_id': 'foo'}))
+ lambda x, y, z, new_type_id=None: (
+ True, {'user_id': fake.user_id}))
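# The stub mirrors the driver migrate_volume() contract, which returns a
# (migrated, model_update) tuple; here it reports success along with a
# model update carrying the fake user id.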
volume = tests_utils.create_volume(self.context, size=0,
host=CONF.host,
def test_migrate_volume_generic(self, volume_get,
migrate_volume_completion,
nova_api):
- fake_volume_id = 'fake_volume_id'
- fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
+ fake_db_new_volume = {'status': 'available', 'id': fake.volume_id}
fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
new_volume_obj = fake_volume.fake_volume_obj(self.context,
**fake_new_volume)
migrate_volume_completion,
nova_api):
attached_host = 'some-host'
- fake_volume_id = 'fake_volume_id'
+ fake_volume_id = fake.volume_id
fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
host_obj = {'host': 'newhost', 'capabilities': {}}
def test_update_migrated_volume(self, volume_update):
fake_host = 'fake_host'
fake_new_host = 'fake_new_host'
- fake_update = {'_name_id': 'updated_id',
+ fake_update = {'_name_id': fake.volume2_name_id,
'provider_location': 'updated_location'}
fake_elevated = context.RequestContext('fake', self.project_id,
is_admin=True)
self.context, size=1,
status='available',
provider_location='fake_provider_location',
- _name_id='fake_name_id',
+ _name_id=fake.volume_name_id,
host=fake_new_host)
- new_volume._name_id = 'fake_name_id'
+ new_volume._name_id = fake.volume_name_id
new_volume.provider_location = 'fake_provider_location'
fake_update_error = {'_name_id': new_volume._name_id,
'provider_location':
volume_update.reset_mock()
# Reset the volume objects to their original values, since they
# were changed in the last call.
- new_volume._name_id = 'fake_name_id'
+ new_volume._name_id = fake.volume_name_id
new_volume.provider_location = 'fake_provider_location'
migrate_update.side_effect = NotImplementedError
self.volume.update_migrated_volume(self.context, volume,
class ConsistencyGroupTestCase(BaseVolumeTestCase):
def test_delete_volume_in_consistency_group(self):
"""Test deleting a volume that's tied to a consistency group fails."""
- consistencygroup_id = '12345678-1234-5678-1234-567812345678'
+ consistencygroup_id = fake.consistency_group_id
volume_api = cinder.volume.api.API()
self.volume_params.update({'status': 'available',
'consistencygroup_id': consistencygroup_id})
'availability_zone': 'nova',
'tenant_id': self.context.project_id,
'created_at': 'DONTCARE',
- 'user_id': 'fake',
+ 'user_id': fake.user_id,
'consistencygroup_id': group.id
}
self.assertDictMatch(expected, msg['payload'])
'availability_zone': 'nova',
'tenant_id': self.context.project_id,
'created_at': 'DONTCARE',
- 'user_id': 'fake',
+ 'user_id': fake.user_id,
'consistencygroup_id': group.id
}
self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, cg.status)
'availability_zone': 'nova',
'tenant_id': self.context.project_id,
'created_at': 'DONTCARE',
- 'user_id': 'fake',
+ 'user_id': fake.user_id,
'consistencygroup_id': group2.id,
}
self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, cg2.status)
self.volume.delete_consistencygroup(self.context, group)
def test_sort_snapshots(self):
- vol1 = {'id': '1', 'name': 'volume 1',
- 'snapshot_id': '1',
- 'consistencygroup_id': '1'}
- vol2 = {'id': '2', 'name': 'volume 2',
- 'snapshot_id': '2',
- 'consistencygroup_id': '1'}
- vol3 = {'id': '3', 'name': 'volume 3',
- 'snapshot_id': '3',
- 'consistencygroup_id': '1'}
- snp1 = {'id': '1', 'name': 'snap 1',
- 'cgsnapshot_id': '1'}
- snp2 = {'id': '2', 'name': 'snap 2',
- 'cgsnapshot_id': '1'}
- snp3 = {'id': '3', 'name': 'snap 3',
- 'cgsnapshot_id': '1'}
+ vol1 = {'id': fake.volume_id, 'name': 'volume 1',
+ 'snapshot_id': fake.snapshot_id,
+ 'consistencygroup_id': fake.consistency_group_id}
+ vol2 = {'id': fake.volume2_id, 'name': 'volume 2',
+ 'snapshot_id': fake.snapshot2_id,
+ 'consistencygroup_id': fake.consistency_group_id}
+ vol3 = {'id': fake.volume3_id, 'name': 'volume 3',
+ 'snapshot_id': fake.snapshot3_id,
+ 'consistencygroup_id': fake.consistency_group_id}
+ snp1 = {'id': fake.snapshot_id, 'name': 'snap 1',
+ 'cgsnapshot_id': fake.consistency_group_id}
+ snp2 = {'id': fake.snapshot2_id, 'name': 'snap 2',
+ 'cgsnapshot_id': fake.consistency_group_id}
+ snp3 = {'id': fake.snapshot3_id, 'name': 'snap 3',
+ 'cgsnapshot_id': fake.consistency_group_id}
snp1_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp1)
snp2_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp2)
snp3_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp3)
i += 1
self.assertEqual(vol['snapshot_id'], snap.id)
- snapshots[2]['id'] = '9999'
+ snapshots[2]['id'] = fake.will_not_be_found_id
self.assertRaises(exception.SnapshotNotFound,
self.volume._sort_snapshots,
volumes, snapshots)
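# The assertions above describe the contract expected of _sort_snapshots:
# reorder the snapshot list so that snapshots[i].id matches
# volumes[i]['snapshot_id'], raising SnapshotNotFound when a volume
# references a snapshot id that is not present. A rough sketch of that
# contract (not the actual manager implementation):
#
#     def _sort_snapshots(self, volumes, snapshots):
#         ordered = []
#         for vol in volumes:
#             matches = [s for s in snapshots
#                        if s.id == vol['snapshot_id']]
#             if not matches:
#                 raise exception.SnapshotNotFound(
#                     snapshot_id=vol['snapshot_id'])
#             ordered.append(matches[0])
#         return ordered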
def _create_cgsnapshot(self, group_id, volume_id, size='0'):
"""Create a cgsnapshot object."""
cgsnap = objects.CGSnapshot(self.context)
- cgsnap.user_id = 'fake'
- cgsnap.project_id = 'fake'
+ cgsnap.user_id = fake.user_id
+ cgsnap.project_id = fake.project_id
cgsnap.consistencygroup_id = group_id
cgsnap.status = "creating"
cgsnap.create()
# Create a snapshot object
snap = objects.Snapshot(context.get_admin_context())
snap.volume_size = size
- snap.user_id = 'fake'
- snap.project_id = 'fake'
+ snap.user_id = fake.user_id
+ snap.project_id = fake.project_id
snap.volume_id = volume_id
snap.status = "available"
snap.cgsnapshot_id = cgsnap.id
'name': None,
'cgsnapshot_id': cgsnapshot.id,
'status': 'creating',
- 'tenant_id': 'fake',
- 'user_id': 'fake',
+ 'tenant_id': fake.project_id,
+ 'user_id': fake.user_id,
'consistencygroup_id': group.id
}
self.assertDictMatch(expected, msg['payload'])
self.ctx = context.get_admin_context(read_deleted="yes")
self.db_attrs = [
{
- 'id': 1,
+ 'id': fake.volume_id,
'host': 'devstack',
- 'project_id': 'p1',
+ 'project_id': fake.project_id,
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
},
{
- 'id': 2,
+ 'id': fake.volume2_id,
'host': 'devstack',
- 'project_id': 'p1',
+ 'project_id': fake.project_id,
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
},
{
- 'id': 3,
+ 'id': fake.volume3_id,
'host': 'devstack',
- 'project_id': 'p1',
+ 'project_id': fake.project_id,
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
},
{
- 'id': 4,
+ 'id': fake.volume4_id,
'host': 'devstack',
- 'project_id': 'p1',
+ 'project_id': fake.project_id,
'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
},
{
- 'id': 5,
+ 'id': fake.volume5_id,
'host': 'devstack',
- 'project_id': 'p1',
+ 'project_id': fake.project_id,
'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
}
]
self.context,
datetime.datetime(1, 3, 1, 1, 1, 1),
datetime.datetime(1, 4, 1, 1, 1, 1),
- project_id='p1')
+ project_id=fake.project_id)
self.assertEqual(3, len(volumes))
- self.assertEqual(u'2', volumes[0].id)
- self.assertEqual(u'3', volumes[1].id)
- self.assertEqual(u'4', volumes[2].id)
+ self.assertEqual(fake.volume2_id, volumes[0].id)
+ self.assertEqual(fake.volume3_id, volumes[1].id)
+ self.assertEqual(fake.volume4_id, volumes[2].id)
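# The three expected volumes are the ones whose lifetime overlaps the
# queried window (1/3 through 1/4): volume2 and volume3 were deleted
# during or after the window and volume4 was created inside it, while
# volume1 was deleted before the window began and volume5 was created
# after it ended.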
def test_snapshot_get_active_by_window(self):
# Find all snapshots valid within a timeframe window.
- db.volume_create(self.context, {'id': 1})
+ db.volume_create(self.context, {'id': fake.volume_id})
for i in range(5):
- self.db_attrs[i]['volume_id'] = 1
+ self.db_attrs[i]['volume_id'] = fake.volume_id
# Not in window
del self.db_attrs[0]['id']
datetime.datetime(1, 4, 1, 1, 1, 1)).objects
self.assertEqual(3, len(snapshots))
self.assertEqual(snap2.id, snapshots[0].id)
- self.assertEqual(u'1', snapshots[0].volume_id)
+ self.assertEqual(fake.volume_id, snapshots[0].volume_id)
self.assertEqual(snap3.id, snapshots[1].id)
- self.assertEqual(u'1', snapshots[1].volume_id)
+ self.assertEqual(fake.volume_id, snapshots[1].volume_id)
self.assertEqual(snap4.id, snapshots[2].id)
- self.assertEqual(u'1', snapshots[2].volume_id)
+ self.assertEqual(fake.volume_id, snapshots[2].volume_id)
class DriverTestCase(test.TestCase):
mock_file_open,
mock_temporary_chown):
vol = tests_utils.create_volume(self.context)
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
status='backing-up',
previous_status='in-use')
temp_vol = tests_utils.create_volume(self.context)
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
vol = tests_utils.create_volume(self.context,
status='backing-up',
previous_status='in-use')
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
def test_get_backup_device_available(self):
vol = tests_utils.create_volume(self.context)
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
status='backing-up',
previous_status='in-use')
temp_vol = tests_utils.create_volume(self.context)
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
backup = {}
backup['volume_id'] = volume_id
- backup['user_id'] = 'fake'
- backup['project_id'] = 'fake'
+ backup['user_id'] = fake.user_id
+ backup['project_id'] = fake.project_id
backup['host'] = socket.gethostname()
backup['availability_zone'] = '1'
backup['display_name'] = 'test_check_for_setup_error'
mock_file_open,
mock_temporary_chown):
vol = tests_utils.create_volume(self.context)
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
backup = tests_utils.create_backup(self.context,
vol['id'])
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
self.assertTrue(retyped)
def test_update_migrated_volume(self):
- fake_volume_id = 'vol1'
- fake_new_volume_id = 'vol2'
+ fake_volume_id = fake.volume_id
+ fake_new_volume_id = fake.volume2_id
fake_provider = 'fake_provider'
original_volume_name = CONF.volume_name_template % fake_volume_id
current_name = CONF.volume_name_template % fake_new_volume_id
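# For context: CONF.volume_name_template defaults to 'volume-%s', so the
# two names built above render as 'volume-<volume id>' and
# 'volume-<new volume id>' respectively.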
vol = tests_utils.create_volume(self.context,
status='backing-up',
previous_status='in-use')
- self.context.user_id = 'fake'
- self.context.project_id = 'fake'
+ self.context.user_id = fake.user_id
+ self.context.project_id = fake.project_id
mock_volume_get.return_value = vol
temp_snapshot = tests_utils.create_snapshot(self.context, vol['id'])
vol_name = 'volume-d8cd1feb-2dcc-404d-9b15-b86fe3bec0a1'
ref = {'source-name': 'fake_lv'}
- vol = {'name': vol_name, 'id': 1, 'size': 0}
+ vol = {'name': vol_name, 'id': fake.volume_id, 'size': 0}
with mock.patch.object(self.volume.driver.vg, 'rename_volume'):
model_update = self.volume.driver.manage_existing(vol, ref)
self._setup_stubs_for_manage_existing()
ref = {'source-name': 'fake_lv'}
- vol = {'name': 'test', 'id': 1, 'size': 0}
+ vol = {'name': 'test', 'id': fake.volume_id, 'size': 0}
def _rename_volume(old_name, new_name):
self.assertEqual(ref['source-name'], old_name)
self._setup_stubs_for_manage_existing()
ref = {'source-name': 'fake_lv_bad_size'}
- vol = {'name': 'test', 'id': 1, 'size': 2}
+ vol = {'name': 'test', 'id': fake.volume_id, 'size': 2}
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.driver.manage_existing_get_size,
self._setup_stubs_for_manage_existing()
ref = {'source-name': 'fake_lv'}
- snp = {'name': 'test', 'id': 1, 'size': 0}
+ snp = {'name': 'test', 'id': fake.snapshot_id, 'size': 0}
def _rename_volume(old_name, new_name):
self.assertEqual(ref['source-name'], old_name)
self._setup_stubs_for_manage_existing()
ref = {'source-name': 'fake_nonexistent_lv'}
- snp = {'name': 'test', 'id': 1, 'size': 0, 'status': 'available'}
+ snp = {
+ 'name': 'test',
+ 'id': fake.snapshot_id,
+ 'size': 0,
+ 'status': 'available',
+ }
self.assertRaises(exception.ManageExistingInvalidReference,
self.volume.driver.manage_existing_snapshot_get_size,
self._setup_stubs_for_manage_existing()
ref = {'source-name': 'fake_lv_bad_size'}
- snp = {'name': 'test', 'id': 1, 'size': 2}
+ snp = {'name': 'test', 'id': fake.snapshot_id, 'size': 2}
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.driver.manage_existing_snapshot_get_size,