from cinder.i18n import _
from cinder import objects
from cinder import test
+from cinder.tests.unit import fake_constants as fake
from cinder import utils
CONF = cfg.CONF
FAKE_BACKUP_SHARE = '%s:/%s' % (FAKE_HOST, FAKE_EXPORT_PATH)
FAKE_BACKUP_PATH = os.path.join(FAKE_BACKUP_MOUNT_POINT_BASE,
FAKE_EXPORT_PATH)
-
-FAKE_BACKUP_ID_PART1 = 'de'
-FAKE_BACKUP_ID_PART2 = 'ad'
-FAKE_BACKUP_ID_REST = 'beef-whatever'
-FAKE_BACKUP_ID = (FAKE_BACKUP_ID_PART1 + FAKE_BACKUP_ID_PART2 +
- FAKE_BACKUP_ID_REST)
+FAKE_BACKUP_ID = fake.backup_id
+FAKE_BACKUP_ID_PART1 = fake.backup_id[:2]
+FAKE_BACKUP_ID_PART2 = fake.backup_id[2:4]
+FAKE_BACKUP_ID_REST = fake.backup_id[4:]
UPDATED_CONTAINER_NAME = os.path.join(FAKE_BACKUP_ID_PART1,
FAKE_BACKUP_ID_PART2,
FAKE_BACKUP_ID)
class BackupNFSSwiftBasedTestCase(test.TestCase):
"""Test Cases for based on Swift tempest backup tests."""
- _DEFAULT_VOLUME_ID = '8d31c3aa-c5fa-467d-8819-8888887225b6'
+ _DEFAULT_VOLUME_ID = fake.volume_id
def _create_volume_db_entry(self, volume_id=_DEFAULT_VOLUME_ID):
vol = {'id': volume_id,
def _create_backup_db_entry(self,
volume_id=_DEFAULT_VOLUME_ID,
container='test-container',
- backup_id=123,
+ backup_id=fake.backup_id,
parent_id=None):
try:
'container': container,
'volume_id': volume_id,
'parent_id': parent_id,
- 'user_id': 'user-id',
- 'project_id': 'project-id',
+ 'user_id': fake.user_id,
+ 'project_id': fake.project_id,
}
return db.backup_create(self.ctxt, backup)['id']
self.volume_file.write(os.urandom(1024))
def test_backup_uncompressed(self):
- volume_id = '0adffe69-ce32-4bb0-b5e6-0000002d748d'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
def test_backup_bz2(self):
- volume_id = '057a035f-2584-4cfd-bf23-000000e39288'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
def test_backup_zlib(self):
- volume_id = '3701a9f8-effd-44b9-bf2e-000000bb99ca'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='zlib')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
def test_backup_default_container(self):
- volume_id = 'caffdc68-ef65-48af-928d-000000289076'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id,
container=None,
backup_id=FAKE_BACKUP_ID)
'_send_progress_notification')
def test_backup_default_container_notify(self, _send_progress,
_send_progress_end):
- volume_id = '170a1081-9fe2-4add-9094-000000b48877'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id,
container=None)
# If the backup_object_number_per_notification is set to 1,
CONF.set_override("backup_enable_progress_timer", False)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
CONF.set_override("backup_object_number_per_notification", 10)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertFalse(_send_progress.called)
self.assertTrue(_send_progress_end.called)
CONF.set_override("backup_enable_progress_timer", True)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
def test_backup_custom_container(self):
- volume_id = '449b8140-85b6-465e-bdf6-0000002b29c4'
+ volume_id = fake.volume_id
container_name = 'fake99'
self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(backup['container'], container_name)
def test_backup_shafile(self):
- volume_id = '1eb6325f-6666-43a2-bcdd-0000001d8dac'
+ volume_id = fake.volume_id
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container=container_name)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(backup['container'], container_name)
# Verify sha contents
len(content1['sha256s']))
def test_backup_cmp_shafiles(self):
- volume_id = '261e8c1a-0c07-41d7-923f-000000d3efb8'
+ volume_id = fake.volume_id
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(backup['container'], container_name)
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(deltabackup['container'], container_name)
# Compare shas from both files
self.assertEqual(set(content1['sha256s']), set(content2['sha256s']))
def test_backup_delta_two_objects_change(self):
- volume_id = '3f400215-e346-406c-83b0-0000009ac4fa'
+ volume_id = fake.volume_id
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(backup['container'], container_name)
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(deltabackup['container'], container_name)
content1 = service._read_sha256file(backup)
self.assertNotEqual(content1['sha256s'][20], content2['sha256s'][20])
def test_backup_delta_two_blocks_in_object_change(self):
- volume_id = '5f3f810a-2ff3-4905-aaa3-0000005814ab'
+ volume_id = fake.volume_id
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(backup['container'], container_name)
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(deltabackup['container'], container_name)
# Verify that two shas are changed at index 16 and 20
self._backup_metadata(), we want to check the process of an
exception handler.
"""
- volume_id = '26481bc2-fc85-40ae-8a4a-0000000b24e5'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
def fake_backup_metadata(self, backup, object_meta):
raise exception.BackupDriverException(message=_('fake'))
self._backup_metadata(), we want to check the process when the
second exception occurs in self.delete().
"""
- volume_id = 'ce18dbc6-65d6-49ca-8866-000000b1c05b'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
def fake_backup_metadata(self, backup, object_meta):
raise exception.BackupDriverException(message=_('fake'))
backup, self.volume_file)
def test_restore_uncompressed(self):
- volume_id = 'b6f39bd5-ad93-474b-8ee4-000000a0d11e'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
self.flags(backup_sha_block_size_bytes=32)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
with tempfile.NamedTemporaryFile() as restored_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_bz2(self):
- volume_id = '3d4f044e-dc78-49e1-891e-000000549431'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
self.flags(backup_sha_block_size_bytes=1024)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
with tempfile.NamedTemporaryFile() as restored_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_zlib(self):
- volume_id = 'ab84fe59-19a8-4c7d-9103-00000061488b'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='zlib')
self.flags(backup_sha_block_size_bytes = 1024)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
with tempfile.NamedTemporaryFile() as restored_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_delta(self):
- volume_id = '486249dc-83c6-4a02-8d65-000000d819e7'
+ volume_id = fake.volume_id
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file, True)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
with tempfile.NamedTemporaryFile() as restored_file:
- backup = objects.Backup.get_by_id(self.ctxt, 124)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.restore(backup, volume_id,
restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_delete(self):
- volume_id = '4b5c39f2-4428-473c-b85a-000000477eca'
+ volume_id = fake.volume_id
self._create_backup_db_entry(volume_id=volume_id)
service = nfs.NFSBackupDriver(self.ctxt)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.delete(backup)
def test_get_compressor(self):
from cinder.backup.drivers import posix
from cinder import context
from cinder import test
+from cinder.tests.unit import fake_constants as fake
FAKE_FILE_SIZE = 52428800
FAKE_BACKUP_ENABLE_PROGRESS_TIMER = True
FAKE_CONTAINER = 'fake/container'
-FAKE_BACKUP_ID_PART1 = 'de'
-FAKE_BACKUP_ID_PART2 = 'ad'
-FAKE_BACKUP_ID_REST = 'beef-whatever'
-FAKE_BACKUP_ID = (FAKE_BACKUP_ID_PART1 + FAKE_BACKUP_ID_PART2 +
- FAKE_BACKUP_ID_REST)
+FAKE_BACKUP_ID = fake.backup_id
+FAKE_BACKUP_ID_PART1 = fake.backup_id[:2]
+FAKE_BACKUP_ID_PART2 = fake.backup_id[2:4]
+FAKE_BACKUP_ID_REST = fake.backup_id[4:]
FAKE_BACKUP = {'id': FAKE_BACKUP_ID, 'container': None}
UPDATED_CONTAINER_NAME = os.path.join(FAKE_BACKUP_ID_PART1,
from cinder import objects
from cinder import test
from cinder.tests.unit import fake_backup
+from cinder.tests.unit import fake_constants as fake
class BackupRpcAPITestCase(test.TestCase):
def setUp(self):
super(BackupRpcAPITestCase, self).setUp()
- self.context = context.RequestContext('fake_user', 'fake_project')
+ self.context = context.RequestContext(fake.user_id, fake.project_id)
self.fake_backup_obj = fake_backup.fake_backup_obj(self.context)
def _test_backup_api(self, method, rpc_method, server=None, fanout=False,
server='fake_volume_host',
volume_host='fake_volume_host',
backup=self.fake_backup_obj,
- volume_id='fake_volume_id',
+ volume_id=fake.volume_id,
version='1.1')
def test_delete_backup(self):
from cinder import objects
from cinder.objects import fields as c_fields
+from cinder.tests.unit import fake_constants as fake
def fake_db_backup(**updates):
db_backup = {
- 'id': 1,
- 'user_id': 'fake_user',
- 'project_id': 'fake_project',
- 'volume_id': 'fake_id',
+ 'id': fake.backup_id,
+ 'user_id': fake.user_id,
+ 'project_id': fake.project_id,
+ 'volume_id': fake.volume_id,
'status': c_fields.BackupStatus.CREATING,
'host': 'fake_host',
'display_name': 'fake_name',
attachment_id = '4dc3bb12-ad75-41b9-ab2c-7609e743e600'
backup_id = '707844eb-6d8a-4ac1-8b98-618e1c0b3a3a'
+backup2_id = '40e8462a-c9d8-462f-a810-b732a1790535'
+backup3_id = '30ae7641-017e-4221-a642-855687c8bd71'
cgsnapshot_id = '5e34cce3-bc97-46b7-a127-5cfb95ef445d'
cgsnapshot2_id = '5c36d762-d6ba-4f04-bd07-88a298cc410a'
cgsnapshot3_id = '5f392156-fc03-492a-9cb8-e46a7eedaf33'
# under the License.
"""Tests for Backup code."""
+import copy
import ddt
import tempfile
import uuid
self.backup_mgr = importutils.import_object(CONF.backup_manager)
self.backup_mgr.host = 'testhost'
self.ctxt = context.get_admin_context()
+
paths = ['cinder.volume.rpcapi.VolumeAPI.delete_snapshot',
'cinder.volume.rpcapi.VolumeAPI.delete_volume',
'cinder.volume.rpcapi.VolumeAPI.detach_volume',
self.volume_mocks[name] = self.volume_patches[name].start()
self.addCleanup(self.volume_patches[name].stop)
- def _create_backup_db_entry(self, volume_id=1, restore_volume_id=None,
+ def _create_backup_db_entry(self, volume_id=str(uuid.uuid4()),
+ restore_volume_id=None,
display_name='test_backup',
display_description='this is a test backup',
container='volumebackups',
status=fields.BackupStatus.CREATING,
size=1,
object_count=0,
- project_id='fake',
+ project_id=str(uuid.uuid4()),
service=None,
temp_volume_id=None,
temp_snapshot_id=None):
kwargs = {}
kwargs['volume_id'] = volume_id
kwargs['restore_volume_id'] = restore_volume_id
- kwargs['user_id'] = 'fake'
+ kwargs['user_id'] = str(uuid.uuid4())
kwargs['project_id'] = project_id
kwargs['host'] = 'testhost'
kwargs['availability_zone'] = '1'
vol = {}
vol['size'] = size
vol['host'] = 'testhost'
- vol['user_id'] = 'fake'
- vol['project_id'] = 'fake'
+ vol['user_id'] = str(uuid.uuid4())
+ vol['project_id'] = str(uuid.uuid4())
vol['status'] = status
vol['display_name'] = display_name
vol['display_description'] = display_description
display_description='test snapshot',
status='available',
size=1,
- volume_id='1',
+ volume_id=str(uuid.uuid4()),
provider_location=None):
"""Create a snapshot entry in the DB.
kwargs = {}
kwargs['size'] = size
kwargs['host'] = 'testhost'
- kwargs['user_id'] = 'fake'
- kwargs['project_id'] = 'fake'
+ kwargs['user_id'] = str(uuid.uuid4())
+ kwargs['project_id'] = str(uuid.uuid4())
kwargs['status'] = status
kwargs['display_name'] = display_name
kwargs['display_description'] = display_description
return export
def _create_export_record_db_entry(self,
- volume_id='0000',
+ volume_id=str(uuid.uuid4()),
status=fields.BackupStatus.CREATING,
- project_id='fake',
+ project_id=str(uuid.uuid4()),
backup_id=None):
"""Create a backup entry in the DB.
"""
kwargs = {}
kwargs['volume_id'] = volume_id
- kwargs['user_id'] = 'fake'
+ kwargs['user_id'] = str(uuid.uuid4())
kwargs['project_id'] = project_id
kwargs['status'] = status
if backup_id:
def test_cleanup_incomplete_backup_operations_with_exceptions(self):
"""Test cleanup resilience in the face of exceptions."""
- fake_backup_list = [{'id': 'bkup1'}, {'id': 'bkup2'}, {'id': 'bkup3'}]
+ fake_backup_list = [{'id': str(uuid.uuid4())},
+ {'id': str(uuid.uuid4())},
+ {'id': str(uuid.uuid4())}]
mock_backup_get_by_host = self.mock_object(
objects.BackupList, 'get_all_by_host')
mock_backup_get_by_host.return_value = fake_backup_list
fake_attachments = [
{
- 'id': 'attachment1',
+ 'id': str(uuid.uuid4()),
'attached_host': 'testhost',
'instance_uuid': None,
},
{
- 'id': 'attachment2',
+ 'id': str(uuid.uuid4()),
'attached_host': 'testhost',
'instance_uuid': None,
}
]
fake_volume = {
- 'id': 'fake_volume_id',
+ 'id': str(uuid.uuid4()),
'volume_attachment': fake_attachments
}
vol1_id = self._create_volume_db_entry()
self._create_volume_attach(vol1_id)
db.volume_update(self.ctxt, vol1_id, {'status': 'backing-up'})
- backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
- volume_id=vol1_id,
- temp_snapshot_id='fake')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.ERROR,
+ volume_id=vol1_id,
+ temp_snapshot_id=str(uuid.uuid4()))
self.assertIsNone(
self.backup_mgr._cleanup_temp_volumes_snapshots_for_one_backup(
db.volume_update(self.ctxt, vol1_id, {'status': 'backing-up'})
backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=vol1_id,
- temp_volume_id='fake')
+ temp_volume_id=str(uuid.uuid4()))
self.assertIsNone(
self.backup_mgr._cleanup_temp_volumes_snapshots_for_one_backup(
backup = self._create_backup_db_entry(volume_id=vol_id)
mock_run_backup = self.mock_object(self.backup_mgr, '_run_backup')
- mock_run_backup.side_effect = FakeBackupException('fake')
+ mock_run_backup.side_effect = FakeBackupException(str(uuid.uuid4()))
self.assertRaises(FakeBackupException,
self.backup_mgr.create_backup,
self.ctxt,
self.assertEqual(2, notify.call_count)
def test_list_backup(self):
- backups = db.backup_get_all_by_project(self.ctxt, 'project1')
+ project_id = str(uuid.uuid4())
+ backups = db.backup_get_all_by_project(self.ctxt, project_id)
self.assertEqual(0, len(backups))
self._create_backup_db_entry()
- b2 = self._create_backup_db_entry(project_id='project1')
- backups = db.backup_get_all_by_project(self.ctxt, 'project1')
+ b2 = self._create_backup_db_entry(project_id=project_id)
+ backups = db.backup_get_all_by_project(self.ctxt, project_id)
self.assertEqual(1, len(backups))
self.assertEqual(b2.id, backups[0].id)
Test deleted backups don't show up in backup_get_all_by_project.
Unless context.read_deleted is 'yes'.
"""
- backups = db.backup_get_all_by_project(self.ctxt, 'fake')
+ project_id = str(uuid.uuid4())
+ backups = db.backup_get_all_by_project(self.ctxt, project_id)
self.assertEqual(0, len(backups))
- backup_keep = self._create_backup_db_entry()
- backup = self._create_backup_db_entry()
+ backup_keep = self._create_backup_db_entry(project_id=project_id)
+ backup = self._create_backup_db_entry(project_id=project_id)
db.backup_destroy(self.ctxt, backup.id)
- backups = db.backup_get_all_by_project(self.ctxt, 'fake')
+ backups = db.backup_get_all_by_project(self.ctxt, project_id)
self.assertEqual(1, len(backups))
self.assertEqual(backup_keep.id, backups[0].id)
ctxt_read_deleted = context.get_admin_context('yes')
- backups = db.backup_get_all_by_project(ctxt_read_deleted, 'fake')
+ backups = db.backup_get_all_by_project(ctxt_read_deleted, project_id)
self.assertEqual(2, len(backups))
def test_backup_get_all_by_host_with_deleted(self):
@mock.patch.object(objects, 'BackupList')
def test_get_all_true_value_all_tenants_non_admin(self, mock_backuplist):
- ctxt = context.RequestContext('fake', 'fake')
+ ctxt = context.RequestContext(uuid.uuid4(), uuid.uuid4())
result = self.api.get_all(ctxt, {'all_tenants': '1',
'key': 'value'})
self.assertFalse(mock_backuplist.get_all.called)
def test_create_when_failed_to_create_backup_object(
self, mock_create,
mock_get_service):
+
+ # Create volume in admin context
volume_id = utils.create_volume(self.ctxt)['id']
- self.ctxt.user_id = 'user_id'
- self.ctxt.project_id = 'project_id'
+
+ # Will try to backup from a different context
+ new_context = copy.copy(self.ctxt)
+ new_context.user_id = uuid.uuid4()
+ new_context.project_id = uuid.uuid4()
# The opposite side of this test case is a "NotImplementedError:
# Cannot load 'id' in the base class" being raised.
# in the except clause, backup.destroy() is invoked to do cleanup,
# which internally tries to access backup.id.
self.assertRaises(db_exc.DBError, self.api.create,
- context=self.ctxt,
+ context=new_context,
name="test_backup",
description="test backup description",
volume_id=volume_id,
def test_create_when_failed_to_new_backup_object(self, mock_new,
mock_get_service):
volume_id = utils.create_volume(self.ctxt)['id']
- self.ctxt.user_id = 'user_id'
- self.ctxt.project_id = 'project_id'
# The opposite side of this test case is that a "UnboundLocalError:
# local variable 'backup' referenced before assignment" is raised.
def test_restore_volume(self,
mock_rpcapi_restore,
mock_is_service_enabled):
- ctxt = context.RequestContext('fake', 'fake')
volume_id = self._create_volume_db_entry(status='available',
size=1)
backup = self._create_backup_db_entry(size=1,
status='available')
mock_is_service_enabled.return_value = True
- self.api.restore(ctxt, backup.id, volume_id)
- backup = objects.Backup.get_by_id(ctxt, backup.id)
+ self.api.restore(self.ctxt, backup.id, volume_id)
+ backup = objects.Backup.get_by_id(self.ctxt, backup.id)
self.assertEqual(volume_id, backup.restore_volume_id)
return db.volume_create(self.ctxt, vol)['id']
def _create_backup_db_entry(self, backupid, volid, size,
- userid='user-id', projectid='project-id'):
+ userid=str(uuid.uuid4()),
+ projectid=str(uuid.uuid4())):
backup = {'id': backupid, 'size': size, 'volume_id': volid,
'user_id': userid, 'project_id': projectid}
return db.backup_create(self.ctxt, backup)['id']
return db.volume_create(self.ctxt, vol)['id']
def _create_backup_db_entry(self, backupid, volid, size,
- userid='user-id', projectid='project-id'):
+ userid=str(uuid.uuid4()),
+ projectid=str(uuid.uuid4())):
backup = {'id': backupid, 'size': size, 'volume_id': volid,
'user_id': userid, 'project_id': projectid}
return db.backup_create(self.ctxt, backup)['id']
from cinder import test
from cinder.tests.unit.backup import fake_google_client
from cinder.tests.unit.backup import fake_google_client2
+from cinder.tests.unit import fake_constants as fake
class FakeMD5(object):
'container': container,
'volume_id': volume_id,
'parent_id': parent_id,
- 'user_id': 'user-id',
- 'project_id': 'project-id',
+ 'user_id': fake.user_id,
+ 'project_id': fake.project_id,
'service_metadata': service_metadata,
}
backup = objects.Backup(context=self.ctxt, **kwargs)
from cinder import test
from cinder.tests.unit.backup import fake_swift_client
from cinder.tests.unit.backup import fake_swift_client2
+from cinder.tests.unit import fake_constants as fake
CONF = cfg.CONF
def _create_backup_db_entry(self,
volume_id=_DEFAULT_VOLUME_ID,
container='test-container',
- backup_id=123, parent_id=None,
+ backup_id=fake.backup_id, parent_id=None,
service_metadata=None):
try:
'container': container,
'volume_id': volume_id,
'parent_id': parent_id,
- 'user_id': 'user-id',
- 'project_id': 'project-id',
+ 'user_id': fake.user_id,
+ 'project_id': fake.project_id,
'service_metadata': service_metadata,
}
return db.backup_create(self.ctxt, backup)['id']
u'endpoints': [{
u'publicURL':
u'http://example.com'}]}]
- self.ctxt.project_id = "12345678"
+ self.ctxt.project_id = fake.project_id
self.override_config("backup_swift_url",
"http://public.example.com/")
backup = swift_dr.SwiftBackupDriver(self.ctxt)
u'endpoints': [{
u'adminURL':
u'http://example.com'}]}]
- self.ctxt.project_id = "12345678"
+ self.ctxt.project_id = fake.project_id
self.override_config("backup_swift_auth_url",
"http://public.example.com/")
backup = swift_dr.SwiftBackupDriver(self.ctxt)
self.flags(backup_compression_algorithm='none')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
def test_backup_bz2(self):
self.flags(backup_compression_algorithm='bz2')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
def test_backup_zlib(self):
self.flags(backup_compression_algorithm='zlib')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
@mock.patch.object(db, 'backup_update', wraps=db.backup_update)
container=None)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual('volumebackups', backup['container'])
self.assertEqual(3, backup_update_mock.call_count)
container='existing_name')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual('existing_name', backup['container'])
# Make sure we are not making a DB update when we are using the same
# value that's already in the DB.
container=None)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
with mock.patch.object(service, 'update_container_name',
return_value='driver_name'):
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual('driver_name', backup['container'])
self.assertEqual(3, backup_update_mock.call_count)
CONF.set_override("backup_swift_enable_progress_timer", False)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
CONF.set_override("backup_object_number_per_notification", 10)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertFalse(_send_progress.called)
self.assertTrue(_send_progress_end.called)
CONF.set_override("backup_swift_enable_progress_timer", True)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
container=container_name)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(container_name, backup['container'])
def test_backup_shafile(self):
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(container_name, backup['container'])
# Verify sha contents
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(container_name, backup['container'])
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id= fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(container_name, deltabackup['container'])
# Compare shas from both files
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(container_name, backup['container'])
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(container_name, deltabackup['container'])
content1 = service._read_sha256file(backup)
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertEqual(container_name, backup['container'])
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.assertEqual(container_name, deltabackup['container'])
# Verify that two shas are changed at index 16 and 20
container=container_name)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertRaises(exception.SwiftConnectionFailed,
service.backup,
backup, self.volume_file)
self.flags(backup_compression_algorithm='none')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
def fake_backup_metadata(self, backup, object_meta):
raise exception.BackupDriverException(message=_('fake'))
self.flags(backup_compression_algorithm='none')
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
def fake_backup_metadata(self, backup, object_meta):
raise exception.BackupDriverException(message=_('fake'))
service = swift_dr.SwiftBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as volume_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.restore(backup, volume_id, volume_file)
def test_restore_delta(self):
'', 1)
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=123)
+ backup_id=fake.backup_id)
self.stubs.Set(swift, 'Connection',
fake_swift_client2.FakeSwiftClient2.Connection)
service = swift_dr.SwiftBackupDriver(self.ctxt)
self.volume_file.seek(0)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.backup(backup, self.volume_file)
# Create incremental backup with no change to contents
self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
- backup_id=124,
- parent_id=123)
+ backup_id=fake.backup2_id,
+ parent_id=fake.backup_id)
self.volume_file.seek(0)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.backup(deltabackup, self.volume_file, True)
- deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
+ deltabackup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
with tempfile.NamedTemporaryFile() as restored_file:
- backup = objects.Backup.get_by_id(self.ctxt, 124)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
service.restore(backup, volume_id,
restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
service = swift_dr.SwiftBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as volume_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertRaises(exception.SwiftConnectionFailed,
service.restore,
backup, volume_id, volume_file)
service = swift_dr.SwiftBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as volume_file:
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertRaises(exception.InvalidBackup,
service.restore,
backup, volume_id, volume_file)
self._create_backup_db_entry(volume_id=volume_id,
service_metadata=object_prefix)
service = swift_dr.SwiftBackupDriver(self.ctxt)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.delete(backup)
def test_delete_wraps_socket_error(self):
container=container_name,
service_metadata=object_prefix)
service = swift_dr.SwiftBackupDriver(self.ctxt)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertRaises(exception.SwiftConnectionFailed,
service.delete,
backup)
self._create_backup_db_entry(volume_id=volume_id)
service = swift_dr.SwiftBackupDriver(self.ctxt)
- backup = objects.Backup.get_by_id(self.ctxt, 123)
+ backup = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
service.delete(backup)
def test_get_compressor(self):
from cinder import exception
from cinder import objects
from cinder import test
+from cinder.tests.unit import fake_constants as fake
from cinder import utils
SIM = None
backup = {'id': backup_id,
'size': 1,
'container': 'test-container',
- 'volume_id': '1234-5678-1234-8888',
+ 'volume_id': fake.volume_id,
'service_metadata': service_metadata,
- 'user_id': 'user-id',
- 'project_id': 'project-id',
+ 'user_id': fake.user_id,
+ 'project_id': fake.project_id,
}
return db.backup_create(self.ctxt, backup)['id']
def test_backup_image(self):
- volume_id = '1234-5678-1234-7777'
+ volume_id = fake.volume_id
mode = 'image'
self._create_volume_db_entry(volume_id)
- backup_id1 = 123
- backup_id2 = 456
- backup_id3 = 666
+ backup_id1 = fake.backup_id
+ backup_id2 = fake.backup2_id
+ backup_id3 = fake.backup3_id
self._create_backup_db_entry(backup_id1, mode)
self._create_backup_db_entry(backup_id2, mode)
self._create_backup_db_entry(backup_id3, mode)
self.driver.delete(backup1)
def test_backup_file(self):
- volume_id = '1234-5678-1234-8888'
+ volume_id = fake.volume_id
mode = 'file'
self.stubs.Set(os, 'stat', fake_stat_file)
self._create_volume_db_entry(volume_id)
- backup_id1 = 123
- backup_id2 = 456
- self._create_backup_db_entry(backup_id1, mode)
- self._create_backup_db_entry(backup_id2, mode)
+ self._create_backup_db_entry(fake.backup_id, mode)
+ self._create_backup_db_entry(fake.backup2_id, mode)
with open(VOLUME_PATH, 'w+') as volume_file:
# Create two backups of the volume
- backup1 = objects.Backup.get_by_id(self.ctxt, 123)
+ backup1 = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.driver.backup(backup1, volume_file)
- backup2 = objects.Backup.get_by_id(self.ctxt, 456)
+ backup2 = objects.Backup.get_by_id(self.ctxt, fake.backup2_id)
self.driver.backup(backup2, volume_file)
# Create a backup that fails
- self._create_backup_db_entry(666, mode)
- fail_back = objects.Backup.get_by_id(self.ctxt, 666)
+ self._create_backup_db_entry(fake.backup3_id, mode)
+ fail_back = objects.Backup.get_by_id(self.ctxt, fake.backup3_id)
self.sim.error_injection('backup', 'fail')
self.assertRaises(exception.InvalidBackup,
self.driver.backup, fail_back, volume_file)
self.driver.delete(backup2)
def test_backup_invalid_mode(self):
- volume_id = '1234-5678-1234-9999'
+ volume_id = fake.volume_id
mode = 'illegal'
self.stubs.Set(os, 'stat', fake_stat_illegal)
self._create_volume_db_entry(volume_id)
- backup_id1 = 123
- self._create_backup_db_entry(backup_id1, mode)
+ self._create_backup_db_entry(fake.backup_id, mode)
with open(VOLUME_PATH, 'w+') as volume_file:
# Create two backups of the volume
- backup1 = objects.Backup.get_by_id(self.ctxt, 123)
+ backup1 = objects.Backup.get_by_id(self.ctxt, fake.backup_id)
self.assertRaises(exception.InvalidBackup,
self.driver.backup, backup1, volume_file)