class FakeObject(object):
- fields = {}
+ def __init__(self):
+ self._fields = {}
def __setitem__(self, key, value):
- self.fields[key] = value
+ self._fields[key] = value
def __getitem__(self, item):
- return self.fields[item]
+ return self._fields[item]
class FakeManagedObjectReference(object):
self._volumeops.get_hosts().AndReturn(retrieve_result)
m.StubOutWithMock(self._driver, '_create_backing')
volume = FakeObject()
+ volume['name'] = 'vol_name'
backing = FakeMor('VirtualMachine', 'my_back')
mux = self._driver._create_backing(volume, host1.obj)
mux.AndRaise(error_util.VimException('Maintenance mode'))
m.StubOutWithMock(self._volumeops, 'get_backing')
snapshot = FakeObject()
snapshot['volume_name'] = 'volume_name'
+ snapshot['name'] = 'snap_name'
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'available'
self._volumeops.get_backing(snapshot['volume_name'])
m.ReplayAll()
snapshot['volume_name'] = 'volume_name'
snapshot['name'] = 'snapshot_name'
snapshot['display_description'] = 'snapshot_desc'
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'available'
backing = FakeMor('VirtualMachine', 'my_back')
self._volumeops.get_backing(snapshot['volume_name']).AndReturn(backing)
m.StubOutWithMock(self._volumeops, 'create_snapshot')
m.UnsetStubs()
m.VerifyAll()
+ def test_create_snapshot_when_attached(self):
+ """Test vmdk.create_snapshot when volume is attached."""
+ snapshot = FakeObject()
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'in-use'
+
+ self.assertRaises(exception.InvalidVolume,
+ self._driver.create_snapshot, snapshot)
+
def test_get_snapshot_from_tree(self):
"""Test _get_snapshot_from_tree."""
volops = volumeops.VMwareVolumeOps
m.StubOutWithMock(self._volumeops, 'get_backing')
snapshot = FakeObject()
snapshot['volume_name'] = 'volume_name'
+ snapshot['name'] = 'snap_name'
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'available'
self._volumeops.get_backing(snapshot['volume_name'])
m.ReplayAll()
snapshot = FakeObject()
snapshot['name'] = 'snapshot_name'
snapshot['volume_name'] = 'volume_name'
+ snapshot['name'] = 'snap_name'
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'available'
backing = FakeMor('VirtualMachine', 'my_back')
self._volumeops.get_backing(snapshot['volume_name']).AndReturn(backing)
m.StubOutWithMock(self._volumeops, 'delete_snapshot')
m.UnsetStubs()
m.VerifyAll()
+ def test_delete_snapshot_when_attached(self):
+ """Test delete_snapshot when volume is attached."""
+ snapshot = FakeObject()
+ snapshot['volume'] = FakeObject()
+ snapshot['volume']['status'] = 'in-use'
+
+ self.assertRaises(exception.InvalidVolume,
+ self._driver.delete_snapshot, snapshot)
+
def test_create_cloned_volume_without_backing(self):
"""Test create_cloned_volume without a backing."""
m = self.mox
m.StubOutWithMock(self._volumeops, 'get_backing')
volume = FakeObject()
volume['name'] = 'volume_name'
+ volume['status'] = 'available'
src_vref = FakeObject()
src_vref['name'] = 'src_volume_name'
self._volumeops.get_backing(src_vref['name'])
volume['name'] = 'volume_name'
snapshot = FakeObject()
snapshot['volume_name'] = 'volume_name'
+ snapshot['name'] = 'snap_name'
self._volumeops.get_backing(snapshot['volume_name'])
m.ReplayAll()
image_meta['disk_format'] = 'novmdk'
volume = FakeObject()
volume['name'] = 'vol-name'
+ volume['status'] = 'available'
m.ReplayAll()
self.assertRaises(exception.ImageUnacceptable,
m.UnsetStubs()
m.VerifyAll()
+ def test_copy_volume_to_image_when_attached(self):
+ """Test copy_volume_to_image when volume is attached."""
+ m = self.mox
+ volume = FakeObject()
+ volume['status'] = 'in-use'
+
+ m.ReplayAll()
+ self.assertRaises(exception.InvalidVolume,
+ self._driver.copy_volume_to_image,
+ mox.IgnoreArg(), volume,
+ mox.IgnoreArg(), mox.IgnoreArg())
+ m.UnsetStubs()
+ m.VerifyAll()
+
def test_copy_volume_to_image_vmdk(self):
"""Test copy_volume_to_image for a valid vmdk disk format."""
m = self.mox
volume = FakeObject()
volume['name'] = vol_name
volume['project_id'] = project_id
+ volume['status'] = 'available'
# volumeops.get_backing
backing = FakeMor("VirtualMachine", "my_vm")
m.StubOutWithMock(self._volumeops, 'get_backing')
vmdk_file_path = '[%s] %s' % (datastore_name, file_path)
m.StubOutWithMock(self._volumeops, 'get_vmdk_path')
self._volumeops.get_vmdk_path(backing).AndReturn(vmdk_file_path)
- # volumeops.create_snapshot
- snapshot_name = 'snapshot-%s' % image_id
- m.StubOutWithMock(self._volumeops, 'create_snapshot')
- self._volumeops.create_snapshot(backing, snapshot_name, None, True)
tmp_vmdk = '[datastore1] %s.vmdk' % image_id
# volumeops.get_host
host = FakeMor('Host', 'my_host')
backing = FakeMor('VirtualMachine', 'my_back')
src_vref = FakeObject()
src_vref['name'] = 'src_vol_name'
+ src_vref['status'] = 'available'
self._volumeops.get_backing(src_vref['name']).AndReturn(backing)
volume = FakeObject()
volume['volume_type_id'] = None
self._driver.create_cloned_volume(volume, src_vref)
m.UnsetStubs()
- def test_create_lined_cloned_volume_with_backing(self):
+ def test_create_linked_cloned_volume_with_backing(self):
"""Test create_cloned_volume with clone type - linked."""
m = self.mox
m.StubOutWithMock(self._driver.__class__, 'volumeops')
backing = FakeMor('VirtualMachine', 'my_back')
src_vref = FakeObject()
src_vref['name'] = 'src_vol_name'
+ src_vref['status'] = 'available'
self._volumeops.get_backing(src_vref['name']).AndReturn(backing)
volume = FakeObject()
volume['id'] = 'volume_id'
m.ReplayAll()
self._driver.create_cloned_volume(volume, src_vref)
m.UnsetStubs()
+
+ def test_create_linked_cloned_volume_when_attached(self):
+ """Test create_cloned_volume linked clone when volume is attached."""
+ m = self.mox
+ m.StubOutWithMock(self._driver.__class__, 'volumeops')
+ self._driver.volumeops = self._volumeops
+ m.StubOutWithMock(self._volumeops, 'get_backing')
+ backing = FakeMor('VirtualMachine', 'my_back')
+ src_vref = FakeObject()
+ src_vref['name'] = 'src_vol_name'
+ src_vref['status'] = 'in-use'
+ volume = FakeObject()
+ self._volumeops.get_backing(src_vref['name']).AndReturn(backing)
+ m.StubOutWithMock(vmdk.VMwareVcVmdkDriver, '_get_clone_type')
+ moxed = vmdk.VMwareVcVmdkDriver._get_clone_type(volume)
+ moxed.AndReturn(volumeops.LINKED_CLONE_TYPE)
+
+ m.ReplayAll()
+ self.assertRaises(exception.InvalidVolume,
+ self._driver.create_cloned_volume, volume, src_vref)
+ m.UnsetStubs()
m.VerifyAll()
If the volume does not have a backing then simply pass, else create
a snapshot.
+ Snapshot of only available volume is supported.
:param snapshot: Snapshot object
"""
+
+ volume = snapshot['volume']
+ if volume['status'] != 'available':
+ msg = _("Snapshot of volume not supported in state: %s.")
+ LOG.error(msg % volume['status'])
+ raise exception.InvalidVolume(msg % volume['status'])
backing = self.volumeops.get_backing(snapshot['volume_name'])
if not backing:
LOG.info(_("There is no backing, so will not create "
If the volume does not have a backing or the snapshot does not exist
then simply pass, else delete the snapshot.
+ Snapshot deletion of only available volume is supported.
:param snapshot: Snapshot object
"""
+
+ volume = snapshot['volume']
+ if volume['status'] != 'available':
+ msg = _("Delete snapshot of volume not supported in state: %s.")
+ LOG.error(msg % volume['status'])
+ raise exception.InvalidVolume(msg % volume['status'])
backing = self.volumeops.get_backing(snapshot['volume_name'])
if not backing:
LOG.info(_("There is no backing, and so there is no "
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Creates glance image from volume.
+ Upload of only available volume is supported.
Steps followed are:
1. Get the name of the vmdk file which the volume points to right now.
Can be a chain of snapshots, so we need to know the last in the
chain.
- 2. Create the snapshot. A new vmdk is created which the volume points
- to now. The earlier vmdk becomes read-only.
- 3. Call CopyVirtualDisk which coalesces the disk chain to form a
+ 2. Call CopyVirtualDisk which coalesces the disk chain to form a
single vmdk, rather a .vmdk metadata file and a -flat.vmdk disk
data file.
- 4. Now upload the -flat.vmdk file to the image store.
- 5. Delete the coalesced .vmdk and -flat.vmdk created.
+ 3. Now upload the -flat.vmdk file to the image store.
+ 4. Delete the coalesced .vmdk and -flat.vmdk created.
"""
+
+ if volume['status'] != 'available':
+ msg = _("Upload to glance of volume not supported in state: %s.")
+ LOG.error(msg % volume['status'])
+ raise exception.InvalidVolume(msg % volume['status'])
+
LOG.debug(_("Copy Volume: %s to new image.") % volume['name'])
VMwareEsxVmdkDriver._validate_disk_format(image_meta['disk_format'])
vmdk_file_path = self.volumeops.get_vmdk_path(backing)
datastore_name = volumeops.split_datastore_path(vmdk_file_path)[0]
- # Create a snapshot
+ # Create a copy of the vmdk into a tmp file
image_id = image_meta['id']
- snapshot_name = "snapshot-%s" % image_id
- self.volumeops.create_snapshot(backing, snapshot_name, None, True)
-
- # Create a copy of the snapshotted vmdk into a tmp file
tmp_vmdk_file_path = '[%s] %s.vmdk' % (datastore_name, image_id)
host = self.volumeops.get_host(backing)
datacenter = self.volumeops.get_dc(host)
"""Creates volume clone.
If source volume's backing does not exist, then pass.
+ Linked clone of attached volume is not supported.
:param volume: New Volume object
:param src_vref: Source Volume object
"""
+
backing = self.volumeops.get_backing(src_vref['name'])
if not backing:
LOG.info(_("There is no backing for the source volume: %(src)s. "
clone_type = VMwareVcVmdkDriver._get_clone_type(volume)
snapshot = None
if clone_type == volumeops.LINKED_CLONE_TYPE:
+ if src_vref['status'] != 'available':
+ msg = _("Linked clone of source volume not supported "
+ "in state: %s.")
+ LOG.error(msg % src_vref['status'])
+ raise exception.InvalidVolume(msg % src_vref['status'])
# For performing a linked clone, we snapshot the volume and
# then create the linked clone out of this snapshot point.
name = 'snapshot-%s' % volume['id']