self.driver.delete_snapshot(self.snapshot)
- args = [str(self.snapshot_name)]
- proxy.remove_snap.assert_called_with(*args)
- proxy.unprotect_snap.assert_called_with(*args)
+ proxy.remove_snap.assert_called_with(self.snapshot_name)
+ proxy.unprotect_snap.assert_called_with(self.snapshot_name)
+
+ @common_mocks
+ def test_delete_busy_snapshot(self):
+ proxy = self.mock_proxy.return_value
+ proxy.__enter__.return_value = proxy
+
+ proxy.unprotect_snap.side_effect = (
+ self.mock_rbd.ImageBusy)
+
+ with mock.patch.object(self.driver, '_get_children_info') as \
+ mock_get_children_info:
+ mock_get_children_info.return_value = [('pool', 'volume2')]
+
+ with mock.patch.object(driver, 'LOG') as \
+ mock_log:
+
+ self.assertRaises(exception.SnapshotIsBusy,
+ self.driver.delete_snapshot,
+ self.snapshot)
+
+ mock_get_children_info.assert_called_once_with(
+ proxy,
+ self.snapshot_name)
+
+ self.assertTrue(mock_log.info.called)
+ self.assertTrue(proxy.unprotect_snap.called)
+ self.assertFalse(proxy.remove_snap.called)
+
+ @common_mocks
+ def test_get_children_info(self):
+ volume = self.mock_proxy
+ volume.set_snap = mock.Mock()
+ volume.list_children = mock.Mock()
+ list_children = [('pool', 'volume2')]
+ volume.list_children.return_value = list_children
+
+ info = self.driver._get_children_info(volume,
+ self.snapshot_name)
+
+ self.assertEqual(list_children, info)
@common_mocks
def test_get_clone_info(self):
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import units
-import six
from six.moves import urllib
from cinder import exception
if parent_snap == "%s.clone_snap" % volume_name:
return pool, parent, parent_snap
except self.rbd.ImageNotFound:
- LOG.debug("volume %s is not a clone", volume_name)
+ LOG.debug("Volume %s is not a clone.", volume_name)
volume.set_snap(None)
return (None, None, None)
+ def _get_children_info(self, volume, snap):
+ """List children for the given snapshot of an volume(image).
+
+ Returns a list of (pool, image).
+ """
+
+ children_list = []
+
+ if snap:
+ volume.set_snap(snap)
+ children_list = volume.list_children()
+ volume.set_snap(None)
+
+ return children_list
+
def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
"""Walk back up the clone chain and delete references.
# utf-8 otherwise librbd will barf.
volume_name = utils.convert_str(snapshot['volume_name'])
snap_name = utils.convert_str(snapshot['name'])
+
with RBDVolumeProxy(self, volume_name) as volume:
try:
volume.unprotect_snap(snap_name)
except self.rbd.ImageBusy:
+ children_list = self._get_children_info(volume, snap_name)
+
+ if children_list:
+ for (pool, image) in children_list:
+ LOG.info(_LI('Image %(pool)s/%(image)s is dependent '
+ 'on the snapshot %(snap)s.'),
+ {'pool': pool,
+ 'image': image,
+ 'snap': snap_name})
+
raise exception.SnapshotIsBusy(snapshot_name=snap_name)
volume.remove_snap(snap_name)
})
if volume['host'] != host['host']:
- LOG.error(_LE('Retype with host migration not supported'))
+ LOG.error(_LE('Retype with host migration not supported.'))
return False
if diff['encryption']:
- LOG.error(_LE('Retype of encryption type not supported'))
+ LOG.error(_LE('Retype of encryption type not supported.'))
return False
if diff['extra_specs']:
- LOG.error(_LE('Retype of extra_specs not supported'))
+ LOG.error(_LE('Retype of extra_specs not supported.'))
return False
return True
try:
fsid, pool, image, snapshot = self._parse_location(image_location)
except exception.ImageUnacceptable as e:
- LOG.debug('not cloneable: %s', six.text_type(e))
+ LOG.debug('not cloneable: %s.', e)
return False
if self._get_fsid() != fsid:
- LOG.debug('%s is in a different ceph cluster', image_location)
+ LOG.debug('%s is in a different ceph cluster.', image_location)
return False
if image_meta['disk_format'] != 'raw':
- LOG.debug(("rbd image clone requires image format to be "
- "'raw' but image {0} is '{1}'").format(
- image_location, image_meta['disk_format']))
+ LOG.debug("rbd image clone requires image format to be "
+ "'raw' but image %(image)s is '%(format)s'",
+ {"image", image_location,
+ "format", image_meta['disk_format']})
return False
# check that we can read the image
read_only=True):
return True
except self.rbd.Error as e:
- LOG.debug('Unable to open image %(loc)s: %(err)s',
+ LOG.debug('Unable to open image %(loc)s: %(err)s.',
dict(loc=image_location, err=e))
return False
if tmpdir == self.configuration.volume_tmp_dir:
LOG.warning(_LW('volume_tmp_dir is now deprecated, please use '
- 'image_conversion_dir'))
+ 'image_conversion_dir.'))
# ensure temporary directory exists
if not os.path.exists(tmpdir):