client = self.mock_client.return_value
client.__enter__.return_value = client
- with mock.patch.object(self.driver, '_supports_layering') as \
- mock_supports_layering:
- mock_supports_layering.return_value = True
-
- self.driver.create_volume(self.volume)
-
- chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
- order = int(math.log(chunk_size, 2))
- args = [client.ioctx, str(self.volume_name),
- self.volume_size * units.Gi, order]
- kwargs = {'old_format': False,
- 'features': self.mock_rbd.RBD_FEATURE_LAYERING}
- self.mock_rbd.RBD.return_value.create.assert_called_once_with(
- *args, **kwargs)
- client.__enter__.assert_called_once_with()
- client.__exit__.assert_called_once_with(None, None, None)
- mock_supports_layering.assert_called_once_with()
+ self.driver.create_volume(self.volume)
+
+ chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
+ order = int(math.log(chunk_size, 2))
+ args = [client.ioctx, str(self.volume_name),
+ self.volume_size * units.Gi, order]
+ kwargs = {'old_format': False,
+ 'features': self.mock_rbd.RBD_FEATURE_LAYERING}
+ self.mock_rbd.RBD.return_value.create.assert_called_once_with(
+ *args, **kwargs)
+ client.__enter__.assert_called_once_with()
+ client.__exit__.assert_called_once_with(None, None, None)
@common_mocks
def test_manage_existing_get_size(self):
self.assertEqual(RAISED_EXCEPTIONS,
[self.mock_rbd.ImageExists])
- @common_mocks
- def test_create_volume_no_layering(self):
- client = self.mock_client.return_value
- client.__enter__.return_value = client
-
- with mock.patch.object(self.driver, '_supports_layering') as \
- mock_supports_layering:
- mock_supports_layering.return_value = False
-
- self.driver.create_volume(self.volume)
-
- chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
- order = int(math.log(chunk_size, 2))
- args = [client.ioctx, str(self.volume_name),
- self.volume_size * units.Gi, order]
- kwargs = {'old_format': True,
- 'features': 0}
- self.mock_rbd.RBD.return_value.create.assert_called_once_with(
- *args, **kwargs)
- client.__enter__.assert_called_once_with()
- client.__exit__.assert_called_once_with(None, None, None)
- mock_supports_layering.assert_called_once_with()
-
@common_mocks
def test_delete_backup_snaps(self):
self.driver.rbd.Image.remove_snap = mock.Mock()
self._update_volume_stats()
return self._stats
- def _supports_layering(self):
- return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
-
def _get_clone_depth(self, client, volume_name, depth=0):
"""Returns the number of ancestral clones (if any) of the given volume.
"""
LOG.debug("creating volume '%s'" % (volume['name']))
- old_format = True
- features = 0
chunk_size = CONF.rbd_store_chunk_size * units.Mi
order = int(math.log(chunk_size, 2))
- if self._supports_layering():
- old_format = False
- features = self.rbd.RBD_FEATURE_LAYERING
+ features = self.rbd.RBD_FEATURE_LAYERING
with RADOSClient(self) as client:
self.rbd.RBD().create(client.ioctx,
encodeutils.safe_encode(volume['name']),
size,
order,
- old_format=old_format,
+ old_format=False,
features=features)
def _flatten(self, pool, volume_name):
with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
snap = encodeutils.safe_encode(snapshot['name'])
volume.create_snap(snap)
- if self._supports_layering():
- volume.protect_snap(snap)
+ volume.protect_snap(snap)
def delete_snapshot(self, snapshot):
"""Deletes an rbd snapshot."""
volume_name = encodeutils.safe_encode(snapshot['volume_name'])
snap_name = encodeutils.safe_encode(snapshot['name'])
with RBDVolumeProxy(self, volume_name) as volume:
- if self._supports_layering():
- try:
- volume.unprotect_snap(snap_name)
- except self.rbd.ImageBusy:
- raise exception.SnapshotIsBusy(snapshot_name=snap_name)
+ try:
+ volume.unprotect_snap(snap_name)
+ except self.rbd.ImageBusy:
+ raise exception.SnapshotIsBusy(snapshot_name=snap_name)
volume.remove_snap(snap_name)
def retype(self, context, volume, new_type, diff, host):
args = ['rbd', 'import',
'--pool', self.configuration.rbd_pool,
'--order', order,
- tmp.name, volume['name']]
- if self._supports_layering():
- args.append('--new-format')
+ tmp.name, volume['name'],
+ '--new-format']
args.extend(self._ceph_args())
self._try_execute(*args)
self._resize(volume)