from cinder import units
from cinder import utils
from cinder.volume import configuration as conf
+from cinder.volume import driver as base_driver
from cinder.volume.drivers import glusterfs
return self.fields[item]
+class FakeDb(object):
+ msg = "Tests are broken: mock this out."
+
+ def volume_get(self, *a, **kw):
+ raise Exception(self.msg)
+
+ def snapshot_get_all_for_volume(self, *a, **kw):
+ """Mock this if you want results from it."""
+ return []
+
+
class GlusterFsDriverTestCase(test.TestCase):
"""Test case for GlusterFS driver."""
self.stubs = stubout.StubOutForTesting()
self._driver =\
- glusterfs.GlusterfsDriver(configuration=self._configuration)
+ glusterfs.GlusterfsDriver(configuration=self._configuration,
+ db=FakeDb())
self._driver.shares = {}
def tearDown(self):
self.assertEqual(drv._get_mount_point_base(),
self.TEST_MNT_POINT_BASE)
+
+ def test_backup_volume(self):
+ """Backup a volume with no snapshots."""
+
+ (mox, drv) = self._mox, self._driver
+
+ mox.StubOutWithMock(drv, '_qemu_img_info')
+ mox.StubOutWithMock(drv.db, 'volume_get')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
+ mox.StubOutWithMock(drv, '_read_info_file')
+ mox.StubOutWithMock(drv, 'get_active_image_from_info')
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ volume = self._simple_volume()
+ backup = {'volume_id': volume['id']}
+
+ drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn({})
+ drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/path')
+
+ info = imageutils.QemuImgInfo()
+ info.file_format = 'raw'
+
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+ drv._qemu_img_info(IgnoreArg()).AndReturn(info)
+
+ base_driver.VolumeDriver.backup_volume(IgnoreArg(),
+ IgnoreArg(),
+ IgnoreArg())
+
+ mox.ReplayAll()
+
+ drv.backup_volume(ctxt, backup, IgnoreArg())
+
+ def test_backup_volume_previous_snap(self):
+ """Backup a volume that previously had a snapshot.
+
+ Snapshot was deleted, snap_info is different from above.
+ """
+
+ (mox, drv) = self._mox, self._driver
+
+ mox.StubOutWithMock(drv, '_qemu_img_info')
+ mox.StubOutWithMock(drv.db, 'volume_get')
+ mox.StubOutWithMock(drv, '_read_info_file')
+ mox.StubOutWithMock(drv, 'get_active_image_from_info')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ volume = self._simple_volume()
+ backup = {'volume_id': volume['id']}
+
+ drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn(
+ {'active': 'file2'})
+ drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/file2')
+
+ info = imageutils.QemuImgInfo()
+ info.file_format = 'raw'
+
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+ drv._qemu_img_info(IgnoreArg()).AndReturn(info)
+
+ base_driver.VolumeDriver.backup_volume(IgnoreArg(),
+ IgnoreArg(),
+ IgnoreArg())
+
+ mox.ReplayAll()
+
+ drv.backup_volume(ctxt, backup, IgnoreArg())
+
+ def test_backup_snap_failure_1(self):
+ """Backup fails if snapshot exists (database)."""
+
+ (mox, drv) = self._mox, self._driver
+ mox.StubOutWithMock(drv.db, 'snapshot_get_all_for_volume')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ volume = self._simple_volume()
+ backup = {'volume_id': volume['id']}
+
+ drv.db.snapshot_get_all_for_volume(ctxt, volume['id']).AndReturn(
+ [{'snap1': 'a'}, {'snap2': 'b'}])
+
+ base_driver.VolumeDriver.backup_volume(IgnoreArg(),
+ IgnoreArg(),
+ IgnoreArg())
+
+ mox.ReplayAll()
+
+ self.assertRaises(exception.InvalidVolume,
+ drv.backup_volume,
+ ctxt, backup, IgnoreArg())
+
+ def test_backup_snap_failure_2(self):
+ """Backup fails if snapshot exists (on-disk)."""
+
+ (mox, drv) = self._mox, self._driver
+ mox.StubOutWithMock(drv, '_read_info_file')
+ mox.StubOutWithMock(drv.db, 'volume_get')
+ mox.StubOutWithMock(drv, 'get_active_image_from_info')
+ mox.StubOutWithMock(drv, '_qemu_img_info')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ volume = self._simple_volume()
+ backup = {'volume_id': volume['id']}
+
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+
+ drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn(
+ {'id1': 'file1',
+ 'id2': 'file2',
+ 'active': 'file2'})
+
+ drv.get_active_image_from_info(IgnoreArg()).\
+ AndReturn('/some/path/file2')
+
+ info = imageutils.QemuImgInfo()
+ info.file_format = 'raw'
+ info.backing_file = 'file1'
+
+ drv._qemu_img_info(IgnoreArg()).AndReturn(info)
+
+ base_driver.VolumeDriver.backup_volume(IgnoreArg(),
+ IgnoreArg(),
+ IgnoreArg())
+
+ mox.ReplayAll()
+
+ self.assertRaises(exception.InvalidVolume,
+ drv.backup_volume,
+ ctxt, backup, IgnoreArg())
+
+ def test_backup_failure_unsupported_format(self):
+ """Attempt to backup a volume with a qcow2 base."""
+
+ (mox, drv) = self._mox, self._driver
+
+ mox.StubOutWithMock(drv, '_qemu_img_info')
+ mox.StubOutWithMock(drv.db, 'volume_get')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
+ mox.StubOutWithMock(drv, '_read_info_file')
+ mox.StubOutWithMock(drv, 'get_active_image_from_info')
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ volume = self._simple_volume()
+ backup = {'volume_id': volume['id']}
+
+ drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn({})
+ drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/path')
+
+ info = imageutils.QemuImgInfo()
+ info.file_format = 'qcow2'
+
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+ drv._qemu_img_info(IgnoreArg()).AndReturn(info)
+
+ base_driver.VolumeDriver.backup_volume(IgnoreArg(),
+ IgnoreArg(),
+ IgnoreArg())
+
+ mox.ReplayAll()
+
+ self.assertRaises(exception.InvalidVolume,
+ drv.backup_volume,
+ ctxt, backup, IgnoreArg())
return self.base
def backup_volume(self, context, backup, backup_service):
- """Create a new backup from an existing volume."""
- raise NotImplementedError()
+ """Create a new backup from an existing volume.
- def restore_backup(self, context, backup, volume, backup_service):
- """Restore an existing backup to a new or existing volume."""
- raise NotImplementedError()
+ Allow a backup to occur only if no snapshots exist.
+ Check both Cinder and the file on-disk. The latter is only
+ a safety mechanism to prevent further damage if the snapshot
+ information is already inconsistent.
+ """
+
+ snapshots = self.db.snapshot_get_all_for_volume(context,
+ backup['volume_id'])
+ snap_error_msg = _('Backup is not supported for GlusterFS '
+ 'volumes with snapshots.')
+ if len(snapshots) > 0:
+ raise exception.InvalidVolume(snap_error_msg)
+
+ volume = self.db.volume_get(context, backup['volume_id'])
+
+ volume_dir = self._local_volume_dir(volume)
+ active_file_path = os.path.join(
+ volume_dir,
+ self.get_active_image_from_info(volume))
+
+ info = self._qemu_img_info(active_file_path)
+
+ if info.backing_file is not None:
+ msg = _('No snapshots found in database, but '
+ '%(path)s has backing file '
+ '%(backing_file)s!') % {'path': active_file_path,
+ 'backing_file': info.backing_file}
+ LOG.error(msg)
+ raise exception.InvalidVolume(snap_error_msg)
+
+ if info.file_format != 'raw':
+ msg = _('Backup is only supported for raw-formatted '
+ 'GlusterFS volumes.')
+ raise exception.InvalidVolume(msg)
+
+ return super(GlusterfsDriver, self).backup_volume(
+ context, backup, backup_service)