import mock
from oslo_utils import units
+from cinder import context
from cinder import exception
from cinder.i18n import _
import cinder.image.glance
from cinder.image import image_utils
from cinder import objects
from cinder import test
+from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit import test_volume
from cinder.tests.unit import utils
configuration=self.cfg)
self.driver.set_initialized()
- self.volume_name = u'volume-00000001'
- self.snapshot_name = u'snapshot-00000001'
- self.volume_size = 1
- self.volume = dict(name=self.volume_name, size=self.volume_size)
- self.snapshot = dict(volume_name=self.volume_name,
- name=self.snapshot_name)
+ self.context = context.get_admin_context()
+
+ self.volume_a = fake_volume.fake_volume_obj(
+ self.context,
+ **{'name': u'volume-0000000a',
+ 'id': '4c39c3c7-168f-4b32-b585-77f1b3bf0a38',
+ 'size': 10})
+
+ self.volume_b = fake_volume.fake_volume_obj(
+ self.context,
+ **{'name': u'volume-0000000b',
+ 'id': '0c7d1f44-5a06-403f-bb82-ae7ad0d693a6',
+ 'size': 10})
+
+ self.snapshot = fake_snapshot.fake_snapshot_obj(
+ self.context, name='snapshot-0000000a')
@ddt.data({'cluster_name': None, 'pool_name': 'rbd'},
{'cluster_name': 'volumes', 'pool_name': None})
client = self.mock_client.return_value
client.__enter__.return_value = client
- self.driver.create_volume(self.volume)
+ self.driver.create_volume(self.volume_a)
chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
order = int(math.log(chunk_size, 2))
- args = [client.ioctx, str(self.volume_name),
- self.volume_size * units.Gi, order]
+ args = [client.ioctx, str(self.volume_a.name),
+ self.volume_a.size * units.Gi, order]
kwargs = {'old_format': False,
'features': client.features}
self.mock_rbd.RBD.return_value.create.assert_called_once_with(
with mock.patch.object(self.driver.rbd.Image(), 'close') \
as mock_rbd_image_close:
mock_rbd_image_size.return_value = 2 * units.Gi
- existing_ref = {'source-name': self.volume_name}
+ existing_ref = {'source-name': self.volume_a.name}
return_size = self.driver.manage_existing_get_size(
- self.volume,
+ self.volume_a,
existing_ref)
self.assertEqual(2, return_size)
mock_rbd_image_size.assert_called_once_with()
with mock.patch.object(self.driver.rbd.Image(), 'close') \
as mock_rbd_image_close:
mock_rbd_image_size.return_value = 'abcd'
- existing_ref = {'source-name': self.volume_name}
+ existing_ref = {'source-name': self.volume_a.name}
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.manage_existing_get_size,
- self.volume, existing_ref)
+ self.volume_a, existing_ref)
mock_rbd_image_size.assert_called_once_with()
mock_rbd_image_close.assert_called_once_with()
exist_volume = 'vol-exist'
existing_ref = {'source-name': exist_volume}
mock_rbd_image_rename.return_value = 0
- self.driver.manage_existing(self.volume, existing_ref)
+ self.driver.manage_existing(self.volume_a, existing_ref)
mock_rbd_image_rename.assert_called_with(
client.ioctx,
exist_volume,
- self.volume_name)
+ self.volume_a.name)
@common_mocks
def test_manage_existing_with_exist_rbd_image(self):
existing_ref = {'source-name': exist_volume}
self.assertRaises(self.mock_rbd.ImageExists,
self.driver.manage_existing,
- self.volume, existing_ref)
+ self.volume_a, existing_ref)
# Make sure the exception was raised
self.assertEqual(RAISED_EXCEPTIONS,
mock_delete_backup_snaps:
mock_get_clone_info.return_value = (None, None, None)
- self.driver.delete_volume(self.volume)
+ self.driver.delete_volume(self.volume_a)
mock_get_clone_info.assert_called_once_with(
self.mock_rbd.Image.return_value,
- self.volume_name,
+ self.volume_a.name,
None)
(self.driver.rbd.Image.return_value
.list_snaps.assert_called_once_with())
@common_mocks
def delete_volume_not_found(self):
self.mock_rbd.Image.side_effect = self.mock_rbd.ImageNotFound
- self.assertIsNone(self.driver.delete_volume(self.volume))
+ self.assertIsNone(self.driver.delete_volume(self.volume_a))
self.mock_rbd.Image.assert_called_once_with()
# Make sure the exception was raised
self.assertEqual(RAISED_EXCEPTIONS, [self.mock_rbd.ImageNotFound])
with mock.patch.object(driver, 'RADOSClient') as \
mock_rados_client:
self.assertRaises(exception.VolumeIsBusy,
- self.driver.delete_volume, self.volume)
+ self.driver.delete_volume, self.volume_a)
mock_get_clone_info.assert_called_once_with(
self.mock_rbd.Image.return_value,
- self.volume_name,
+ self.volume_a.name,
None)
(self.mock_rbd.Image.return_value.list_snaps
.assert_called_once_with())
mock_delete_backup_snaps:
with mock.patch.object(driver, 'RADOSClient') as \
mock_rados_client:
- self.assertIsNone(self.driver.delete_volume(self.volume))
+ self.assertIsNone(self.driver.delete_volume(self.volume_a))
mock_get_clone_info.assert_called_once_with(
self.mock_rbd.Image.return_value,
- self.volume_name,
+ self.volume_a.name,
None)
(self.mock_rbd.Image.return_value.list_snaps
.assert_called_once_with())
[self.mock_rbd.ImageNotFound])
@common_mocks
- def test_create_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_create_snapshot(self, volume_get_by_id):
+ volume_get_by_id.return_value = self.volume_a
proxy = self.mock_proxy.return_value
proxy.__enter__.return_value = proxy
self.driver.create_snapshot(self.snapshot)
- args = [str(self.snapshot_name)]
+ args = [str(self.snapshot.name)]
proxy.create_snap.assert_called_with(*args)
proxy.protect_snap.assert_called_with(*args)
@common_mocks
- def test_delete_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_delete_snapshot(self, volume_get_by_id):
+ volume_get_by_id.return_value = self.volume_a
proxy = self.mock_proxy.return_value
proxy.__enter__.return_value = proxy
self.driver.delete_snapshot(self.snapshot)
- proxy.remove_snap.assert_called_with(self.snapshot_name)
- proxy.unprotect_snap.assert_called_with(self.snapshot_name)
+ proxy.remove_snap.assert_called_with(self.snapshot.name)
+ proxy.unprotect_snap.assert_called_with(self.snapshot.name)
@common_mocks
- def test_delete_notfound_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_delete_notfound_snapshot(self, volume_get_by_id):
+ volume_get_by_id.return_value = self.volume_a
proxy = self.mock_proxy.return_value
proxy.__enter__.return_value = proxy
self.driver.delete_snapshot(self.snapshot)
- proxy.remove_snap.assert_called_with(self.snapshot_name)
- proxy.unprotect_snap.assert_called_with(self.snapshot_name)
+ proxy.remove_snap.assert_called_with(self.snapshot.name)
+ proxy.unprotect_snap.assert_called_with(self.snapshot.name)
@common_mocks
- def test_delete_unprotected_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_delete_unprotected_snapshot(self, volume_get_by_id):
+ volume_get_by_id.return_value = self.volume_a
proxy = self.mock_proxy.return_value
proxy.__enter__.return_value = proxy
proxy.unprotect_snap.side_effect = self.mock_rbd.InvalidArgument
self.assertTrue(proxy.remove_snap.called)
@common_mocks
- def test_delete_busy_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_delete_busy_snapshot(self, volume_get_by_id):
+ volume_get_by_id.return_value = self.volume_a
proxy = self.mock_proxy.return_value
proxy.__enter__.return_value = proxy
mock_get_children_info.assert_called_once_with(
proxy,
- self.snapshot_name)
+ self.snapshot.name)
self.assertTrue(mock_log.info.called)
self.assertTrue(proxy.unprotect_snap.called)
volume.list_children.return_value = list_children
info = self.driver._get_children_info(volume,
- self.snapshot_name)
+ self.snapshot['name'])
self.assertEqual(list_children, info)
volume = self.mock_rbd.Image()
volume.set_snap = mock.Mock()
volume.parent_info = mock.Mock()
- parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_name))
+ parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_a.name))
volume.parent_info.return_value = parent_info
- info = self.driver._get_clone_info(volume, self.volume_name)
+ info = self.driver._get_clone_info(volume, self.volume_a.name)
self.assertEqual(parent_info, info)
volume = self.mock_rbd.Image()
volume.set_snap = mock.Mock()
volume.parent_info = mock.Mock()
- parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_name))
+ parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_a.name))
volume.parent_info.return_value = parent_info
snapshot = self.mock_rbd.ImageSnapshot()
- info = self.driver._get_clone_info(volume, self.volume_name,
+ info = self.driver._get_clone_info(volume, self.volume_a.name,
snap=snapshot)
self.assertEqual(parent_info, info)
snapshot = self.mock_rbd.ImageSnapshot()
- info = self.driver._get_clone_info(volume, self.volume_name,
+ info = self.driver._get_clone_info(volume, self.volume_a.name,
snap=snapshot)
self.assertEqual((None, None, None), info)
volume = self.mock_rbd.Image()
volume.set_snap = mock.Mock()
volume.parent_info = mock.Mock()
- parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_name))
+ parent_info = ('a', 'b', '%s.clone_snap' % (self.volume_a.name))
volume.parent_info.return_value = parent_info
info = self.driver._get_clone_info(volume,
- "%s.deleted" % (self.volume_name))
+ "%s.deleted" % (self.volume_a.name))
self.assertEqual(parent_info, info)
@common_mocks
def test_create_cloned_volume_same_size(self):
- src_name = u'volume-00000001'
- dst_name = u'volume-00000002'
-
self.cfg.rbd_max_clone_depth = 2
with mock.patch.object(self.driver, '_get_clone_depth') as \
with mock.patch.object(self.driver, '_resize') as mock_resize:
mock_get_clone_depth.return_value = 1
- self.driver.create_cloned_volume({'name': dst_name,
- 'size': 10},
- {'name': src_name,
- 'size': 10})
+ self.driver.create_cloned_volume(self.volume_b, self.volume_a)
(self.mock_rbd.Image.return_value.create_snap
- .assert_called_once_with('.'.join((dst_name,
- 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.protect_snap
- .assert_called_once_with('.'.join((dst_name,
- 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
self.assertEqual(
1, self.mock_rbd.RBD.return_value.clone.call_count)
self.mock_rbd.Image.return_value.close \
@common_mocks
def test_create_cloned_volume_different_size(self):
- src_name = u'volume-00000001'
- dst_name = u'volume-00000002'
-
self.cfg.rbd_max_clone_depth = 2
with mock.patch.object(self.driver, '_get_clone_depth') as \
with mock.patch.object(self.driver, '_resize') as mock_resize:
mock_get_clone_depth.return_value = 1
- self.driver.create_cloned_volume({'name': dst_name,
- 'size': 20},
- {'name': src_name,
- 'size': 10})
+ self.volume_b.size = 20
+ self.driver.create_cloned_volume(self.volume_b, self.volume_a)
(self.mock_rbd.Image.return_value.create_snap
- .assert_called_once_with('.'.join((dst_name,
- 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.protect_snap
- .assert_called_once_with('.'.join((dst_name,
- 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
self.assertEqual(
1, self.mock_rbd.RBD.return_value.clone.call_count)
self.mock_rbd.Image.return_value.close \
@common_mocks
def test_create_cloned_volume_w_flatten(self):
- src_name = u'volume-00000001'
- dst_name = u'volume-00000002'
-
self.cfg.rbd_max_clone_depth = 1
with mock.patch.object(self.driver, '_get_clone_info') as \
mock_get_clone_info:
mock_get_clone_info.return_value = (
- ('fake_pool', dst_name, '.'.join((dst_name, 'clone_snap'))))
+ ('fake_pool', self.volume_b.name,
+ '.'.join((self.volume_b.name, 'clone_snap'))))
with mock.patch.object(self.driver, '_get_clone_depth') as \
mock_get_clone_depth:
# Try with no flatten required
mock_get_clone_depth.return_value = 1
- self.assertRaises(self.mock_rbd.RBD.Error,
- self.driver.create_cloned_volume,
- dict(name=dst_name), dict(name=src_name))
+ self.driver.create_cloned_volume(self.volume_b, self.volume_a)
(self.mock_rbd.Image.return_value.create_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.protect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
self.assertEqual(
1, self.mock_rbd.RBD.return_value.clone.call_count)
(self.mock_rbd.Image.return_value.unprotect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.remove_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
# We expect the driver to close both volumes, so 2 is expected
self.assertEqual(
@common_mocks
def test_create_cloned_volume_w_clone_exception(self):
- src_name = u'volume-00000001'
- dst_name = u'volume-00000002'
-
self.cfg.rbd_max_clone_depth = 2
self.mock_rbd.RBD.return_value.clone.side_effect = (
self.mock_rbd.RBD.Error)
self.assertRaises(self.mock_rbd.RBD.Error,
self.driver.create_cloned_volume,
- {'name': dst_name}, {'name': src_name})
+ self.volume_b, self.volume_a)
(self.mock_rbd.Image.return_value.create_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.protect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
self.assertEqual(
1, self.mock_rbd.RBD.return_value.clone.call_count)
(self.mock_rbd.Image.return_value.unprotect_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
(self.mock_rbd.Image.return_value.remove_snap
- .assert_called_once_with('.'.join((dst_name, 'clone_snap'))))
+ .assert_called_once_with('.'.join(
+ (self.volume_b.name, 'clone_snap'))))
self.mock_rbd.Image.return_value.close.assert_called_once_with()
@common_mocks
with mock.patch.object(self.driver, 'delete_volume'):
with mock.patch.object(self.driver, '_resize'):
mock_image_service = mock.MagicMock()
- args = [None, {'name': 'test', 'size': 1},
+ args = [None, self.volume_a,
mock_image_service, None]
self.driver.copy_image_to_volume(*args)
mock_get_mon_addrs:
mock_get_mon_addrs.return_value = (hosts, ports)
- volume_id = '0a83f0a3-ef6e-47b6-a8aa-20436bc9ed01'
expected = {
'driver_volume_type': 'rbd',
'data': {
'name': '%s/%s' % (self.cfg.rbd_pool,
- self.volume_name),
+ self.volume_a.name),
'hosts': hosts,
'ports': ports,
'auth_enabled': False,
'auth_username': None,
'secret_type': 'ceph',
'secret_uuid': None,
- 'volume_id': volume_id
+ 'volume_id': self.volume_a.id
}
}
- volume = dict(name=self.volume_name, id=volume_id)
- actual = self.driver.initialize_connection(volume, None)
+ actual = self.driver.initialize_connection(self.volume_a, None)
self.assertDictMatch(expected, actual)
self.assertTrue(mock_get_mon_addrs.called)
# capture both rados client used to perform the clone
client.__enter__.side_effect = mock__enter__(client)
- self.driver._clone(self.volume, src_pool, src_image, src_snap)
+ self.driver._clone(self.volume_a, src_pool, src_image, src_snap)
args = [client_stack[0].ioctx, str(src_image), str(src_snap),
- client_stack[1].ioctx, str(self.volume_name)]
+ client_stack[1].ioctx, str(self.volume_a.name)]
kwargs = {'features': client.features}
self.mock_rbd.RBD.return_value.clone.assert_called_once_with(
*args, **kwargs)
@common_mocks
def test_extend_volume(self):
fake_size = '20'
- fake_vol = {'project_id': 'testprjid', 'name': self.volume_name,
- 'size': fake_size,
- 'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
-
size = int(fake_size) * units.Gi
with mock.patch.object(self.driver, '_resize') as mock_resize:
- self.driver.extend_volume(fake_vol, fake_size)
- mock_resize.assert_called_once_with(fake_vol, size=size)
+ self.driver.extend_volume(self.volume_a, fake_size)
+ mock_resize.assert_called_once_with(self.volume_a, size=size)
@common_mocks
def test_retype(self):
with mock.patch.object(self.driver.rbd.RBD(), 'rename') as mock_rename:
context = {}
- current_volume = {'id': 'curr_id',
- 'name': 'curr_name',
- 'provider_location': 'curr_provider_location'}
- original_volume = {'id': 'orig_id',
- 'name': 'orig_name',
- 'provider_location': 'orig_provider_location'}
mock_rename.return_value = 0
model_update = self.driver.update_migrated_volume(context,
- original_volume,
- current_volume,
+ self.volume_a,
+ self.volume_b,
'available')
mock_rename.assert_called_with(client.ioctx,
- 'volume-%s' % current_volume['id'],
- 'volume-%s' % original_volume['id'])
+ 'volume-%s' % self.volume_b.id,
+ 'volume-%s' % self.volume_a.id)
self.assertEqual({'_name_id': None,
'provider_location': None}, model_update)
def test_rbd_volume_proxy_init(self):
mock_driver = mock.Mock(name='driver')
mock_driver._connect_to_rados.return_value = (None, None)
- with driver.RBDVolumeProxy(mock_driver, self.volume_name):
+ with driver.RBDVolumeProxy(mock_driver, self.volume_a.name):
self.assertEqual(1, mock_driver._connect_to_rados.call_count)
self.assertFalse(mock_driver._disconnect_from_rados.called)
mock_driver.reset_mock()
snap = u'snapshot-name'
- with driver.RBDVolumeProxy(mock_driver, self.volume_name,
+ with driver.RBDVolumeProxy(mock_driver, self.volume_a.name,
snapshot=snap):
self.assertEqual(1, mock_driver._connect_to_rados.call_count)
self.assertFalse(mock_driver._disconnect_from_rados.called)
and that clone has rbd_max_clone_depth clones behind it, the source
volume will be flattened.
"""
- src_name = utils.convert_str(src_vref['name'])
- dest_name = utils.convert_str(volume['name'])
+ src_name = utils.convert_str(src_vref.name)
+ dest_name = utils.convert_str(volume.name)
flatten_parent = False
# Do full copy if requested
finally:
src_volume.close()
- if volume['size'] != src_vref['size']:
+ if volume.size != src_vref.size:
LOG.debug("resize volume '%(dst_vol)s' from %(src_size)d to "
"%(dst_size)d",
- {'dst_vol': volume['name'], 'src_size': src_vref['size'],
- 'dst_size': volume['size']})
+ {'dst_vol': volume.name, 'src_size': src_vref.size,
+ 'dst_size': volume.size})
self._resize(volume)
LOG.debug("clone created successfully")
def create_volume(self, volume):
"""Creates a logical volume."""
- size = int(volume['size']) * units.Gi
+ size = int(volume.size) * units.Gi
- LOG.debug("creating volume '%s'", volume['name'])
+ LOG.debug("creating volume '%s'", volume.name)
chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
order = int(math.log(chunk_size, 2))
with RADOSClient(self) as client:
self.RBDProxy().create(client.ioctx,
- utils.convert_str(volume['name']),
+ utils.convert_str(volume.name),
size,
order,
old_format=False,
def _clone(self, volume, src_pool, src_image, src_snap):
LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
dict(pool=src_pool, img=src_image, snap=src_snap,
- dst=volume['name']))
+ dst=volume.name))
with RADOSClient(self, src_pool) as src_client:
with RADOSClient(self) as dest_client:
self.RBDProxy().clone(src_client.ioctx,
utils.convert_str(src_image),
utils.convert_str(src_snap),
dest_client.ioctx,
- utils.convert_str(volume['name']),
+ utils.convert_str(volume.name),
features=src_client.features)
def _resize(self, volume, **kwargs):
size = kwargs.get('size', None)
if not size:
- size = int(volume['size']) * units.Gi
+ size = int(volume.size) * units.Gi
- with RBDVolumeProxy(self, volume['name']) as vol:
+ with RBDVolumeProxy(self, volume.name) as vol:
vol.resize(size)
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
self._clone(volume, self.configuration.rbd_pool,
- snapshot['volume_name'], snapshot['name'])
+ snapshot.volume_name, snapshot.name)
if self.configuration.rbd_flatten_volume_from_snapshot:
- self._flatten(self.configuration.rbd_pool, volume['name'])
- if int(volume['size']):
+ self._flatten(self.configuration.rbd_pool, volume.name)
+ if int(volume.size):
self._resize(volume)
def _delete_backup_snaps(self, rbd_image):
"""Deletes a logical volume."""
# NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
# utf-8 otherwise librbd will barf.
- volume_name = utils.convert_str(volume['name'])
+ volume_name = utils.convert_str(volume.name)
with RADOSClient(self) as client:
try:
rbd_image = self.rbd.Image(client.ioctx, volume_name)
try:
snaps = rbd_image.list_snaps()
for snap in snaps:
- if snap['name'].endswith('.clone_snap'):
+ if snap.name.endswith('.clone_snap'):
LOG.debug("volume has clone snapshot(s)")
# We grab one of these and use it when fetching parent
# info in case the volume has been flattened.
- clone_snap = snap['name']
+ clone_snap = snap.name
break
raise exception.VolumeIsBusy(volume_name=volume_name)
def create_snapshot(self, snapshot):
"""Creates an rbd snapshot."""
- with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
- snap = utils.convert_str(snapshot['name'])
+ with RBDVolumeProxy(self, snapshot.volume_name) as volume:
+ snap = utils.convert_str(snapshot.name)
volume.create_snap(snap)
volume.protect_snap(snap)
"""Deletes an rbd snapshot."""
# NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
# utf-8 otherwise librbd will barf.
- volume_name = utils.convert_str(snapshot['volume_name'])
- snap_name = utils.convert_str(snapshot['name'])
+ volume_name = utils.convert_str(snapshot.volume_name)
+ snap_name = utils.convert_str(snapshot.name)
with RBDVolumeProxy(self, volume_name) as volume:
try:
'driver_volume_type': 'rbd',
'data': {
'name': '%s/%s' % (self.configuration.rbd_pool,
- volume['name']),
+ volume.name),
'hosts': hosts,
'ports': ports,
'auth_enabled': (self.configuration.rbd_user is not None),
'auth_username': self.configuration.rbd_user,
'secret_type': 'ceph',
'secret_uuid': self.configuration.rbd_secret_uuid,
- 'volume_id': volume['id'],
+ 'volume_id': volume.id,
}
}
LOG.debug('connection data: %s', data)
image_utils.fetch_to_raw(context, image_service, image_id,
tmp.name,
self.configuration.volume_dd_blocksize,
- size=volume['size'])
+ size=volume.size)
self.delete_volume(volume)
args = ['rbd', 'import',
'--pool', self.configuration.rbd_pool,
'--order', order,
- tmp.name, volume['name'],
+ tmp.name, volume.name,
'--new-format']
args.extend(self._ceph_args())
self._try_execute(*args)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
tmp_dir = self._image_conversion_dir()
tmp_file = os.path.join(tmp_dir,
- volume['name'] + '-' + image_meta['id'])
+ volume.name + '-' + image_meta['id'])
with fileutils.remove_path_on_error(tmp_file):
args = ['rbd', 'export',
'--pool', self.configuration.rbd_pool,
- volume['name'], tmp_file]
+ volume.name, tmp_file]
args.extend(self._ceph_args())
self._try_execute(*args)
image_utils.upload_volume(context, image_service,
def backup_volume(self, context, backup, backup_service):
"""Create a new backup from an existing volume."""
- volume = self.db.volume_get(context, backup['volume_id'])
+ volume = self.db.volume_get(context, backup.volume_id)
- with RBDVolumeProxy(self, volume['name'],
+ with RBDVolumeProxy(self, volume.name,
self.configuration.rbd_pool) as rbd_image:
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
self.configuration.rbd_user,
def restore_backup(self, context, backup, volume, backup_service):
"""Restore an existing backup to a new or existing volume."""
- with RBDVolumeProxy(self, volume['name'],
+ with RBDVolumeProxy(self, volume.name,
self.configuration.rbd_pool) as rbd_image:
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
self.configuration.rbd_user,
self.configuration.rbd_ceph_conf)
rbd_fd = RBDImageIOWrapper(rbd_meta)
- backup_service.restore(backup, volume['id'], rbd_fd)
+ backup_service.restore(backup, volume.id, rbd_fd)
LOG.debug("volume restore complete.")
def extend_volume(self, volume, new_size):
"""Extend an existing volume."""
- old_size = volume['size']
+ old_size = volume.size
try:
size = int(new_size) * units.Gi
self._resize(volume, size=size)
except Exception:
msg = _('Failed to Extend Volume '
- '%(volname)s') % {'volname': volume['name']}
+ '%(volname)s') % {'volname': volume.name}
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
rbd_name = existing_ref['source-name']
self.RBDProxy().rename(client.ioctx,
utils.convert_str(rbd_name),
- utils.convert_str(volume['name']))
+ utils.convert_str(volume.name))
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of an existing image for manage_existing.
name_id = None
provider_location = None
- existing_name = CONF.volume_name_template % new_volume['id']
- wanted_name = CONF.volume_name_template % volume['id']
+ existing_name = CONF.volume_name_template % new_volume.id
+ wanted_name = CONF.volume_name_template % volume.id
with RADOSClient(self) as client:
try:
self.RBDProxy().rename(client.ioctx,
utils.convert_str(wanted_name))
except self.rbd.ImageNotFound:
LOG.error(_LE('Unable to rename the logical volume '
- 'for volume %s.'), volume['id'])
+ 'for volume %s.'), volume.id)
# If the rename fails, _name_id should be set to the new
# volume id and provider_location should be set to the
# one from the new volume as well.
- name_id = new_volume['_name_id'] or new_volume['id']
+ name_id = new_volume._name_id or new_volume.id
provider_location = new_volume['provider_location']
return {'_name_id': name_id, 'provider_location': provider_location}