fake_image.stub_out_image_service(self.stubs)
test_notifier.NOTIFICATIONS = []
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
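+ # clear_volume() now checks os.path.exists() on the device path
+ # before clearing, so make that check pass for the common setup.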
+ self.stubs.Set(os.path, 'exists', lambda x: True)
def tearDown(self):
try:
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_id)
+ def test_delete_no_dev_fails(self):
+ """Test delete snapshot with no dev file fails."""
+ self.stubs.Set(os.path, 'exists', lambda x: False)
+ self.volume.driver.vg = FakeBrickLVM('cinder-volumes',
+ False,
+ None,
+ 'default')
+
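+ # Create a volume with a snapshot so there is something to delete.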
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
+ volume_id = volume['id']
+ self.volume.create_volume(self.context, volume_id)
+ snapshot_id = self._create_snapshot(volume_id)['id']
+ self.volume.create_snapshot(self.context, volume_id, snapshot_id)
+
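+ # First pass: the driver reports the snapshot as busy, so the
+ # manager should leave it in the 'available' state.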
+ self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
+
+ self.volume.driver.delete_snapshot(
+ mox.IgnoreArg()).AndRaise(
+ exception.SnapshotIsBusy(snapshot_name='fake'))
+ self.mox.ReplayAll()
+ self.volume.delete_snapshot(self.context, snapshot_id)
+ snapshot_ref = db.snapshot_get(self.context, snapshot_id)
+ self.assertEqual(snapshot_id, snapshot_ref.id)
+ self.assertEqual("available", snapshot_ref.status)
+
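+ # Second pass: with the driver stub removed, deletion reaches
+ # clear_volume(), which raises because the device file is absent.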
+ self.mox.UnsetStubs()
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.volume.delete_snapshot,
+ self.context,
+ snapshot_id)
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
+
def _create_volume_from_image(self, fakeout_copy_image_to_volume=False,
fakeout_clone_image=False):
"""Test function of create_volume_from_image.
lvm_driver = lvm.LVMVolumeDriver(configuration=configuration)
self.stubs.Set(volutils, 'copy_volume',
lambda x, y, z, sync=False, execute='foo': True)
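+ # clear_volume() refuses to run when the device path is missing;
+ # stub the existence check to succeed.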
+ self.stubs.Set(os.path, 'exists', lambda x: True)
fake_volume = {'name': 'test1',
'volume_name': 'test1',
# zero out old volumes to prevent data leaking between users
# TODO(ja): reclaiming space should be done lazy and low priority
- dev_path = self.local_path(volume)
-
- # TODO(jdg): Maybe we could optimize this for snaps by looking at
- # the cow table and only overwriting what's necessary?
- # for now we're still skipping on snaps due to hang issue
- if os.path.exists(dev_path) and not is_snapshot:
- self.clear_volume(volume)
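+ # clear_volume() now resolves the device path itself (including
+ # the '-cow' device for snapshots) and verifies that it exists.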
+ self.clear_volume(volume, is_snapshot)
name = volume['name']
if is_snapshot:
name = self._escape_snapshot(volume['name'])
self._delete_volume(volume)
- def clear_volume(self, volume):
+ def clear_volume(self, volume, is_snapshot=False):
"""unprovision old volumes to prevent data leaking between users."""
if self.configuration.volume_clear == 'none':
return
- vol_path = self.local_path(volume)
+ if is_snapshot and self.configuration.lvm_type != 'thin':
+ # If the volume to be cleared is a snapshot of another volume,
+ # clear its '-cow' device rather than the volume path directly.
+ # Skip this for thin-provisioned LVs.
+ # bug# lp1191812
+ dev_path = self.local_path(volume) + "-cow"
+ else:
+ dev_path = self.local_path(volume)
+
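+ # Refuse to clear a device whose path is missing instead of
+ # handing an invalid path to the clearing command.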
+ if not os.path.exists(dev_path):
+ msg = (_('Volume device file path %s does not exist.') % dev_path)
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
size_in_g = volume.get('size', volume.get('volume_size', None))
if size_in_g is None:
LOG.warning(_("Size for volume: %s not found, "
if self.configuration.volume_clear == 'zero':
if size_in_m == 0:
return volutils.copy_volume('/dev/zero',
- vol_path, size_in_g * 1024,
+ dev_path, size_in_g * 1024,
sync=True,
execute=self._execute)
else:
self.configuration.volume_clear)
return
- clear_cmd.append(vol_path)
+ clear_cmd.append(dev_path)
self._execute(*clear_cmd, run_as_root=True)
def create_snapshot(self, snapshot):