class BackupNFSSwiftBasedTestCase(test.TestCase):
"""Test Cases for based on Swift tempest backup tests."""
- def _create_volume_db_entry(self):
- vol = {'id': '1234-5678-1234-8888',
+
+ _DEFAULT_VOLUME_ID = '8d31c3aa-c5fa-467d-8819-8888887225b6'
+
+ def _create_volume_db_entry(self, volume_id=_DEFAULT_VOLUME_ID):
+ vol = {'id': volume_id,
'size': 1,
'status': 'available'}
return db.volume_create(self.ctxt, vol)['id']
- def _create_backup_db_entry(self, container='test-container',
- backup_id=123, parent_id=None):
+ def _create_backup_db_entry(self,
+ volume_id=_DEFAULT_VOLUME_ID,
+ container='test-container',
+ backup_id=123,
+ parent_id=None):
+
+ try:
+ db.volume_get(self.ctxt, volume_id)
+ except exception.NotFound:
+ self._create_volume_db_entry(volume_id=volume_id)
+
backup = {'id': backup_id,
'size': 1,
'container': container,
- 'volume_id': '1234-5678-1234-8888',
+ 'volume_id': volume_id,
'parent_id': parent_id,
'user_id': 'user-id',
'project_id': 'project-id',
self.ctxt = context.get_admin_context()
self.stubs.Set(hashlib, 'md5', fake_md5)
- self._create_volume_db_entry()
self.volume_file = tempfile.NamedTemporaryFile()
self.temp_dir = tempfile.mkdtemp()
self.addCleanup(self.volume_file.close)
self.volume_file.write(os.urandom(1024))
def test_backup_uncompressed(self):
- self._create_backup_db_entry()
+ volume_id = '0adffe69-ce32-4bb0-b5e6-0000002d748d'
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
def test_backup_bz2(self):
- self._create_backup_db_entry()
+ volume_id = '057a035f-2584-4cfd-bf23-000000e39288'
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
def test_backup_zlib(self):
- self._create_backup_db_entry()
+ volume_id = '3701a9f8-effd-44b9-bf2e-000000bb99ca'
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='zlib')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
def test_backup_default_container(self):
- self._create_backup_db_entry(container=None,
+ volume_id = 'caffdc68-ef65-48af-928d-000000289076'
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=None,
backup_id=FAKE_BACKUP_ID)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
'_send_progress_notification')
def test_backup_default_container_notify(self, _send_progress,
_send_progress_end):
- self._create_backup_db_entry(container=None)
+ volume_id = '170a1081-9fe2-4add-9094-000000b48877'
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=None)
# If the backup_object_number_per_notification is set to 1,
# the _send_progress method will be called for sure.
CONF.set_override("backup_object_number_per_notification", 1)
self.assertTrue(_send_progress_end.called)
def test_backup_custom_container(self):
+ volume_id = '449b8140-85b6-465e-bdf6-0000002b29c4'
container_name = 'fake99'
- self._create_backup_db_entry(container=container_name)
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup = objects.Backup.get_by_id(self.ctxt, 123)
self.assertEqual(backup['container'], container_name)
def test_backup_shafile(self):
+ volume_id = '1eb6325f-6666-43a2-bcdd-0000001d8dac'
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
- self._create_backup_db_entry(container=container_name)
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup = objects.Backup.get_by_id(self.ctxt, 123)
len(content1['sha256s']))
def test_backup_cmp_shafiles(self):
+ volume_id = '261e8c1a-0c07-41d7-923f-000000d3efb8'
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
- self._create_backup_db_entry(container=container_name,
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
backup_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
self.assertEqual(backup['container'], container_name)
# Create incremental backup with no change to contents
- self._create_backup_db_entry(container=container_name, backup_id=124,
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=124,
parent_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
self.assertEqual(set(content1['sha256s']), set(content2['sha256s']))
def test_backup_delta_two_objects_change(self):
+ volume_id = '3f400215-e346-406c-83b0-0000009ac4fa'
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
- self._create_backup_db_entry(container=container_name, backup_id=123)
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup = objects.Backup.get_by_id(self.ctxt, 123)
self.volume_file.seek(20 * 1024)
self.volume_file.write(os.urandom(1024))
- self._create_backup_db_entry(container=container_name, backup_id=124,
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=124,
parent_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
self.assertNotEqual(content1['sha256s'][20], content2['sha256s'][20])
def test_backup_delta_two_blocks_in_object_change(self):
+ volume_id = '5f3f810a-2ff3-4905-aaa3-0000005814ab'
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
- self._create_backup_db_entry(container=container_name, backup_id=123)
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup = objects.Backup.get_by_id(self.ctxt, 123)
self.volume_file.seek(20 * 1024)
self.volume_file.write(os.urandom(1024))
- self._create_backup_db_entry(container=container_name, backup_id=124,
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=124,
parent_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
self._backup_metadata(), we want to check the process of an
exception handler.
"""
- self._create_backup_db_entry()
+ volume_id = '26481bc2-fc85-40ae-8a4a-0000000b24e5'
+
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
self._backup_metadata(), we want to check the process when the
second exception occurs in self.delete().
"""
- self._create_backup_db_entry()
+ volume_id = 'ce18dbc6-65d6-49ca-8866-000000b1c05b'
+
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup, self.volume_file)
def test_restore_uncompressed(self):
- self._create_backup_db_entry()
+ volume_id = 'b6f39bd5-ad93-474b-8ee4-000000a0d11e'
+
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
self.flags(backup_sha_block_size_bytes=32)
service = nfs.NFSBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as restored_file:
backup = objects.Backup.get_by_id(self.ctxt, 123)
- service.restore(backup, '1234-5678-1234-8888', restored_file)
+ service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_bz2(self):
- self._create_backup_db_entry()
+ volume_id = '3d4f044e-dc78-49e1-891e-000000549431'
+
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
self.flags(backup_file_size=(1024 * 3))
self.flags(backup_sha_block_size_bytes=1024)
with tempfile.NamedTemporaryFile() as restored_file:
backup = objects.Backup.get_by_id(self.ctxt, 123)
- service.restore(backup, '1234-5678-1234-8888', restored_file)
+ service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_zlib(self):
- self._create_backup_db_entry()
+ volume_id = 'ab84fe59-19a8-4c7d-9103-00000061488b'
+
+ self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='zlib')
self.flags(backup_file_size=(1024 * 3))
self.flags(backup_sha_block_size_bytes = 1024)
with tempfile.NamedTemporaryFile() as restored_file:
backup = objects.Backup.get_by_id(self.ctxt, 123)
- service.restore(backup, '1234-5678-1234-8888', restored_file)
+ service.restore(backup, volume_id, restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_restore_delta(self):
+ volume_id = '486249dc-83c6-4a02-8d65-000000d819e7'
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
- self._create_backup_db_entry(container=container_name, backup_id=123)
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=123)
service = nfs.NFSBackupDriver(self.ctxt)
self.volume_file.seek(0)
backup = objects.Backup.get_by_id(self.ctxt, 123)
self.volume_file.seek(20 * 1024)
self.volume_file.write(os.urandom(1024))
- self._create_backup_db_entry(container=container_name, backup_id=124,
+ self._create_backup_db_entry(volume_id=volume_id,
+ container=container_name,
+ backup_id=124,
parent_id=123)
self.volume_file.seek(0)
deltabackup = objects.Backup.get_by_id(self.ctxt, 124)
with tempfile.NamedTemporaryFile() as restored_file:
backup = objects.Backup.get_by_id(self.ctxt, 124)
- service.restore(backup, '1234-5678-1234-8888',
+ service.restore(backup, volume_id,
restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
def test_delete(self):
- self._create_backup_db_entry()
+ volume_id = '4b5c39f2-4428-473c-b85a-000000477eca'
+ self._create_backup_db_entry(volume_id=volume_id)
service = nfs.NFSBackupDriver(self.ctxt)
backup = objects.Backup.get_by_id(self.ctxt, 123)
service.delete(backup)