# create by the backup job are deleted when service is started.
ctxt = context.get_admin_context()
for backup in backups:
- volume = self.db.volume_get(ctxt, backup.volume_id)
- volume_host = volume_utils.extract_host(volume['host'], 'backend')
- backend = self._get_volume_backend(host=volume_host)
- mgr = self._get_manager(backend)
+ try:
+ volume = self.db.volume_get(ctxt, backup.volume_id)
+ volume_host = volume_utils.extract_host(volume['host'],
+ 'backend')
+ backend = self._get_volume_backend(host=volume_host)
+ mgr = self._get_manager(backend)
+ except (KeyError, exception.VolumeNotFound):
+ LOG.debug("Could not find a volume to clean up for "
+ "backup %s.", backup.id)
+ continue
if backup.temp_volume_id and backup.status == 'error':
temp_volume = self.db.volume_get(ctxt,
backup.temp_volume_id)
"""
+import ddt
import tempfile
import mock
return backup
+@ddt.ddt
class BackupTestCase(BaseBackupTest):
"""Test Case for backups."""
self.assertTrue(mock_delete_volume.called)
self.assertTrue(mock_delete_snapshot.called)
+ @mock.patch.object(db, 'volume_get')
+ @ddt.data(KeyError, exception.VolumeNotFound)
+ def test_cleanup_temp_volumes_snapshots(self,
+ err,
+ mock_volume_get):
+ """Ensure we handle missing volume for a backup."""
+ mock_volume_get.side_effect = [err]
+
+ backup1 = self._create_backup_db_entry(status='creating')
+ backups = [backup1]
+
+ self.assertIsNone(self.backup_mgr._cleanup_temp_volumes_snapshots(
+ backups))
+
def test_create_backup_with_bad_volume_status(self):
"""Test creating a backup from a volume with a bad status."""
vol_id = self._create_volume_db_entry(status='restoring', size=1)