import mock
import os
import tempfile
+import time
+import traceback
import mox as mox_lib
from mox import IgnoreArg
stub = mox_lib.MockObject(attr_to_replace)
self.stubs.Set(obj, attr_name, stub)
+ def assertRaisesAndMessageMatches(
+ self, excClass, msg, callableObj, *args, **kwargs):
+ """Ensure that 'excClass' was raised and its message contains 'msg'."""
+
+ caught = False
+ try:
+ callableObj(*args, **kwargs)
+ except Exception as exc:
+ caught = True
+ self.assertEqual(excClass, type(exc),
+ 'Wrong exception caught: %s Stacktrace: %s' %
+ (exc, traceback.print_exc()))
+ self.assertIn(msg, str(exc))
+
+ if not caught:
+ self.fail('Expected raised exception but nothing caught.')
+
def test_set_execute(self):
mox = self._mox
drv = self._driver
drv.set_execute(my_execute)
+ mox.VerifyAll()
+
def test_local_path(self):
"""local_path common use case."""
CONF.set_override("glusterfs_mount_point_base",
drv._get_mount_point_for_share(self.TEST_EXPORT1)
+ mox.VerifyAll()
+
def test_get_available_capacity_with_df(self):
"""_get_available_capacity should calculate correct value."""
mox = self._mox
"""do_setup should throw error if shares config is not configured."""
drv = self._driver
- CONF.set_override("glusterfs_shares_config",
- self.TEST_SHARES_CONFIG_FILE)
+ drv.configuration.glusterfs_shares_config = None
- self.assertRaises(exception.GlusterfsException,
- drv.do_setup, IsA(context.RequestContext))
+ self.assertRaisesAndMessageMatches(exception.GlusterfsException,
+ 'no Gluster config file configured',
+ drv.do_setup,
+ IsA(context.RequestContext))
def test_setup_should_throw_exception_if_client_is_not_installed(self):
"""do_setup should throw exception if client is not installed."""
mox.ReplayAll()
- self.assertRaises(exception.GlusterfsException,
- drv.do_setup, IsA(context.RequestContext))
+ self.assertRaisesAndMessageMatches(exception.GlusterfsException,
+ 'mount.glusterfs is not installed',
+ drv.do_setup,
+ IsA(context.RequestContext))
mox.VerifyAll()
mox.StubOutWithMock(image_utils, 'convert_image')
mox.StubOutWithMock(drv, '_copy_volume_from_snapshot')
- volume_file = 'volume-%s' % self.VOLUME_UUID
- volume_path = '%s/%s/%s' % (self.TEST_MNT_POINT_BASE,
- drv._get_hash_str(self.TEST_EXPORT1),
- volume_file)
-
volume = self._simple_volume()
src_vref = self._simple_volume()
src_vref['id'] = '375e32b2-804a-49f2-b282-85d1d5a5b9e1'
src_vref['name'] = 'volume-%s' % src_vref['id']
- volume_file = 'volume-%s' % src_vref['id']
- volume_path = '%s/%s/%s' % (self.TEST_MNT_POINT_BASE,
- drv._get_hash_str(self.TEST_EXPORT1),
- volume_file)
- src_info_path = '%s.info' % volume_path
volume_ref = {'id': volume['id'],
'name': volume['name'],
'status': volume['status'],
drv._create_snapshot(snap_ref)
- snap_info = {'active': volume_file,
- snap_ref['id']: volume_path + '-clone'}
-
- drv._read_info_file(src_info_path).AndReturn(snap_info)
-
drv._copy_volume_from_snapshot(snap_ref, volume_ref, volume['size'])
drv._delete_snapshot(mox_lib.IgnoreArg())
drv.create_cloned_volume(volume, src_vref)
+ mox.VerifyAll()
+
@mock.patch('cinder.openstack.common.fileutils.delete_if_exists')
def test_delete_volume(self, mock_delete_if_exists):
volume = self._simple_volume()
mox.StubOutWithMock(db, 'snapshot_get')
mox.StubOutWithMock(drv, '_write_info_file')
mox.StubOutWithMock(drv, '_nova')
+ # Stub out the busy wait.
+ self.stub_out_not_replaying(time, 'sleep')
drv._create_qcow2_snap_file(snap_ref, volume_file, snap_path)
drv._nova.create_volume_snapshot(ctxt, self.VOLUME_UUID, create_info)
- snap_ref['status'] = 'creating'
- snap_ref['progress'] = '0%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress = snap_ref.copy()
+ snap_ref_progress['status'] = 'creating'
- snap_ref['progress'] = '50%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_0p = snap_ref_progress.copy()
+ snap_ref_progress_0p['progress'] = '0%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_0p)
- snap_ref['progress'] = '90%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_50p = snap_ref_progress.copy()
+ snap_ref_progress_50p['progress'] = '50%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_50p)
+
+ snap_ref_progress_90p = snap_ref_progress.copy()
+ snap_ref_progress_90p['progress'] = '90%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_90p)
snap_info = {'active': snap_file,
self.SNAP_UUID: snap_file}
drv.create_snapshot(snap_ref)
+ mox.VerifyAll()
+
def test_create_snapshot_online_novafailure(self):
(mox, drv) = self._mox, self._driver
volume_path = '%s/%s/%s' % (self.TEST_MNT_POINT_BASE,
hashed,
volume_file)
- info_path = '%s.info' % volume_path
ctxt = context.RequestContext('fake_user', 'fake_project')
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_create_qcow2_snap_file')
mox.StubOutWithMock(drv, '_nova')
+ # Stub out the busy wait.
+ self.stub_out_not_replaying(time, 'sleep')
mox.StubOutWithMock(db, 'snapshot_get')
mox.StubOutWithMock(drv, '_write_info_file')
drv._nova.create_volume_snapshot(ctxt, self.VOLUME_UUID, create_info)
- snap_ref['status'] = 'creating'
- snap_ref['progress'] = '0%'
-
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress = snap_ref.copy()
+ snap_ref_progress['status'] = 'creating'
- snap_ref['progress'] = '50%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_0p = snap_ref_progress.copy()
+ snap_ref_progress_0p['progress'] = '0%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_0p)
- snap_ref['progress'] = '99%'
- snap_ref['status'] = 'error'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
-
- snap_info = {'active': snap_file,
- self.SNAP_UUID: snap_file}
+ snap_ref_progress_50p = snap_ref_progress.copy()
+ snap_ref_progress_50p['progress'] = '50%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_50p)
- drv._write_info_file(info_path, snap_info)
+ snap_ref_progress_99p = snap_ref_progress.copy()
+ snap_ref_progress_99p['progress'] = '99%'
+ snap_ref_progress_99p['status'] = 'error'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_99p)
mox.ReplayAll()
- self.assertRaises(exception.GlusterfsException,
- drv.create_snapshot,
- snap_ref)
+ self.assertRaisesAndMessageMatches(
+ exception.GlusterfsException,
+ 'Nova returned "error" status while creating snapshot.',
+ drv.create_snapshot,
+ snap_ref)
+
+ mox.VerifyAll()
def test_delete_snapshot_online_1(self):
"""Delete the newest snapshot, with only one snap present."""
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_nova')
+ # Stub out the busy wait.
+ self.stub_out_not_replaying(time, 'sleep')
mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, '_write_info_file')
- mox.StubOutWithMock(os.path, 'exists')
mox.StubOutWithMock(db, 'snapshot_get')
mox.StubOutWithMock(image_utils, 'qemu_img_info')
mox.StubOutWithMock(drv, '_ensure_share_writable')
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(snap_info)
- os.path.exists(snap_path).AndReturn(True)
-
qemu_img_info_output = """image: %s
file format: qcow2
virtual size: 1.0G (1073741824 bytes)
drv._read_info_file(info_path).AndReturn(snap_info)
- snap_ref['status'] = 'deleting'
- snap_ref['progress'] = '0%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress = snap_ref.copy()
+ snap_ref_progress['status'] = 'deleting'
- snap_ref['progress'] = '50%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_0p = snap_ref_progress.copy()
+ snap_ref_progress_0p['progress'] = '0%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_0p)
- snap_ref['progress'] = '90%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_50p = snap_ref_progress.copy()
+ snap_ref_progress_50p['progress'] = '50%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_50p)
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_90p = snap_ref_progress.copy()
+ snap_ref_progress_90p['progress'] = '90%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_90p)
drv._write_info_file(info_path, snap_info)
drv.delete_snapshot(snap_ref)
+ mox.VerifyAll()
+
def test_delete_snapshot_online_2(self):
"""Delete the middle of 3 snapshots."""
(mox, drv) = self._mox, self._driver
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_nova')
+ # Stub out the busy wait.
+ self.stub_out_not_replaying(time, 'sleep')
mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, '_write_info_file')
- mox.StubOutWithMock(os.path, 'exists')
mox.StubOutWithMock(db, 'snapshot_get')
mox.StubOutWithMock(image_utils, 'qemu_img_info')
mox.StubOutWithMock(drv, '_ensure_share_writable')
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(snap_info)
- os.path.exists(snap_path).AndReturn(True)
-
qemu_img_info_output = """image: %s
file format: qcow2
virtual size: 1.0G (1073741824 bytes)
drv._read_info_file(info_path).AndReturn(snap_info)
- snap_ref['status'] = 'deleting'
- snap_ref['progress'] = '0%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress = snap_ref.copy()
+ snap_ref_progress['status'] = 'deleting'
- snap_ref['progress'] = '50%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_0p = snap_ref_progress.copy()
+ snap_ref_progress_0p['progress'] = '0%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_0p)
- snap_ref['progress'] = '90%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_50p = snap_ref_progress.copy()
+ snap_ref_progress_50p['progress'] = '50%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_50p)
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_90p = snap_ref_progress.copy()
+ snap_ref_progress_90p['progress'] = '90%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_90p)
drv._write_info_file(info_path, snap_info)
drv.delete_snapshot(snap_ref)
+ mox.VerifyAll()
+
def test_delete_snapshot_online_novafailure(self):
"""Delete the newest snapshot."""
(mox, drv) = self._mox, self._driver
hashed = drv._get_hash_str(self.TEST_EXPORT1)
volume_file = 'volume-%s' % self.VOLUME_UUID
+ volume_dir = os.path.join(self.TEST_MNT_POINT_BASE, hashed)
volume_path = '%s/%s/%s' % (self.TEST_MNT_POINT_BASE,
hashed,
volume_file)
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_nova')
+ # Stub out the busy wait.
+ self.stub_out_not_replaying(time, 'sleep')
mox.StubOutWithMock(drv, '_read_info_file')
- mox.StubOutWithMock(drv, '_write_info_file')
- mox.StubOutWithMock(os.path, 'exists')
mox.StubOutWithMock(db, 'snapshot_get')
mox.StubOutWithMock(image_utils, 'qemu_img_info')
+ mox.StubOutWithMock(drv, '_ensure_share_writable')
snap_info = {'active': snap_file,
self.SNAP_UUID: snap_file}
+ drv._ensure_share_writable(volume_dir)
+
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(snap_info)
- os.path.exists(snap_path).AndReturn(True)
-
qemu_img_info_output = """image: %s
file format: qcow2
virtual size: 1.0G (1073741824 bytes)
""" % (snap_file, volume_file)
img_info = imageutils.QemuImgInfo(qemu_img_info_output)
+ vol_qemu_img_info_output = """image: %s
+ file format: raw
+ virtual size: 1.0G (1073741824 bytes)
+ disk size: 173K
+ """ % volume_file
+ volume_img_info = imageutils.QemuImgInfo(vol_qemu_img_info_output)
+
image_utils.qemu_img_info(snap_path).AndReturn(img_info)
+ image_utils.qemu_img_info(volume_path).AndReturn(volume_img_info)
+
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(snap_info)
delete_info = {
'type': 'qcow2',
'merge_target_file': None,
- 'file_to_merge': volume_file,
+ 'file_to_merge': None,
'volume_id': self.VOLUME_UUID
}
drv._read_info_file(info_path).AndReturn(snap_info)
- snap_ref['status'] = 'deleting'
- snap_ref['progress'] = '0%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
-
- snap_ref['progress'] = '50%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress = snap_ref.copy()
+ snap_ref_progress['status'] = 'deleting'
- snap_ref['status'] = 'error_deleting'
- snap_ref['progress'] = '90%'
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_0p = snap_ref_progress.copy()
+ snap_ref_progress_0p['progress'] = '0%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_0p)
- db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref)
+ snap_ref_progress_50p = snap_ref_progress.copy()
+ snap_ref_progress_50p['progress'] = '50%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_50p)
- drv._write_info_file(info_path, snap_info)
-
- drv._execute('rm', '-f', volume_path, run_as_root=True)
+ snap_ref_progress_90p = snap_ref_progress.copy()
+ snap_ref_progress_90p['status'] = 'error_deleting'
+ snap_ref_progress_90p['progress'] = '90%'
+ db.snapshot_get(ctxt, self.SNAP_UUID).AndReturn(snap_ref_progress_90p)
mox.ReplayAll()
- self.assertRaises(exception.GlusterfsException,
- drv.delete_snapshot,
- snap_ref)
+ self.assertRaisesAndMessageMatches(exception.GlusterfsException,
+ 'Unable to delete snapshot',
+ drv.delete_snapshot,
+ snap_ref)
+
+ mox.VerifyAll()
def test_get_backing_chain_for_path(self):
(mox, drv) = self._mox, self._driver
vol_path_2 = '%s/%s' % (vol_dir, vol_filename_2)
vol_path_3 = '%s/%s' % (vol_dir, vol_filename_3)
- mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_local_volume_dir')
mox.StubOutWithMock(image_utils, 'qemu_img_info')
info_2 = imageutils.QemuImgInfo(qemu_img_output_2)
info_3 = imageutils.QemuImgInfo(qemu_img_output_3)
- drv._local_volume_dir(volume).AndReturn(vol_dir)
image_utils.qemu_img_info(vol_path_3).\
AndReturn(info_3)
drv._local_volume_dir(volume).AndReturn(vol_dir)
chain = drv._get_backing_chain_for_path(volume, vol_path_3)
+ mox.VerifyAll()
+
# Verify chain contains all expected data
item_1 = drv._get_matching_backing_file(chain, vol_filename)
self.assertEqual(item_1['filename'], vol_filename_2)
drv._copy_volume_from_snapshot(snapshot, dest_volume, size)
+ mox.VerifyAll()
+
def test_create_volume_from_snapshot(self):
(mox, drv) = self._mox, self._driver
- volume = self._simple_volume('c1073000-0000-0000-0000-0000000c1073')
src_volume = self._simple_volume()
-
- mox.StubOutWithMock(drv, '_create_snapshot')
- mox.StubOutWithMock(drv, '_copy_volume_from_snapshot')
- mox.StubOutWithMock(drv, '_delete_snapshot')
-
snap_ref = {'volume_name': src_volume['name'],
'name': 'clone-snap-%s' % src_volume['id'],
'size': src_volume['size'],
'volume_size': src_volume['size'],
'volume_id': src_volume['id'],
'id': 'tmp-snap-%s' % src_volume['id'],
- 'volume': src_volume}
+ 'volume': src_volume,
+ 'status': 'available'}
- volume_ref = {'id': volume['id'],
- 'size': volume['size'],
- 'status': volume['status'],
- 'provider_location': volume['provider_location'],
- 'name': 'volume-' + volume['id']}
+ new_volume = DumbVolume()
+ new_volume['size'] = snap_ref['size']
- drv._create_snapshot(snap_ref)
+ mox.StubOutWithMock(drv, '_ensure_shares_mounted')
+ mox.StubOutWithMock(drv, '_find_share')
+ mox.StubOutWithMock(drv, '_do_create_volume')
+ mox.StubOutWithMock(drv, '_copy_volume_from_snapshot')
+
+ drv._ensure_shares_mounted()
+
+ drv._find_share(new_volume['size']).AndReturn(self.TEST_EXPORT1)
+
+ drv._do_create_volume(new_volume)
drv._copy_volume_from_snapshot(snap_ref,
- volume_ref,
- src_volume['size'])
- drv._delete_snapshot(snap_ref)
+ new_volume,
+ new_volume['size'])
mox.ReplayAll()
- drv.create_cloned_volume(volume, src_volume)
+ drv.create_volume_from_snapshot(new_volume, snap_ref)
+
+ mox.VerifyAll()
def test_initialize_connection(self):
(mox, drv) = self._mox, self._driver
conn_info = drv.initialize_connection(volume, None)
+ mox.VerifyAll()
+
self.assertEqual(conn_info['data']['format'], 'raw')
self.assertEqual(conn_info['driver_volume_type'], 'glusterfs')
self.assertEqual(conn_info['data']['name'], volume['name'])
(mox, drv) = self._mox, self._driver
- mox.StubOutWithMock(drv, '_qemu_img_info')
mox.StubOutWithMock(drv.db, 'volume_get')
- mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
- mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, 'get_active_image_from_info')
+ mox.StubOutWithMock(drv, '_qemu_img_info')
+ mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
backup = {'volume_id': volume['id']}
- drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn({})
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+
drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/path')
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
- drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
drv._qemu_img_info(IgnoreArg()).AndReturn(info)
base_driver.VolumeDriver.backup_volume(IgnoreArg(),
drv.backup_volume(ctxt, backup, IgnoreArg())
+ mox.VerifyAll()
+
def test_backup_volume_previous_snap(self):
"""Backup a volume that previously had a snapshot.
(mox, drv) = self._mox, self._driver
- mox.StubOutWithMock(drv, '_qemu_img_info')
mox.StubOutWithMock(drv.db, 'volume_get')
- mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, 'get_active_image_from_info')
+ mox.StubOutWithMock(drv, '_qemu_img_info')
mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
backup = {'volume_id': volume['id']}
- drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn(
- {'active': 'file2'})
+ drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
+
drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/file2')
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
- drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
drv._qemu_img_info(IgnoreArg()).AndReturn(info)
base_driver.VolumeDriver.backup_volume(IgnoreArg(),
drv.backup_volume(ctxt, backup, IgnoreArg())
+ mox.VerifyAll()
+
def test_backup_snap_failure_1(self):
"""Backup fails if snapshot exists (database)."""
(mox, drv) = self._mox, self._driver
mox.StubOutWithMock(drv.db, 'snapshot_get_all_for_volume')
- mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
drv.db.snapshot_get_all_for_volume(ctxt, volume['id']).AndReturn(
[{'snap1': 'a'}, {'snap2': 'b'}])
- base_driver.VolumeDriver.backup_volume(IgnoreArg(),
- IgnoreArg(),
- IgnoreArg())
-
mox.ReplayAll()
self.assertRaises(exception.InvalidVolume,
drv.backup_volume,
ctxt, backup, IgnoreArg())
+ mox.VerifyAll()
+
def test_backup_snap_failure_2(self):
"""Backup fails if snapshot exists (on-disk)."""
(mox, drv) = self._mox, self._driver
- mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv.db, 'volume_get')
mox.StubOutWithMock(drv, 'get_active_image_from_info')
mox.StubOutWithMock(drv, '_qemu_img_info')
- mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
- drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn(
- {'id1': 'file1',
- 'id2': 'file2',
- 'active': 'file2'})
-
drv.get_active_image_from_info(IgnoreArg()).\
AndReturn('/some/path/file2')
drv._qemu_img_info(IgnoreArg()).AndReturn(info)
- base_driver.VolumeDriver.backup_volume(IgnoreArg(),
- IgnoreArg(),
- IgnoreArg())
-
mox.ReplayAll()
self.assertRaises(exception.InvalidVolume,
drv.backup_volume,
ctxt, backup, IgnoreArg())
+ mox.VerifyAll()
+
def test_backup_failure_unsupported_format(self):
"""Attempt to backup a volume with a qcow2 base."""
mox.StubOutWithMock(drv, '_qemu_img_info')
mox.StubOutWithMock(drv.db, 'volume_get')
- mox.StubOutWithMock(base_driver.VolumeDriver, 'backup_volume')
- mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, 'get_active_image_from_info')
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
backup = {'volume_id': volume['id']}
- drv._read_info_file(IgnoreArg(), empty_if_missing=True).AndReturn({})
drv.get_active_image_from_info(IgnoreArg()).AndReturn('/some/path')
info = imageutils.QemuImgInfo()
drv.db.volume_get(ctxt, volume['id']).AndReturn(volume)
drv._qemu_img_info(IgnoreArg()).AndReturn(info)
- base_driver.VolumeDriver.backup_volume(IgnoreArg(),
- IgnoreArg(),
- IgnoreArg())
-
mox.ReplayAll()
self.assertRaises(exception.InvalidVolume,
drv.backup_volume,
ctxt, backup, IgnoreArg())
+
+ mox.VerifyAll()
+
+ def test_copy_volume_to_image_raw_image(self):
+ drv = self._driver
+
+ volume = self._simple_volume()
+ volume_path = '%s/%s' % (self.TEST_MNT_POINT, volume['name'])
+
+ with contextlib.nested(
+ mock.patch.object(drv, 'get_active_image_from_info'),
+ mock.patch.object(drv, '_local_volume_dir'),
+ mock.patch.object(image_utils, 'qemu_img_info'),
+ mock.patch.object(image_utils, 'upload_volume')
+ ) as (mock_get_active_image_from_info, mock_local_volume_dir,
+ mock_qemu_img_info, mock_upload_volume):
+ mock_get_active_image_from_info.return_value = volume['name']
+
+ mock_local_volume_dir.return_value = self.TEST_MNT_POINT
+
+ qemu_img_output = """image: %s
+ file format: raw
+ virtual size: 1.0G (1073741824 bytes)
+ disk size: 173K
+ """ % volume['name']
+ img_info = imageutils.QemuImgInfo(qemu_img_output)
+ mock_qemu_img_info.return_value = img_info
+
+ upload_path = volume_path
+
+ drv.copy_volume_to_image(mock.ANY, volume, mock.ANY, mock.ANY)
+
+ mock_get_active_image_from_info.assert_called_once_with(volume)
+ mock_local_volume_dir.assert_called_once_with(volume)
+ mock_qemu_img_info.assert_called_once_with(volume_path)
+ mock_upload_volume.assert_called_once_with(
+ mock.ANY, mock.ANY, mock.ANY, upload_path)
+
+ def test_copy_volume_to_image_qcow2_image(self):
+ """Upload a qcow2 image file which has to be converted to raw first."""
+ drv = self._driver
+
+ volume = self._simple_volume()
+ volume_path = '%s/%s' % (self.TEST_MNT_POINT, volume['name'])
+ image_meta = {'id': '10958016-e196-42e3-9e7f-5d8927ae3099'}
+
+ with contextlib.nested(
+ mock.patch.object(drv, 'get_active_image_from_info'),
+ mock.patch.object(drv, '_local_volume_dir'),
+ mock.patch.object(image_utils, 'qemu_img_info'),
+ mock.patch.object(image_utils, 'convert_image'),
+ mock.patch.object(image_utils, 'upload_volume'),
+ mock.patch.object(drv, '_execute')
+ ) as (mock_get_active_image_from_info, mock_local_volume_dir,
+ mock_qemu_img_info, mock_convert_image, mock_upload_volume,
+ mock_execute):
+ mock_get_active_image_from_info.return_value = volume['name']
+
+ mock_local_volume_dir.return_value = self.TEST_MNT_POINT
+
+ qemu_img_output = """image: %s
+ file format: qcow2
+ virtual size: 1.0G (1073741824 bytes)
+ disk size: 173K
+ """ % volume['name']
+ img_info = imageutils.QemuImgInfo(qemu_img_output)
+ mock_qemu_img_info.return_value = img_info
+
+ upload_path = '%s/%s.temp_image.%s' % (self.TEST_MNT_POINT,
+ volume['id'],
+ image_meta['id'])
+
+ drv.copy_volume_to_image(mock.ANY, volume, mock.ANY, image_meta)
+
+ mock_get_active_image_from_info.assert_called_once_with(volume)
+ mock_local_volume_dir.assert_called_with(volume)
+ mock_qemu_img_info.assert_called_once_with(volume_path)
+ mock_convert_image.assert_called_once_with(
+ volume_path, upload_path, 'raw')
+ mock_upload_volume.assert_called_once_with(
+ mock.ANY, mock.ANY, mock.ANY, upload_path)
+ mock_execute.assert_called_once_with('rm', '-f', upload_path)
+
+ def test_copy_volume_to_image_snapshot_exists(self):
+ """Upload an active snapshot which has to be converted to raw first."""
+ drv = self._driver
+
+ volume = self._simple_volume()
+ volume_path = '%s/volume-%s' % (self.TEST_MNT_POINT, self.VOLUME_UUID)
+ volume_filename = 'volume-%s' % self.VOLUME_UUID
+ image_meta = {'id': '10958016-e196-42e3-9e7f-5d8927ae3099'}
+
+ with contextlib.nested(
+ mock.patch.object(drv, 'get_active_image_from_info'),
+ mock.patch.object(drv, '_local_volume_dir'),
+ mock.patch.object(image_utils, 'qemu_img_info'),
+ mock.patch.object(image_utils, 'convert_image'),
+ mock.patch.object(image_utils, 'upload_volume'),
+ mock.patch.object(drv, '_execute')
+ ) as (mock_get_active_image_from_info, mock_local_volume_dir,
+ mock_qemu_img_info, mock_convert_image, mock_upload_volume,
+ mock_execute):
+ mock_get_active_image_from_info.return_value = volume['name']
+
+ mock_local_volume_dir.return_value = self.TEST_MNT_POINT
+
+ qemu_img_output = """image: volume-%s.%s
+ file format: qcow2
+ virtual size: 1.0G (1073741824 bytes)
+ disk size: 173K
+ backing file: %s
+ """ % (self.VOLUME_UUID, self.SNAP_UUID, volume_filename)
+ img_info = imageutils.QemuImgInfo(qemu_img_output)
+ mock_qemu_img_info.return_value = img_info
+
+ upload_path = '%s/%s.temp_image.%s' % (self.TEST_MNT_POINT,
+ volume['id'],
+ image_meta['id'])
+
+ drv.copy_volume_to_image(mock.ANY, volume, mock.ANY, image_meta)
+
+ mock_get_active_image_from_info.assert_called_once_with(volume)
+ mock_local_volume_dir.assert_called_with(volume)
+ mock_qemu_img_info.assert_called_once_with(volume_path)
+ mock_convert_image.assert_called_once_with(
+ volume_path, upload_path, 'raw')
+ mock_upload_volume.assert_called_once_with(
+ mock.ANY, mock.ANY, mock.ANY, upload_path)
+ mock_execute.assert_called_once_with('rm', '-f', upload_path)