drv._post_clone_image(volume)
mox.ReplayAll()
- drv. clone_image(volume, ('image_location', None), 'image_id')
+ drv.clone_image(volume, ('image_location', None), 'image_id', {})
mox.VerifyAll()
def get_img_info(self, format):
mox.ReplayAll()
(prop, cloned) = drv. clone_image(
- volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id')
+ volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id', {})
mox.VerifyAll()
if not cloned and not prop['provider_location']:
pass
mox.ReplayAll()
drv. clone_image(
- volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id')
+ volume, ('nfs://127.0.0.1:/share/img-id', None), 'image_id', {})
mox.VerifyAll()
def test_clone_image_cloneableshare_notraw(self):
mox.ReplayAll()
drv. clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id')
+ volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
mox.VerifyAll()
def test_clone_image_file_not_discovered(self):
mox.ReplayAll()
vol_dict, result = drv. clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id')
+ volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
mox.ReplayAll()
vol_dict, result = drv. clone_image(
- volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id')
+ volume, ('nfs://127.0.0.1/share/img-id', None), 'image_id', {})
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
from cinder import units
from cinder.volume import configuration as conf
import cinder.volume.drivers.rbd as driver
+from cinder.volume.flows import create_volume
LOG = logging.getLogger(__name__)
self.assertRaises(exception.ImageUnacceptable,
self.driver._parse_location,
loc)
- self.assertFalse(self.driver._is_cloneable(loc))
+ self.assertFalse(
+ self.driver._is_cloneable(loc, {'disk_format': 'raw'}))
def test_cloneable(self):
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
self.mox.ReplayAll()
- self.assertTrue(self.driver._is_cloneable(location))
+ self.assertTrue(
+ self.driver._is_cloneable(location, {'disk_format': 'raw'}))
def test_uncloneable_different_fsid(self):
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
location = 'rbd://def/pool/image/snap'
- self.assertFalse(self.driver._is_cloneable(location))
+ self.assertFalse(
+ self.driver._is_cloneable(location, {'disk_format': 'raw'}))
def test_uncloneable_unreadable(self):
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
self.mox.ReplayAll()
- self.assertFalse(self.driver._is_cloneable(location))
+ self.assertFalse(
+ self.driver._is_cloneable(location, {'disk_format': 'raw'}))
+
+ def test_uncloneable_bad_format(self):
+ self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
+ location = 'rbd://abc/pool/image/snap'
+ formats = ['qcow2', 'vmdk', 'vdi']
+ for f in formats:
+ self.assertFalse(
+ self.driver._is_cloneable(location, {'disk_format': f}))
def _copy_image(self):
@contextlib.contextmanager
super(ManagedRBDTestCase, self).setUp()
fake_image.stub_out_image_service(self.stubs)
self.volume.driver.set_initialized()
+ self.called = []
- def _clone_volume_from_image(self, expected_status,
- clone_works=True):
+ def _create_volume_from_image(self, expected_status, raw=False,
+ clone_error=False):
"""Try to clone a volume from an image, and check the status
afterwards.
- """
- def fake_clone_image(volume, image_location, image_id):
- return {'provider_location': None}, True
- def fake_clone_error(volume, image_location, image_id):
- raise exception.CinderException()
+ NOTE: if clone_error is True we force the image type to raw otherwise
+ clone_image is not called
+ """
+ def mock_clone_image(volume, image_location, image_id, image_meta):
+ self.called.append('clone_image')
+ if clone_error:
+ raise exception.CinderException()
+ else:
+ return {'provider_location': None}, True
- self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
- if clone_works:
- self.stubs.Set(self.volume.driver, 'clone_image', fake_clone_image)
+ # See tests.image.fake for image types.
+ if raw:
+ image_id = '155d900f-4e14-4e4c-a73d-069cbf4541e6'
else:
- self.stubs.Set(self.volume.driver, 'clone_image', fake_clone_error)
+ image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
- image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume_id = 1
+
# creating volume testdata
db.volume_create(self.context,
{'id': volume_id,
'status': 'creating',
'instance_uuid': None,
'host': 'dummy'})
- try:
- if clone_works:
- self.volume.create_volume(self.context,
- volume_id,
- image_id=image_id)
- else:
- self.assertRaises(exception.CinderException,
- self.volume.create_volume,
- self.context,
- volume_id,
- image_id=image_id)
-
- volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], expected_status)
- finally:
- # cleanup
- db.volume_destroy(self.context, volume_id)
- def test_create_vol_from_image_status_available(self):
- """Verify that before cloning, an image is in the available state."""
- self._clone_volume_from_image('available', True)
-
- def test_create_vol_from_image_status_error(self):
- """Verify that before cloning, an image is in the available state."""
- self._clone_volume_from_image('error', False)
+ mpo = mock.patch.object
+ with mpo(self.volume.driver, 'create_volume') as mock_create_volume:
+ with mpo(self.volume.driver, 'clone_image', mock_clone_image):
+ with mpo(create_volume.CreateVolumeFromSpecTask,
+ '_copy_image_to_volume') as mock_copy_image_to_volume:
+
+ try:
+ if not clone_error:
+ self.volume.create_volume(self.context,
+ volume_id,
+ image_id=image_id)
+ else:
+ self.assertRaises(exception.CinderException,
+ self.volume.create_volume,
+ self.context,
+ volume_id,
+ image_id=image_id)
+
+ volume = db.volume_get(self.context, volume_id)
+ self.assertEqual(volume['status'], expected_status)
+ finally:
+ # cleanup
+ db.volume_destroy(self.context, volume_id)
+
+ self.assertEqual(self.called, ['clone_image'])
+ mock_create_volume.assert_called()
+ mock_copy_image_to_volume.assert_called()
- def test_clone_image(self):
- # Test Failure Case(s)
- expected = ({}, False)
+ def test_create_vol_from_image_status_available(self):
+ """Clone raw image then verify volume is in available state."""
+ self._create_volume_from_image('available', raw=True)
- self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: False)
- image_loc = (object(), object())
- actual = self.volume.driver.clone_image(object(), image_loc, object())
- self.assertEqual(expected, actual)
+ def test_create_vol_from_non_raw_image_status_available(self):
+ """Clone non-raw image then verify volume is in available state."""
+ self._create_volume_from_image('available', raw=False)
- self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
- self.assertEqual(expected,
- self.volume.driver.clone_image(object(), None, None))
+ def test_create_vol_from_image_status_error(self):
+ """Fail to clone raw image then verify volume is in error state."""
+ self._create_volume_from_image('error', raw=True, clone_error=True)
- # Test Success Case(s)
- expected = ({'provider_location': None}, True)
+ def test_clone_failure(self):
+ driver = self.volume.driver
- self.stubs.Set(self.volume.driver, '_parse_location',
- lambda x: ('a', 'b', 'c', 'd'))
+ with mock.patch.object(driver, '_is_cloneable', lambda *args: False):
+ image_loc = (mock.Mock(), mock.Mock())
+ actual = driver.clone_image(mock.Mock(), image_loc,
+ mock.Mock(), {})
+ self.assertEqual(({}, False), actual)
- self.stubs.Set(self.volume.driver, '_clone', lambda *args: None)
- self.stubs.Set(self.volume.driver, '_resize', lambda *args: None)
- actual = self.volume.driver.clone_image(object(), image_loc, object())
- self.assertEqual(expected, actual)
+ self.assertEqual(({}, False),
+ driver.clone_image(object(), None, None, {}))
def test_clone_success(self):
- self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
- self.stubs.Set(self.volume.driver, 'clone_image', lambda a, b, c: True)
- image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
- self.assertTrue(self.volume.driver.clone_image({}, image_id, image_id))
+ expected = ({'provider_location': None}, True)
+ driver = self.volume.driver
+ mpo = mock.patch.object
+ with mpo(driver, '_is_cloneable', lambda *args: True):
+ with mpo(driver, '_parse_location', lambda x: (1, 2, 3, 4)):
+ with mpo(driver, '_clone') as mock_clone:
+ with mpo(driver, '_resize') as mock_resize:
+ image_loc = (mock.Mock(), mock.Mock())
+ actual = driver.clone_image(mock.Mock(),
+ image_loc,
+ mock.Mock(),
+ {'disk_format': 'raw'})
+ self.assertEqual(expected, actual)
+ mock_clone.assert_called()
+ mock_resize.assert_called()