'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
+ session.query(models.SnapshotMetadata).\
+ filter_by(snapshot_id=snapshot_id).\
+ update({'deleted': True,
+ 'deleted_at': timeutils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
@require_context
pass
@staticmethod
- def _create_snapshot(volume_id, size='0'):
+ def _create_snapshot(volume_id, size='0', metadata=None):
"""Create a snapshot object."""
snap = {}
snap['volume_size'] = size
snap['project_id'] = 'fake'
snap['volume_id'] = volume_id
snap['status'] = "creating"
+ if metadata is not None:
+ snap['metadata'] = metadata
return db.snapshot_create(context.get_admin_context(), snap)
def test_create_delete_snapshot(self):
snapshot_id)
self.volume.delete_volume(self.context, volume['id'])
+ def test_create_delete_snapshot_with_metadata(self):
+ """Test snapshot can be created with metadata and deleted."""
+ test_meta = {'fake_key': 'fake_value'}
+ volume = self._create_volume(0, None)
+ volume_id = volume['id']
+ self.volume.create_volume(self.context, volume_id)
+ snapshot = self._create_snapshot(volume['id'], metadata=test_meta)
+ snapshot_id = snapshot['id']
+ result_meta = {
+ snapshot.snapshot_metadata[0].key:
+ snapshot.snapshot_metadata[0].value}
+ self.assertEqual(result_meta, test_meta)
+
+ self.volume.delete_snapshot(self.context, snapshot_id)
+ self.assertRaises(exception.NotFound,
+ db.snapshot_get,
+ self.context,
+ snapshot_id)
+
def test_cant_delete_volume_in_use(self):
"""Test volume can't be deleted in invalid stats."""
# create a volume and assign to host