@patch('cinder.volume.drivers.ibm.gpfs.GPFSDriver._clone_image')
def test_clone_image_pub(self, mock_exec):
- self.driver.clone_image('', '', '', '')
+ self.driver.clone_image('', '', {'id': 1})
@patch('cinder.volume.drivers.ibm.gpfs.GPFSDriver._is_gpfs_path')
def test_is_cloneable_ok(self, mock_is_gpfs_path):
def setUp(self):
super(StorACDriverTestCase, self).setUp()
+
self.fake_conf_file = tempfile.mktemp(suffix='.xml')
self.addCleanup(os.remove, self.fake_conf_file)
drv._post_clone_image(volume)
mox.ReplayAll()
- drv.clone_image(volume, ('image_location', None), 'image_id', {})
+ drv.clone_image(volume, ('image_location', None), {'id': 'image_id'})
mox.VerifyAll()
def get_img_info(self, format):
mox.ReplayAll()
(prop, cloned) = drv. clone_image(
- volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id', {})
+ volume,
+ ('nfs://127.0.0.1:/share/img-id', None),
+ {'id': 'image_id'})
mox.VerifyAll()
if not cloned and not prop['provider_location']:
pass
mox.ReplayAll()
drv. clone_image(
- volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id', {})
+ volume,
+ ('nfs://127.0.0.1:/share/img-id', None),
+ {'id': 'image_id'})
mox.VerifyAll()
def test_clone_image_cloneableshare_notraw(self):
mox.ReplayAll()
drv.clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
+ volume, ('nfs://127.0.0.1/share/img-id', None), {'id': 'image_id'})
mox.VerifyAll()
def test_clone_image_file_not_discovered(self):
mox.ReplayAll()
vol_dict, result = drv. clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
+ volume, ('nfs://127.0.0.1/share/img-id', None), {'id': 'image_id'})
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
mox.ReplayAll()
vol_dict, result = drv. clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
+ volume, ('nfs://127.0.0.1/share/img-id', None), {'id': 'image_id'})
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
image_service.show.return_value = {'size': 1,
'disk_format': 'raw'}
- drv._check_get_nfs_path_segs = mock.Mock(return_value=
- ('ip1', '/openstack'))
+ drv._check_get_nfs_path_segs =\
+ mock.Mock(return_value=('ip1', '/openstack'))
drv._get_ip_verify_on_cluster = mock.Mock(return_value='ip1')
drv._get_host_ip = mock.Mock(return_value='ip2')
drv._get_export_path = mock.Mock(return_value='/exp_path')
None)
image_service.show.return_value = {'size': 1,
'disk_format': 'qcow2'}
- drv._check_get_nfs_path_segs = mock.Mock(return_value=
- ('ip1', '/openstack'))
+ drv._check_get_nfs_path_segs =\
+ mock.Mock(return_value=('ip1', '/openstack'))
drv._get_ip_verify_on_cluster = mock.Mock(return_value='ip1')
drv._get_host_ip = mock.Mock(return_value='ip2')
def test_create_vol_from_image_status_available(self):
"""Clone raw image then verify volume is in available state."""
- def _mock_clone_image(volume, image_location, image_id, image_meta):
+ def _mock_clone_image(volume, image_location, image_meta):
return {'provider_location': None}, True
with mock.patch.object(self.volume.driver, 'clone_image') as \
def test_create_vol_from_non_raw_image_status_available(self):
"""Clone non-raw image then verify volume is in available state."""
- def _mock_clone_image(volume, image_location, image_id, image_meta):
+ def _mock_clone_image(volume, image_location, image_meta):
return {'provider_location': None}, False
with mock.patch.object(self.volume.driver, 'clone_image') as \
with mock.patch.object(driver, '_is_cloneable', lambda *args: False):
image_loc = (mock.Mock(), mock.Mock())
- actual = driver.clone_image(mock.Mock(), image_loc,
- mock.Mock(), {})
+ actual = driver.clone_image(mock.Mock(), image_loc, {})
self.assertEqual(({}, False), actual)
self.assertEqual(({}, False),
- driver.clone_image(object(), None, None, {}))
+ driver.clone_image(object(), None, {}))
def test_clone_success(self):
expected = ({'provider_location': None}, True)
volume = {'name': 'vol1'}
actual = driver.clone_image(volume, image_loc,
- 'id.foo',
- {'disk_format': 'raw'})
+ {'disk_format': 'raw',
+ 'id': 'id.foo'})
self.assertEqual(expected, actual)
mock_clone.assert_called_once_with(volume,
size=None):
pass
- def fake_clone_image(volume_ref, image_location, image_id, image_meta):
+ def fake_clone_image(volume_ref, image_location, image_meta):
return {'provider_location': None}, True
dst_fd, dst_path = tempfile.mkstemp()
{'path': host_device}))
return {'conn': conn, 'device': device, 'connector': connector}
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
"""Create a volume efficiently from an existing image.
image_location is a string whose format depends on the
data['reserved_percentage'] = 0
self._stats = data
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
"""Create a volume from the specified image."""
- return self._clone_image(volume, image_location, image_id)
+ return self._clone_image(volume, image_location, image_meta['id'])
def _is_cloneable(self, image_id):
"""Return true if the specified image can be cloned by GPFS."""
finally:
self.delete_snapshot(temp_snapshot)
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
return None, False
def backup_volume(self, context, backup, backup_service):
LOG.warning(_LW('Exception during deleting %s'), ex.__str__())
return False
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
"""Create a volume efficiently from an existing image.
image_location is a string whose format depends on the
Returns a dict of volume properties eg. provider_location,
boolean indicating whether cloning occurred.
"""
-
+ image_id = image_meta['id']
cloned = False
post_clone = False
try:
dict(loc=image_location, err=e))
return False
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
image_location = image_location[0] if image_location else None
if image_location is None or not self._is_cloneable(
image_location, image_meta):
image_meta,
self.local_path(volume))
- def clone_image(self, volume, image_location, image_id, image_meta):
+ def clone_image(self, volume, image_location, image_meta):
"""Create a volume efficiently from an existing image.
image_location is a string whose format depends on the
# dict containing provider_location for cloned volume
# and clone status.
model_update, cloned = self.driver.clone_image(
- volume_ref, image_location, image_id, image_meta)
+ volume_ref, image_location, image_meta)
if not cloned:
# TODO(harlowja): what needs to be rolled back in the clone if this
# volume create fails?? Likely this should be a subflow or broken