@staticmethod
def _create_volume(size=0, snapshot_id=None, image_id=None,
- metadata=None, status="creating"):
+ source_volid=None, metadata=None, status="creating"):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['snapshot_id'] = snapshot_id
vol['image_id'] = image_id
+ vol['source_volid'] = source_volid
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['availability_zone'] = CONF.storage_availability_zone
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_id)
- def _create_volume_from_image(self, expected_status,
- fakeout_copy_image_to_volume=False):
+ def _create_volume_from_image(self, fakeout_copy_image_to_volume=False):
"""Call copy image to volume, Test the status of volume after calling
copying image to volume.
"""
fake_copy_image_to_volume)
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
- volume_id = 1
+ volume_id = self._create_volume(status='creating')['id']
# creating volume testdata
- db.volume_create(self.context,
- {'id': volume_id,
- 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'display_description': 'Test Desc',
- 'size': 20,
- 'status': 'creating',
- 'instance_uuid': None,
- 'host': 'dummy'})
try:
self.volume.create_volume(self.context,
volume_id,
image_id=image_id)
-
- volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], expected_status)
finally:
# cleanup
- db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
+ volume = db.volume_get(self.context, volume_id)
+ return volume
def test_create_volume_from_image_status_available(self):
"""Verify that before copying image to volume, it is in available
state.
"""
- self._create_volume_from_image('available')
+ volume = self._create_volume_from_image()
+ self.assertEqual(volume['status'], 'available')
+ self.volume.delete_volume(self.context, volume['id'])
def test_create_volume_from_image_exception(self):
"""Verify that create volume from image, the volume status is
# clean up
self.volume.delete_volume(self.context, volume['id'])
+ def test_create_volume_from_sourcevol(self):
+ """Test volume can be created from a source volume."""
+ def fake_create_cloned_volume(volume, src_vref):
+ pass
+
+ self.stubs.Set(self.volume.driver, 'create_cloned_volume',
+ fake_create_cloned_volume)
+ volume_src = self._create_volume()
+ self.volume.create_volume(self.context, volume_src['id'])
+ volume_dst = self._create_volume(source_volid=volume_src['id'])
+ self.volume.create_volume(self.context, volume_dst['id'],
+ source_volid=volume_src['id'])
+ self.assertEqual('available',
+ db.volume_get(context.get_admin_context(),
+ volume_dst['id']).status)
+ self.volume.delete_volume(self.context, volume_dst['id'])
+ self.volume.delete_volume(self.context, volume_src['id'])
+
+ def test_create_volume_from_sourcevol_with_glance_metadata(self):
+ """Test glance metadata can be correctly copied to new volume."""
+ def fake_create_cloned_volume(volume, src_vref):
+ pass
+
+ self.stubs.Set(self.volume.driver, 'create_cloned_volume',
+ fake_create_cloned_volume)
+ volume_src = self._create_volume_from_image()
+ self.volume.create_volume(self.context, volume_src['id'])
+ volume_dst = self._create_volume(source_volid=volume_src['id'])
+ self.volume.create_volume(self.context, volume_dst['id'],
+ source_volid=volume_src['id'])
+ self.assertEqual('available',
+ db.volume_get(context.get_admin_context(),
+ volume_dst['id']).status)
+ src_glancemeta = db.volume_get(context.get_admin_context(),
+ volume_src['id']).volume_glance_metadata
+ dst_glancemeta = db.volume_get(context.get_admin_context(),
+ volume_dst['id']).volume_glance_metadata
+ for meta_src in src_glancemeta:
+ for meta_dst in dst_glancemeta:
+ if meta_dst.key == meta_src.key:
+ self.assertEquals(meta_dst.value, meta_src.value)
+ self.volume.delete_volume(self.context, volume_src['id'])
+ self.volume.delete_volume(self.context, volume_dst['id'])
+
+ def test_create_volume_from_sourcevol_failed_clone(self):
+ """Test src vol status will be restore by error handling code."""
+ def fake_error_create_cloned_volume(volume, src_vref):
+ db.volume_update(context, src_vref['id'], {'status': 'error'})
+ raise exception.CinderException('fake exception')
+
+ def fake_reschedule_or_reraise(context, volume_id, exc_info,
+ snapshot_id, image_id, request_spec,
+ filter_properties, allow_reschedule):
+ pass
+
+ self.stubs.Set(self.volume, '_reschedule_or_reraise',
+ fake_reschedule_or_reraise)
+ self.stubs.Set(self.volume.driver, 'create_cloned_volume',
+ fake_error_create_cloned_volume)
+ volume_src = self._create_volume()
+ self.volume.create_volume(self.context, volume_src['id'])
+ volume_dst = self._create_volume(0, source_volid=volume_src['id'])
+ self.volume.create_volume(self.context, volume_dst['id'],
+ source_volid=volume_src['id'])
+ self.assertEqual(volume_src['status'], 'creating')
+ self.volume.delete_volume(self.context, volume_dst['id'])
+ self.volume.delete_volume(self.context, volume_src['id'])
+
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""