# calling manager.create_volume.
self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)
+ def _raise_metadata_copy_failure(self, method, dst_vol_id, **kwargs):
+ # MetadataCopyFailure exception will be raised if DB service is Down
+ # while copying the volume glance metadata
+ with mock.patch.object(db, method) as mock_db:
+ mock_db.side_effect = exception.MetadataCopyFailure(
+ reason="Because of DB service down.")
+ self.assertRaises(exception.MetadataCopyFailure,
+ self.volume.create_volume,
+ self.context,
+ dst_vol_id,
+ **kwargs)
+
+ # ensure that status of volume is 'error'
+ vol = db.volume_get(self.context, dst_vol_id)
+ self.assertEqual('error', vol['status'])
+
+ # cleanup resource
+ db.volume_destroy(self.context, dst_vol_id)
+
+ def test_create_volume_from_volume_with_glance_volume_metadata_none(self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ self.volume.create_volume(self.context, src_vol_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, src_vol['id'], {'bootable': True})
+
+ # create volume from source volume
+ dst_vol = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ self.volume.create_volume(self.context,
+ dst_vol['id'],
+ source_volid=src_vol_id)
+
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_glance_metadata_copy_from_volume_to_volume,
+ self.context, src_vol_id, dst_vol['id'])
+
+ # ensure that status of volume is 'available'
+ vol = db.volume_get(self.context, dst_vol['id'])
+ self.assertEqual('available', vol['status'])
+
+ # cleanup resource
+ db.volume_destroy(self.context, src_vol_id)
+ db.volume_destroy(self.context, dst_vol['id'])
+
+ def test_create_volume_from_volume_raise_metadata_copy_failure(
+ self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ self.volume.create_volume(self.context, src_vol_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, src_vol['id'], {'bootable': True})
+
+ # create volume from source volume
+ dst_vol = tests_utils.create_volume(self.context,
+ source_volid=src_vol_id,
+ **self.volume_params)
+ self._raise_metadata_copy_failure(
+ 'volume_glance_metadata_copy_from_volume_to_volume',
+ dst_vol['id'],
+ source_volid=src_vol_id)
+
+ # cleanup resource
+ db.volume_destroy(self.context, src_vol_id)
+
+ def test_create_volume_from_snapshot_raise_metadata_copy_failure(
+ self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ self.volume.create_volume(self.context, src_vol_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, src_vol['id'], {'bootable': True})
+
+ # create volume from snapshot
+ snapshot_id = self._create_snapshot(src_vol['id'])['id']
+ self.volume.create_snapshot(self.context, src_vol['id'], snapshot_id)
+
+ # ensure that status of snapshot is 'available'
+ snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
+ self.assertEqual('available', snapshot_ref)
+
+ dst_vol = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ self._raise_metadata_copy_failure(
+ 'volume_glance_metadata_copy_to_volume',
+ dst_vol['id'],
+ snapshot_id=snapshot_id)
+
+ # cleanup resource
+ db.snapshot_destroy(self.context, snapshot_id)
+ db.volume_destroy(self.context, src_vol_id)
+
+ @mock.patch(
+ 'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
+ def test_create_volume_from_srcreplica_raise_metadata_copy_failure(
+ self, _create_replica_test):
+ _create_replica_test.return_value = None
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ self.volume.create_volume(self.context, src_vol_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, src_vol['id'], {'bootable': True})
+
+ # create volume from source volume
+ dst_vol = tests_utils.create_volume(self.context,
+ source_volid=src_vol_id,
+ **self.volume_params)
+ self._raise_metadata_copy_failure(
+ 'volume_glance_metadata_copy_from_volume_to_volume',
+ dst_vol['id'],
+ source_volid=src_vol_id)
+
+ # cleanup resource
+ db.volume_destroy(self.context, src_vol_id)
+
+ def test_create_volume_from_snapshot_with_glance_volume_metadata_none(
+ self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ self.volume.create_volume(self.context, src_vol_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, src_vol['id'], {'bootable': True})
+
+ volume = db.volume_get(self.context, src_vol_id)
+
+ # create snapshot of volume
+ snapshot_id = self._create_snapshot(volume['id'])['id']
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+
+ # ensure that status of snapshot is 'available'
+ snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
+ self.assertEqual('available', snapshot_ref)
+
+ # create volume from snapshot
+ dst_vol = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ self.volume.create_volume(self.context,
+ dst_vol['id'],
+ snapshot_id=snapshot_id)
+
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_glance_metadata_copy_to_volume,
+ self.context, dst_vol['id'], snapshot_id)
+
+ # ensure that status of volume is 'available'
+ vol = db.volume_get(self.context, dst_vol['id'])
+ self.assertEqual('available', vol['status'])
+
+ # cleanup resource
+ db.snapshot_destroy(self.context, snapshot_id)
+ db.volume_destroy(self.context, src_vol_id)
+ db.volume_destroy(self.context, dst_vol['id'])
+
+ @mock.patch(
+ 'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
+ def test_create_volume_from_srcreplica_with_glance_volume_metadata_none(
+ self, _create_replica_test):
+ """Test volume can be created from a volume replica."""
+ _create_replica_test.return_value = None
+
+ volume_src = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ self.volume.create_volume(self.context, volume_src['id'])
+ db.volume_update(self.context, volume_src['id'], {'bootable': True})
+
+ volume = db.volume_get(self.context, volume_src['id'])
+ volume_dst = tests_utils.create_volume(
+ self.context,
+ source_replicaid=volume['id'],
+ **self.volume_params)
+ self.volume.create_volume(self.context, volume_dst['id'],
+ source_replicaid=volume['id'])
+
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_glance_metadata_copy_from_volume_to_volume,
+ self.context, volume_src['id'], volume_dst['id'])
+
+ self.assertEqual('available',
+ db.volume_get(self.context,
+ volume_dst['id']).status)
+ self.assertTrue(_create_replica_test.called)
+
+ # cleanup resource
+ db.volume_destroy(self.context, volume_dst['id'])
+ db.volume_destroy(self.context, volume_src['id'])
+
def test_create_volume_from_snapshot_delete_lock_taken(self):
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
db.snapshot_destroy(ctxt, snap_id)
db.volume_destroy(ctxt, volume_id)
+ def test_create_snapshot_from_bootable_volume_with_volume_metadata_none(
+ self):
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
+ volume_id = volume['id']
+
+ self.volume.create_volume(self.context, volume_id)
+ # set bootable flag of volume to True
+ db.volume_update(self.context, volume_id, {'bootable': True})
+
+ snapshot_id = self._create_snapshot(volume['id'])['id']
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_snapshot_glance_metadata_get,
+ self.context, snapshot_id)
+
+ # ensure that status of snapshot is 'available'
+ snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
+ self.assertEqual('available', snapshot_ref)
+
+ # cleanup resource
+ db.snapshot_destroy(self.context, snapshot_id)
+ db.volume_destroy(self.context, volume_id)
+
def test_delete_busy_snapshot(self):
"""Test snapshot can be created and deleted."""