db.snapshot_destroy(self.context, snapshot_ref['id'])
db.volume_destroy(self.context, volume['id'])
+ def test_create_snapshot_from_bootable_volume(self):
+ """Test create snapshot from bootable volume."""
+ # create bootable volume from image
+ volume = self._create_volume_from_image()
+ volume_id = volume['id']
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volume['bootable'], True)
+
+ # get volume's volume_glance_metadata
+ ctxt = context.get_admin_context()
+ vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
+ self.assertTrue(vol_glance_meta)
+
+ # create snapshot from bootable volume
+ snap_id = self._create_snapshot(volume_id)['id']
+ self.volume.create_snapshot(ctxt, volume_id, snap_id)
+
+ # get snapshot's volume_glance_metadata
+ snap_glance_meta = db.volume_snapshot_glance_metadata_get(
+ ctxt, snap_id)
+ self.assertTrue(snap_glance_meta)
+
+ # ensure that volume's glance metadata is copied
+ # to snapshot's glance metadata
+ self.assertEqual(len(vol_glance_meta), len(snap_glance_meta))
+ vol_glance_dict = dict((x.key, x.value) for x in vol_glance_meta)
+ snap_glance_dict = dict((x.key, x.value) for x in snap_glance_meta)
+ self.assertDictMatch(vol_glance_dict, snap_glance_dict)
+
+ # ensure that snapshot's status is changed to 'available'
+ snapshot_ref = db.snapshot_get(ctxt, snap_id)['status']
+ self.assertEqual('available', snapshot_ref)
+
+ # cleanup resource
+ db.snapshot_destroy(ctxt, snap_id)
+ db.volume_destroy(ctxt, volume_id)
+
+ def test_create_snapshot_from_bootable_volume_fail(self):
+ """Test create snapshot from bootable volume.
+
+ But it fails to volume_glance_metadata_copy_to_snapshot.
+ As a result, status of snapshot is changed to ERROR.
+ """
+ # create bootable volume from image
+ volume = self._create_volume_from_image()
+ volume_id = volume['id']
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volume['bootable'], True)
+
+ # get volume's volume_glance_metadata
+ ctxt = context.get_admin_context()
+ vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
+ self.assertTrue(vol_glance_meta)
+ snap = self._create_snapshot(volume_id)
+ snap_id = snap['id']
+ snap_stat = snap['status']
+ self.assertTrue(snap_id)
+ self.assertTrue(snap_stat)
+
+ # set to return DB exception
+ with mock.patch.object(db, 'volume_glance_metadata_copy_to_snapshot')\
+ as mock_db:
+ mock_db.side_effect = exception.MetadataCopyFailure(
+ reason="Because of DB service down.")
+ # create snapshot from bootable volume
+ self.assertRaises(exception.MetadataCopyFailure,
+ self.volume.create_snapshot,
+ ctxt,
+ volume_id,
+ snap_id)
+
+ # get snapshot's volume_glance_metadata
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_snapshot_glance_metadata_get,
+ ctxt, snap_id)
+
+ # ensure that status of snapshot is 'error'
+ snapshot_ref = db.snapshot_get(ctxt, snap_id)['status']
+ self.assertEqual('error', snapshot_ref)
+
+ # cleanup resource
+ db.snapshot_destroy(ctxt, snap_id)
+ db.volume_destroy(ctxt, volume_id)
+
def test_delete_busy_snapshot(self):
"""Test snapshot can be created and deleted."""
snapshot_ref['id'],
{'status': 'error'})
- self.db.snapshot_update(context,
- snapshot_ref['id'], {'status': 'available',
- 'progress': '100%'})
-
vol_ref = self.db.volume_get(context, volume_id)
if vol_ref.bootable:
try:
" %(volume_id)s metadata") %
{'volume_id': volume_id,
'snapshot_id': snapshot_id})
+ self.db.snapshot_update(context,
+ snapshot_ref['id'],
+ {'status': 'error'})
raise exception.MetadataCopyFailure(reason=ex)
+
+ self.db.snapshot_update(context,
+ snapshot_ref['id'], {'status': 'available',
+ 'progress': '100%'})
+
LOG.info(_("snapshot %s: created successfully"), snapshot_ref['id'])
self._notify_about_snapshot_usage(context, snapshot_ref, "create.end")
return snapshot_id