select_ds_for_volume.assert_called_once_with(volume)
vops.clone_backing.assert_called_once_with(
volume['name'], backing, None, volumeops.FULL_CLONE_TYPE,
- datastore, disk_type, host, rp)
+ datastore, disk_type=disk_type, host=host, resource_pool=rp,
+ folder=folder)
delete_tmp_backing.assert_called_once_with(backing)
vops.update_backing_disk_uuid.assert_called_once_with(clone, volume['id'])
else:
vops.rename_backing.assert_called_once_with(backing, uuid)
vops.clone_backing.assert_called_once_with(
vol['name'], backing, None, volumeops.FULL_CLONE_TYPE,
- datastore, vmdk.THIN_VMDK_TYPE, host, rp)
+ datastore, disk_type=vmdk.THIN_VMDK_TYPE, host=host,
+ resource_pool=rp, folder=folder)
vops.update_backing_disk_uuid.assert_called_once_with(clone, vol['id'])
delete_temp_backing.assert_called_once_with(backing)
vops.change_backing_profile.assert_called_once_with(clone,
summary = mock.Mock()
summary.datastore = mock.sentinel.datastore
select_ds.return_value = (mock.sentinel.host, mock.sentinel.rp,
- mock.ANY, summary)
+ mock.sentinel.folder, summary)
disk_type = vmdk.THIN_VMDK_TYPE
get_disk_type.return_value = disk_type
context, src_uuid, volume, tmp_file_path, backup_size)
vops.clone_backing.assert_called_once_with(
volume['name'], src, None, volumeops.FULL_CLONE_TYPE,
- summary.datastore, disk_type, mock.sentinel.host, mock.sentinel.rp)
+ summary.datastore, disk_type=disk_type, host=mock.sentinel.host,
+ resource_pool=mock.sentinel.rp, folder=mock.sentinel.folder)
vops.update_backing_disk_uuid.assert_called_once_with(dest,
volume['id'])
delete_temp_backing.assert_called_once_with(src)
context, src_uuid, volume, tmp_file_path, backup_size)
vops.clone_backing.assert_called_once_with(
dest_uuid, src, None, volumeops.FULL_CLONE_TYPE,
- summary.datastore, disk_type, mock.sentinel.host, mock.sentinel.rp)
+ summary.datastore, disk_type=disk_type, host=mock.sentinel.host,
+ resource_pool=mock.sentinel.rp, folder=mock.sentinel.folder)
vops.update_backing_disk_uuid.assert_called_once_with(dest,
volume['id'])
exp_rename_calls = [mock.call(backing, tmp_uuid),
None,
host=None,
resource_pool=None,
- extra_config=extra_config)
+ extra_config=extra_config,
+ folder=None)
volume_ops.update_backing_disk_uuid.assert_called_once_with(
clone, fake_volume['id'])
_select_ds_for_volume.assert_called_with(fake_volume)
extra_config = {vmdk.EXTRA_CONFIG_VOLUME_ID_KEY: fake_volume['id']}
- volume_ops.clone_backing.assert_called_with(fake_volume['name'],
- fake_backing,
- fake_snapshot,
- volumeops.FULL_CLONE_TYPE,
- fake_datastore,
- host=fake_host,
- resource_pool=
- fake_resource_pool,
- extra_config=extra_config)
+ volume_ops.clone_backing.assert_called_with(
+ fake_volume['name'],
+ fake_backing,
+ fake_snapshot,
+ volumeops.FULL_CLONE_TYPE,
+ fake_datastore,
+ host=fake_host,
+ resource_pool=fake_resource_pool,
+ extra_config=extra_config,
+ folder=fake_folder)
volume_ops.update_backing_disk_uuid.assert_called_once_with(
clone, fake_volume['id'])
Test suite for VMware VMDK driver volumeops module.
"""
+import ddt
import mock
from oslo_utils import units
from oslo_vmware import exceptions
from cinder.volume.drivers.vmware import volumeops
+@ddt.ddt
class VolumeOpsTestCase(test.TestCase):
"""Unit tests for volumeops module."""
disk_device)
self._verify_extra_config(ret.config.extraConfig, key, value)
+ @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.'
+ '_get_folder')
@mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.'
'_get_clone_spec')
- def test_clone_backing(self, get_clone_spec):
- folder = mock.Mock(name='folder', spec=object)
- folder._type = 'Folder'
- task = mock.sentinel.task
- self.session.invoke_api.side_effect = [folder, task, folder, task,
- folder, task]
- task_info = mock.Mock(spec=object)
- task_info.result = mock.sentinel.new_backing
- self.session.wait_for_task.return_value = task_info
+ def _test_clone_backing(
+ self, clone_type, folder, get_clone_spec, get_folder):
+ backing_folder = mock.sentinel.backing_folder
+ get_folder.return_value = backing_folder
+
clone_spec = mock.sentinel.clone_spec
get_clone_spec.return_value = clone_spec
- # Test non-linked clone_backing
+
+ task = mock.sentinel.task
+ self.session.invoke_api.return_value = task
+
+ clone = mock.sentinel.clone
+ self.session.wait_for_task.return_value = mock.Mock(result=clone)
+
name = mock.sentinel.name
- backing = mock.Mock(spec=object)
- backing._type = 'VirtualMachine'
+ backing = mock.sentinel.backing
snapshot = mock.sentinel.snapshot
- clone_type = "anything-other-than-linked"
- datastore = mock.sentinel.datstore
- ret = self.vops.clone_backing(name, backing, snapshot, clone_type,
- datastore)
- # verify calls
- self.assertEqual(mock.sentinel.new_backing, ret)
- disk_move_type = 'moveAllDiskBackingsAndDisallowSharing'
- get_clone_spec.assert_called_with(
- datastore, disk_move_type, snapshot, backing, None, host=None,
- resource_pool=None, extra_config=None)
- expected = [mock.call(vim_util, 'get_object_property',
- self.session.vim, backing, 'parent'),
- mock.call(self.session.vim, 'CloneVM_Task', backing,
- folder=folder, name=name, spec=clone_spec)]
- self.assertEqual(expected, self.session.invoke_api.mock_calls)
-
- # Test linked clone_backing
- clone_type = volumeops.LINKED_CLONE_TYPE
- self.session.invoke_api.reset_mock()
- ret = self.vops.clone_backing(name, backing, snapshot, clone_type,
- datastore)
- # verify calls
- self.assertEqual(mock.sentinel.new_backing, ret)
- disk_move_type = 'createNewChildDiskBacking'
- get_clone_spec.assert_called_with(
- datastore, disk_move_type, snapshot, backing, None, host=None,
- resource_pool=None, extra_config=None)
- expected = [mock.call(vim_util, 'get_object_property',
- self.session.vim, backing, 'parent'),
- mock.call(self.session.vim, 'CloneVM_Task', backing,
- folder=folder, name=name, spec=clone_spec)]
- self.assertEqual(expected, self.session.invoke_api.mock_calls)
-
- # Test with optional params (disk_type, host, resource_pool and
- # extra_config).
- clone_type = None
- disk_type = 'thin'
+ datastore = mock.sentinel.datastore
+ disk_type = mock.sentinel.disk_type
host = mock.sentinel.host
- rp = mock.sentinel.rp
+ resource_pool = mock.sentinel.resource_pool
extra_config = mock.sentinel.extra_config
- self.session.invoke_api.reset_mock()
- ret = self.vops.clone_backing(name, backing, snapshot, clone_type,
- datastore, disk_type, host, rp,
- extra_config)
-
- self.assertEqual(mock.sentinel.new_backing, ret)
- disk_move_type = 'moveAllDiskBackingsAndDisallowSharing'
- get_clone_spec.assert_called_with(
- datastore, disk_move_type, snapshot, backing, disk_type, host=host,
- resource_pool=rp, extra_config=extra_config)
- expected = [mock.call(vim_util, 'get_object_property',
- self.session.vim, backing, 'parent'),
- mock.call(self.session.vim, 'CloneVM_Task', backing,
- folder=folder, name=name, spec=clone_spec)]
- self.assertEqual(expected, self.session.invoke_api.mock_calls)
+ ret = self.vops.clone_backing(
+ name, backing, snapshot, clone_type, datastore,
+ disk_type=disk_type, host=host, resource_pool=resource_pool,
+ extra_config=extra_config, folder=folder)
- # Clear side effects.
- self.session.invoke_api.side_effect = None
+ if folder:
+ self.assertFalse(get_folder.called)
+ else:
+ get_folder.assert_called_once_with(backing)
+
+ if clone_type == 'linked':
+ exp_disk_move_type = 'createNewChildDiskBacking'
+ else:
+ exp_disk_move_type = 'moveAllDiskBackingsAndDisallowSharing'
+ get_clone_spec.assert_called_once_with(
+ datastore, exp_disk_move_type, snapshot, backing, disk_type,
+ host=host, resource_pool=resource_pool, extra_config=extra_config)
+
+ exp_folder = folder if folder else backing_folder
+ self.session.invoke_api.assert_called_once_with(
+ self.session.vim, 'CloneVM_Task', backing, folder=exp_folder,
+ name=name, spec=clone_spec)
+
+ self.session.wait_for_task.assert_called_once_with(task)
+ self.assertEqual(clone, ret)
+
+ @ddt.data('linked', 'full')
+ def test_clone_backing(self, clone_type):
+ self._test_clone_backing(clone_type, mock.sentinel.folder)
+
+ def test_clone_backing_with_empty_folder(self):
+ self._test_clone_backing('linked', None)
@mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.'
'_create_specs_for_disk_add')
if disk_conversion:
# Clone the temporary backing for disk type conversion.
- (host, rp, _folder, summary) = self._select_ds_for_volume(
+ (host, rp, folder, summary) = self._select_ds_for_volume(
volume)
datastore = summary.datastore
LOG.debug("Cloning temporary backing: %s for disk type "
None,
volumeops.FULL_CLONE_TYPE,
datastore,
- disk_type,
- host,
- rp)
+ disk_type=disk_type,
+ host=host,
+ resource_pool=rp,
+ folder=folder)
self._delete_temp_backing(backing)
backing = clone
self.volumeops.update_backing_disk_uuid(backing, volume['id'])
return False
(host, rp, summary) = best_candidate
+ dc = self.volumeops.get_dc(rp)
+ folder = self._get_volume_group_folder(dc, volume['project_id'])
new_datastore = summary.datastore
if datastore.value != new_datastore.value:
# Datastore changed; relocate the backing.
backing)
self.volumeops.relocate_backing(
backing, new_datastore, rp, host, new_disk_type)
-
- dc = self.volumeops.get_dc(rp)
- folder = self._get_volume_group_folder(dc,
- volume['project_id'])
self.volumeops.move_backing_to_folder(backing, folder)
elif need_disk_type_conversion:
# Same datastore, but clone is needed for disk type conversion.
new_backing = self.volumeops.clone_backing(
volume['name'], backing, None,
- volumeops.FULL_CLONE_TYPE, datastore, new_disk_type,
- host, rp)
+ volumeops.FULL_CLONE_TYPE, datastore,
+ disk_type=new_disk_type, host=host,
+ resource_pool=rp, folder=folder)
self.volumeops.update_backing_disk_uuid(new_backing,
volume['id'])
self._delete_temp_backing(backing)
renamed = False
try:
# Find datastore for clone.
- (host, rp, _folder, summary) = self._select_ds_for_volume(volume)
+ (host, rp, folder, summary) = self._select_ds_for_volume(volume)
datastore = summary.datastore
disk_type = VMwareVcVmdkDriver._get_disk_type(volume)
dest = self.volumeops.clone_backing(dest_name, src, None,
volumeops.FULL_CLONE_TYPE,
- datastore, disk_type, host, rp)
+ datastore, disk_type=disk_type,
+ host=host, resource_pool=rp,
+ folder=folder)
self.volumeops.update_backing_disk_uuid(dest, volume['id'])
if new_backing:
LOG.debug("Created new backing: %s for restoring backup.",
datastore = None
host = None
rp = None
+ folder = None
if not clone_type == volumeops.LINKED_CLONE_TYPE:
# Pick a datastore where to create the full clone under any host
- (host, rp, _folder, summary) = self._select_ds_for_volume(volume)
+ (host, rp, folder, summary) = self._select_ds_for_volume(volume)
datastore = summary.datastore
extra_config = self._get_extra_config(volume)
clone = self.volumeops.clone_backing(volume['name'], backing,
snapshot, clone_type, datastore,
host=host, resource_pool=rp,
- extra_config=extra_config)
+ extra_config=extra_config,
+ folder=folder)
self.volumeops.update_backing_disk_uuid(clone, volume['id'])
# If the volume size specified by the user is greater than
# the size of the source volume, the newly created volume will
def clone_backing(self, name, backing, snapshot, clone_type, datastore,
disk_type=None, host=None, resource_pool=None,
- extra_config=None):
+ extra_config=None, folder=None):
"""Clone backing.
If the clone_type is 'full', then a full clone of the source volume
:param resource_pool: Target resource pool
:param extra_config: Key-value pairs to be written to backing's
extra-config
+ :param folder: The location of the clone
"""
LOG.debug("Creating a clone of backing: %(back)s, named: %(name)s, "
"clone type: %(type)s from snapshot: %(snap)s on "
{'back': backing, 'name': name, 'type': clone_type,
'snap': snapshot, 'ds': datastore, 'disk_type': disk_type,
'host': host, 'resource_pool': resource_pool})
- folder = self._get_folder(backing)
+
+ if folder is None:
+ # Use source folder as the location of the clone.
+ folder = self._get_folder(backing)
+
if clone_type == LINKED_CLONE_TYPE:
disk_move_type = 'createNewChildDiskBacking'
else: