dst_name = u'volume-00000002'
self.cfg.rbd_max_clone_depth = 1
- self.mock_rbd.RBD.return_value.clone.side_effect = (
- self.mock_rbd.RBD.Error)
- with mock.patch.object(self.driver, '_get_clone_depth') as \
- mock_get_clone_depth:
- # Try with no flatten required
- mock_get_clone_depth.return_value = 1
+ with mock.patch.object(self.driver, '_get_clone_info') as \
+ mock_get_clone_info:
+ mock_get_clone_info.return_value = (
+ ('fake_pool', dst_name, '.'.join((dst_name, 'clone_snap'))))
+ with mock.patch.object(self.driver, '_get_clone_depth') as \
+ mock_get_clone_depth:
+ # Try with no flatten required
+ mock_get_clone_depth.return_value = 1
- self.assertRaises(self.mock_rbd.RBD.Error,
- self.driver.create_cloned_volume,
- dict(name=dst_name), dict(name=src_name))
+ self.assertRaises(self.mock_rbd.RBD.Error,
+ self.driver.create_cloned_volume,
+ dict(name=dst_name), dict(name=src_name))
- (self.mock_rbd.Image.return_value.create_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
- (self.mock_rbd.Image.return_value.protect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
- self.assertEqual(
- 1, self.mock_rbd.RBD.return_value.clone.call_count)
- (self.mock_rbd.Image.return_value.unprotect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
- (self.mock_rbd.Image.return_value.remove_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
- self.mock_rbd.Image.return_value.close.assert_called_once_with()
- self.assertTrue(mock_get_clone_depth.called)
+ (self.mock_rbd.Image.return_value.create_snap
+ .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ (self.mock_rbd.Image.return_value.protect_snap
+ .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ self.assertEqual(
+ 1, self.mock_rbd.RBD.return_value.clone.call_count)
+ (self.mock_rbd.Image.return_value.unprotect_snap
+ .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ (self.mock_rbd.Image.return_value.remove_snap
+ .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+
+ # We expect the driver to close both volumes, so 2 is expected
+ self.assertEqual(
+ 2, self.mock_rbd.Image.return_value.close.call_count)
+ self.assertTrue(mock_get_clone_depth.called)
@common_mocks
def test_create_cloned_volume_w_clone_exception(self):
# If clone depth was reached, flatten should have occurred so if it has
# been exceeded then something has gone wrong.
- if depth > CONF.rbd_max_clone_depth:
+ if depth > self.configuration.rbd_max_clone_depth:
raise Exception(_("clone depth exceeds limit of %s") %
- (CONF.rbd_max_clone_depth))
+ (self.configuration.rbd_max_clone_depth))
return self._get_clone_depth(client, parent, depth + 1)
flatten_parent = False
# Do full copy if requested
- if CONF.rbd_max_clone_depth <= 0:
+ if self.configuration.rbd_max_clone_depth <= 0:
with RBDVolumeProxy(self, src_name, read_only=True) as vol:
vol.copy(vol.ioctx, dest_name)
# If source volume is a clone and rbd_max_clone_depth reached,
# flatten the source before cloning. Zero rbd_max_clone_depth means
# infinite is allowed.
- if depth == CONF.rbd_max_clone_depth:
+ if depth == self.configuration.rbd_max_clone_depth:
LOG.debug("maximum clone depth (%d) has been reached - "
"flattening source volume",
- CONF.rbd_max_clone_depth)
+ self.configuration.rbd_max_clone_depth)
flatten_parent = True
src_volume = self.rbd.Image(client.ioctx, src_name)