from cinder.api import extensions
import cinder.api.openstack
from cinder.api.v1 import limits
+from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
from cinder.api.v1 import types
from cinder.api.v1 import volume_metadata
collection={'detail': 'GET'},
member={'action': 'POST'})
+ self.resources['snapshot_metadata'] = \
+ snapshot_metadata.create_resource()
+ snapshot_metadata_controller = self.resources['snapshot_metadata']
+
+ mapper.resource("snapshot_metadata", "metadata",
+ controller=snapshot_metadata_controller,
+ parent_resource=dict(member_name='snapshot',
+ collection_name='snapshots'))
+
self.resources['limits'] = limits.create_resource()
mapper.resource("limit", "limits",
controller=self.resources['limits'])
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import webob
+
+from cinder.api import common
+from cinder.api.openstack import wsgi
+from cinder import exception
+from cinder import volume
+from webob import exc
+
+
+class Controller(object):
+ """ The volume metadata API controller for the OpenStack API """
+
+ def __init__(self):
+ self.volume_api = volume.API()
+ super(Controller, self).__init__()
+
+ def _get_metadata(self, context, snapshot_id):
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ meta = self.volume_api.get_snapshot_metadata(context, snapshot)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+ return meta
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ def index(self, req, snapshot_id):
+ """ Returns the list of metadata for a given snapshot"""
+ context = req.environ['cinder.context']
+ return {'metadata': self._get_metadata(context, snapshot_id)}
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ @wsgi.deserializers(xml=common.MetadataDeserializer)
+ def create(self, req, snapshot_id, body):
+ try:
+ metadata = body['metadata']
+ except (KeyError, TypeError):
+ msg = _("Malformed request body")
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ context = req.environ['cinder.context']
+
+ new_metadata = self._update_snapshot_metadata(context,
+ snapshot_id,
+ metadata,
+ delete=False)
+
+ return {'metadata': new_metadata}
+
+ @wsgi.serializers(xml=common.MetaItemTemplate)
+ @wsgi.deserializers(xml=common.MetaItemDeserializer)
+ def update(self, req, snapshot_id, id, body):
+ try:
+ meta_item = body['meta']
+ except (TypeError, KeyError):
+ expl = _('Malformed request body')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ if id not in meta_item:
+ expl = _('Request body and URI mismatch')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ if len(meta_item) > 1:
+ expl = _('Request body contains too many items')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ context = req.environ['cinder.context']
+ self._update_snapshot_metadata(context,
+ snapshot_id,
+ meta_item,
+ delete=False)
+
+ return {'meta': meta_item}
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ @wsgi.deserializers(xml=common.MetadataDeserializer)
+ def update_all(self, req, snapshot_id, body):
+ try:
+ metadata = body['metadata']
+ except (TypeError, KeyError):
+ expl = _('Malformed request body')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ context = req.environ['cinder.context']
+ new_metadata = self._update_snapshot_metadata(context,
+ snapshot_id,
+ metadata,
+ delete=True)
+
+ return {'metadata': new_metadata}
+
+ def _update_snapshot_metadata(self, context,
+ snapshot_id, metadata,
+ delete=False):
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ return self.volume_api.update_snapshot_metadata(context,
+ snapshot,
+ metadata,
+ delete)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+
+ except (ValueError, AttributeError):
+ msg = _("Malformed request body")
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ except exception.InvalidVolumeMetadata as error:
+ raise exc.HTTPBadRequest(explanation=unicode(error))
+
+ except exception.InvalidVolumeMetadataSize as error:
+ raise exc.HTTPRequestEntityTooLarge(explanation=unicode(error))
+
+ @wsgi.serializers(xml=common.MetaItemTemplate)
+ def show(self, req, snapshot_id, id):
+ """ Return a single metadata item """
+ context = req.environ['cinder.context']
+ data = self._get_metadata(context, snapshot_id)
+
+ try:
+ return {'meta': {id: data[id]}}
+ except KeyError:
+ msg = _("Metadata item was not found")
+ raise exc.HTTPNotFound(explanation=msg)
+
+ def delete(self, req, snapshot_id, id):
+ """ Deletes an existing metadata """
+ context = req.environ['cinder.context']
+
+ metadata = self._get_metadata(context, snapshot_id)
+
+ if id not in metadata:
+ msg = _("Metadata item was not found")
+ raise exc.HTTPNotFound(explanation=msg)
+
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ self.volume_api.delete_snapshot_metadata(context, snapshot, id)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+ return webob.Response(status_int=200)
+
+
+def create_resource():
+ return wsgi.Resource(Controller())
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
+ if snapshot.get('snapshot_metadata'):
+ metadata = snapshot.get('snapshot_metadata')
+ d['metadata'] = dict((item['key'], item['value']) for item in metadata)
+ # avoid circular ref when vol is a Volume instance
+ elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
+ dict):
+ d['metadata'] = snapshot['metadata']
+ else:
+ d['metadata'] = {}
return d
elem.set('display_name')
elem.set('display_description')
elem.set('volume_id')
+ elem.append(common.MetadataTemplate())
class SnapshotTemplate(xmlutil.TemplateBuilder):
@wsgi.serializers(xml=SnapshotTemplate)
def create(self, req, body):
"""Creates a new snapshot."""
+ kwargs = {}
context = req.environ['cinder.context']
if not self.is_valid_body(body, 'snapshot'):
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
+ kwargs['metadata'] = snapshot.get('metadata', None)
+
volume_id = snapshot['volume_id']
volume = self.volume_api.get(context, volume_id)
force = snapshot.get('force', False)
context,
volume,
snapshot.get('display_name'),
- snapshot.get('display_description'))
+ snapshot.get('display_description'),
+ **kwargs)
else:
new_snapshot = self.volume_api.create_snapshot(
context,
volume,
snapshot.get('display_name'),
- snapshot.get('display_description'))
+ snapshot.get('display_description'),
+ **kwargs)
retval = _translate_snapshot_detail_view(context, new_snapshot)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import webob
+
+from cinder.api import common
+from cinder.api.openstack import wsgi
+from cinder import exception
+from cinder import volume
+from webob import exc
+
+
+class Controller(object):
+ """ The volume metadata API controller for the OpenStack API """
+
+ def __init__(self):
+ self.volume_api = volume.API()
+ super(Controller, self).__init__()
+
+ def _get_metadata(self, context, snapshot_id):
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ meta = self.volume_api.get_snapshot_metadata(context, snapshot)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+ return meta
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ def index(self, req, snapshot_id):
+ """ Returns the list of metadata for a given snapshot"""
+ context = req.environ['cinder.context']
+ return {'metadata': self._get_metadata(context, snapshot_id)}
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ @wsgi.deserializers(xml=common.MetadataDeserializer)
+ def create(self, req, snapshot_id, body):
+ try:
+ metadata = body['metadata']
+ except (KeyError, TypeError):
+ msg = _("Malformed request body")
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ context = req.environ['cinder.context']
+
+ new_metadata = self._update_snapshot_metadata(context,
+ snapshot_id,
+ metadata,
+ delete=False)
+
+ return {'metadata': new_metadata}
+
+ @wsgi.serializers(xml=common.MetaItemTemplate)
+ @wsgi.deserializers(xml=common.MetaItemDeserializer)
+ def update(self, req, snapshot_id, id, body):
+ try:
+ meta_item = body['meta']
+ except (TypeError, KeyError):
+ expl = _('Malformed request body')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ if id not in meta_item:
+ expl = _('Request body and URI mismatch')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ if len(meta_item) > 1:
+ expl = _('Request body contains too many items')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ context = req.environ['cinder.context']
+ self._update_snapshot_metadata(context,
+ snapshot_id,
+ meta_item,
+ delete=False)
+
+ return {'meta': meta_item}
+
+ @wsgi.serializers(xml=common.MetadataTemplate)
+ @wsgi.deserializers(xml=common.MetadataDeserializer)
+ def update_all(self, req, snapshot_id, body):
+ try:
+ metadata = body['metadata']
+ except (TypeError, KeyError):
+ expl = _('Malformed request body')
+ raise exc.HTTPBadRequest(explanation=expl)
+
+ context = req.environ['cinder.context']
+ new_metadata = self._update_snapshot_metadata(context,
+ snapshot_id,
+ metadata,
+ delete=True)
+
+ return {'metadata': new_metadata}
+
+ def _update_snapshot_metadata(self, context,
+ snapshot_id, metadata,
+ delete=False):
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ return self.volume_api.update_snapshot_metadata(context,
+ snapshot,
+ metadata,
+ delete)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+
+ except (ValueError, AttributeError):
+ msg = _("Malformed request body")
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ except exception.InvalidVolumeMetadata as error:
+ raise exc.HTTPBadRequest(explanation=unicode(error))
+
+ except exception.InvalidVolumeMetadataSize as error:
+ raise exc.HTTPRequestEntityTooLarge(explanation=unicode(error))
+
+ @wsgi.serializers(xml=common.MetaItemTemplate)
+ def show(self, req, snapshot_id, id):
+ """ Return a single metadata item """
+ context = req.environ['cinder.context']
+ data = self._get_metadata(context, snapshot_id)
+
+ try:
+ return {'meta': {id: data[id]}}
+ except KeyError:
+ msg = _("Metadata item was not found")
+ raise exc.HTTPNotFound(explanation=msg)
+
+ def delete(self, req, snapshot_id, id):
+ """ Deletes an existing metadata """
+ context = req.environ['cinder.context']
+
+ metadata = self._get_metadata(context, snapshot_id)
+
+ if id not in metadata:
+ msg = _("Metadata item was not found")
+ raise exc.HTTPNotFound(explanation=msg)
+
+ try:
+ snapshot = self.volume_api.get_snapshot(context, snapshot_id)
+ self.volume_api.delete_snapshot_metadata(context, snapshot, id)
+ except exception.SnapshotNotFound:
+ msg = _('snapshot does not exist')
+ raise exc.HTTPNotFound(explanation=msg)
+ return webob.Response(status_int=200)
+
+
+def create_resource():
+ return wsgi.Resource(Controller())
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
+ if snapshot.get('snapshot_metadata'):
+ metadata = snapshot.get('snapshot_metadata')
+ d['metadata'] = dict((item['key'], item['value']) for item in metadata)
+ # avoid circular ref when vol is a Volume instance
+ elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
+ dict):
+ d['metadata'] = snapshot['metadata']
+ else:
+ d['metadata'] = {}
return d
elem.set('name')
elem.set('description')
elem.set('volume_id')
+ elem.append(common.MetadataTemplate())
class SnapshotTemplate(xmlutil.TemplateBuilder):
@wsgi.serializers(xml=SnapshotTemplate)
def create(self, req, body):
"""Creates a new snapshot."""
+ kwargs = {}
context = req.environ['cinder.context']
if not self.is_valid_body(body, 'snapshot'):
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
+ kwargs['metadata'] = snapshot.get('metadata', None)
+
volume_id = snapshot['volume_id']
volume = self.volume_api.get(context, volume_id)
force = snapshot.get('force', False)
context,
volume,
snapshot.get('display_name'),
- snapshot.get('description'))
+ snapshot.get('description'),
+ **kwargs)
else:
new_snapshot = self.volume_api.create_snapshot(
context,
volume,
snapshot.get('display_name'),
- snapshot.get('description'))
+ snapshot.get('description'),
+ **kwargs)
retval = _translate_snapshot_detail_view(context, new_snapshot)
####################
+def snapshot_metadata_get(context, snapshot_id):
+ """Get all metadata for a snapshot."""
+ return IMPL.snapshot_metadata_get(context, snapshot_id)
+
+
+def snapshot_metadata_delete(context, snapshot_id, key):
+ """Delete the given metadata item."""
+ IMPL.snapshot_metadata_delete(context, snapshot_id, key)
+
+
+def snapshot_metadata_update(context, snapshot_id, metadata, delete):
+ """Update metadata if it exists, otherwise create it."""
+ IMPL.snapshot_metadata_update(context, snapshot_id, metadata, delete)
+
+
+####################
+
+
def volume_metadata_get(context, volume_id):
"""Get all metadata for a volume."""
return IMPL.volume_metadata_get(context, volume_id)
@require_context
def snapshot_create(context, values):
+ values['snapshot_metadata'] = _metadata_refs(values.get('metadata'),
+ models.SnapshotMetadata)
snapshot_ref = models.Snapshot()
if not values.get('id'):
values['id'] = str(uuid.uuid4())
session = get_session()
with session.begin():
snapshot_ref.save(session=session)
- return snapshot_ref
+
+ return snapshot_get(context, values['id'], session=session)
@require_admin_context
snapshot_ref.update(values)
snapshot_ref.save(session=session)
+####################
+
+
+def _snapshot_metadata_get_query(context, snapshot_id, session=None):
+ return model_query(context, models.SnapshotMetadata,
+ session=session, read_deleted="no").\
+ filter_by(snapshot_id=snapshot_id)
+
+
+@require_context
+@require_snapshot_exists
+def snapshot_metadata_get(context, snapshot_id):
+ rows = _snapshot_metadata_get_query(context, snapshot_id).all()
+ result = {}
+ for row in rows:
+ result[row['key']] = row['value']
+
+ return result
+
+
+@require_context
+@require_snapshot_exists
+def snapshot_metadata_delete(context, snapshot_id, key):
+ _snapshot_metadata_get_query(context, snapshot_id).\
+ filter_by(key=key).\
+ update({'deleted': True,
+ 'deleted_at': timeutils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
+@require_snapshot_exists
+def snapshot_metadata_get_item(context, snapshot_id, key, session=None):
+ result = _snapshot_metadata_get_query(context,
+ snapshot_id,
+ session=session).\
+ filter_by(key=key).\
+ first()
+
+ if not result:
+ raise exception.SnapshotMetadataNotFound(metadata_key=key,
+ snapshot_id=snapshot_id)
+ return result
+
+
+@require_context
+@require_snapshot_exists
+def snapshot_metadata_update(context, snapshot_id, metadata, delete):
+ session = get_session()
+
+ # Set existing metadata to deleted if delete argument is True
+ if delete:
+ original_metadata = snapshot_metadata_get(context, snapshot_id)
+ for meta_key, meta_value in original_metadata.iteritems():
+ if meta_key not in metadata:
+ meta_ref = snapshot_metadata_get_item(context, snapshot_id,
+ meta_key, session)
+ meta_ref.update({'deleted': True})
+ meta_ref.save(session=session)
+
+ meta_ref = None
+
+ # Now update all existing items with new values, or create new meta objects
+ for meta_key, meta_value in metadata.items():
+
+ # update the value whether it exists or not
+ item = {"value": meta_value}
+
+ try:
+ meta_ref = snapshot_metadata_get_item(context, snapshot_id,
+ meta_key, session)
+ except exception.SnapshotMetadataNotFound as e:
+ meta_ref = models.SnapshotMetadata()
+ item.update({"key": meta_key, "snapshot_id": snapshot_id})
+
+ meta_ref.update(item)
+ meta_ref.save(session=session)
+
+ return metadata
###################
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Boolean, Column, DateTime
+from sqlalchemy import Integer, MetaData, String, Table, ForeignKey
+
+from cinder.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def upgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ snapshots = Table('snapshots', meta, autoload=True)
+
+ # New table
+ snapshot_metadata = Table(
+ 'snapshot_metadata', meta,
+ Column('created_at', DateTime),
+ Column('updated_at', DateTime),
+ Column('deleted_at', DateTime),
+ Column('deleted', Boolean),
+ Column('id', Integer, primary_key=True, nullable=False),
+ Column('snapshot_id', String(length=36), ForeignKey('snapshots.id'),
+ nullable=False),
+ Column('key', String(length=255)),
+ Column('value', String(length=255)),
+ mysql_engine='InnoDB'
+ )
+
+ try:
+ snapshot_metadata.create()
+ except Exception:
+ LOG.error(_("Table |%s| not created!"), repr(snapshot_metadata))
+ raise
+
+
+def downgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+ snapshot_metadata = Table('snapshot_metadata',
+ meta,
+ autoload=True)
+ try:
+ snapshot_metadata.drop()
+ except Exception:
+ LOG.error(_("snapshot_metadata table not dropped"))
'Snapshot.deleted == False)')
+class SnapshotMetadata(BASE, CinderBase):
+ """Represents a metadata key/value pair for a snapshot."""
+ __tablename__ = 'snapshot_metadata'
+ id = Column(Integer, primary_key=True)
+ key = Column(String(255))
+ value = Column(String(255))
+ snapshot_id = Column(String(36),
+ ForeignKey('snapshots.id'),
+ nullable=False)
+ snapshot = relationship(Snapshot, backref="snapshot_metadata",
+ foreign_keys=snapshot_id,
+ primaryjoin='and_('
+ 'SnapshotMetadata.snapshot_id == Snapshot.id,'
+ 'SnapshotMetadata.deleted == False)')
+
+
class IscsiTarget(BASE, CinderBase):
"""Represents an iscsi target for a given host."""
__tablename__ = 'iscsi_targets'
SMVolume,
Volume,
VolumeMetadata,
+ SnapshotMetadata,
VolumeTypeExtraSpecs,
VolumeTypes,
VolumeGlanceMetadata,
message = _("Invalid metadata size") + ": %(reason)s"
+class SnapshotMetadataNotFound(NotFound):
+ message = _("Snapshot %(snapshot_id)s has no metadata with "
+ "key %(metadata_key)s.")
+
+
+class InvalidSnapshotMetadata(Invalid):
+ message = _("Invalid metadata") + ": %(reason)s"
+
+
+class InvalidSnapshotMetadataSize(Invalid):
+ message = _("Invalid metadata size") + ": %(reason)s"
+
+
class VolumeTypeNotFound(NotFound):
message = _("Volume type %(volume_type_id)s could not be found.")
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import webob
+
+from cinder.api import extensions
+from cinder.api.v1 import snapshot_metadata
+from cinder.api.v1 import snapshots
+import cinder.db
+from cinder import exception
+from cinder.openstack.common import cfg
+from cinder.openstack.common import jsonutils
+from cinder import test
+from cinder.tests.api import fakes
+
+
+CONF = cfg.CONF
+
+
+def return_create_snapshot_metadata_max(context,
+ snapshot_id,
+ metadata,
+ delete):
+ return stub_max_snapshot_metadata()
+
+
+def return_create_snapshot_metadata(context, snapshot_id, metadata, delete):
+ return stub_snapshot_metadata()
+
+
+def return_snapshot_metadata(context, snapshot_id):
+ if not isinstance(snapshot_id, str) or not len(snapshot_id) == 36:
+ msg = 'id %s must be a uuid in return snapshot metadata' % snapshot_id
+ raise Exception(msg)
+ return stub_snapshot_metadata()
+
+
+def return_empty_snapshot_metadata(context, snapshot_id):
+ return {}
+
+
+def delete_snapshot_metadata(context, snapshot_id, key):
+ pass
+
+
+def stub_snapshot_metadata():
+ metadata = {
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ }
+ return metadata
+
+
+def stub_max_snapshot_metadata():
+ metadata = {"metadata": {}}
+ for num in range(CONF.quota_metadata_items):
+ metadata['metadata']['key%i' % num] = "blah"
+ return metadata
+
+
+def return_snapshot(context, snapshot_id):
+ return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
+ 'name': 'fake',
+ 'status': 'available',
+ 'metadata': {}}
+
+
+def return_volume(context, volume_id):
+ return {'id': 'fake-vol-id',
+ 'size': 100,
+ 'name': 'fake',
+ 'host': 'fake-host',
+ 'status': 'available',
+ 'metadata': {}}
+
+
+def return_snapshot_nonexistent(context, snapshot_id):
+ raise exception.SnapshotNotFound('bogus test message')
+
+
+def fake_update_snapshot_metadata(self, context, snapshot, diff):
+ pass
+
+
+class SnapshotMetaDataTest(test.TestCase):
+
+ def setUp(self):
+ super(SnapshotMetaDataTest, self).setUp()
+ self.volume_api = cinder.volume.api.API()
+ fakes.stub_out_key_pair_funcs(self.stubs)
+ self.stubs.Set(cinder.db, 'volume_get', return_volume)
+ self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+
+ self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
+ fake_update_snapshot_metadata)
+
+ self.ext_mgr = extensions.ExtensionManager()
+ self.ext_mgr.extensions = {}
+ self.snapshot_controller = snapshots.SnapshotsController(self.ext_mgr)
+ self.controller = snapshot_metadata.Controller()
+ self.id = str(uuid.uuid4())
+ self.url = '/v1/fake/snapshots/%s/metadata' % self.id
+
+ snap = {"volume_size": 100,
+ "volume_id": "fake-vol-id",
+ "display_name": "Volume Test Name",
+ "display_description": "Volume Test Desc",
+ "availability_zone": "zone1:host1",
+ "host": "fake-host",
+ "metadata": {}}
+ body = {"snapshot": snap}
+ req = fakes.HTTPRequest.blank('/v1/snapshots')
+ self.snapshot_controller.create(req, body)
+
+ def test_index(self):
+ req = fakes.HTTPRequest.blank(self.url)
+ res_dict = self.controller.index(req, self.id)
+
+ expected = {
+ 'metadata': {
+ 'key1': 'value1',
+ 'key2': 'value2',
+ 'key3': 'value3',
+ },
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_index_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url)
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.index, req, self.url)
+
+ def test_index_no_data(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ res_dict = self.controller.index(req, self.id)
+ expected = {'metadata': {}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show(self):
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ res_dict = self.controller.show(req, self.id, 'key2')
+ expected = {'meta': {'key2': 'value2'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.show, req, self.id, 'key2')
+
+ def test_show_meta_not_found(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key6')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.show, req, self.id, 'key6')
+
+ def test_delete(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
+ delete_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ req.method = 'DELETE'
+ res = self.controller.delete(req, self.id, 'key2')
+
+ self.assertEqual(200, res.status_int)
+
+ def test_delete_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'DELETE'
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.delete, req, self.id, 'key1')
+
+ def test_delete_meta_not_found(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key6')
+ req.method = 'DELETE'
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.delete, req, self.id, 'key6')
+
+ def test_create(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+
+ req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
+ req.method = 'POST'
+ req.content_type = "application/json"
+ body = {"metadata": {"key9": "value9"}}
+ req.body = jsonutils.dumps(body)
+ res_dict = self.controller.create(req, self.id, body)
+ self.assertEqual(body, res_dict)
+
+ def test_create_empty_body(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'POST'
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, None)
+
+ def test_create_item_empty_key(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, body)
+
+ def test_create_item_key_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {("a" * 260): "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create,
+ req, self.id, body)
+
+ def test_create_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+
+ req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
+ req.method = 'POST'
+ req.content_type = "application/json"
+ body = {"metadata": {"key9": "value9"}}
+ req.body = jsonutils.dumps(body)
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.create, req, self.id, body)
+
+ def test_update_all(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {
+ 'metadata': {
+ 'key10': 'value10',
+ 'key99': 'value99',
+ },
+ }
+ req.body = jsonutils.dumps(expected)
+ res_dict = self.controller.update_all(req, self.id, expected)
+
+ self.assertEqual(expected, res_dict)
+
+ def test_update_all_empty_container(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'metadata': {}}
+ req.body = jsonutils.dumps(expected)
+ res_dict = self.controller.update_all(req, self.id, expected)
+
+ self.assertEqual(expected, res_dict)
+
+ def test_update_all_malformed_container(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'meta': {}}
+ req.body = jsonutils.dumps(expected)
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update_all, req, self.id, expected)
+
+ def test_update_all_malformed_data(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'metadata': ['asdf']}
+ req.body = jsonutils.dumps(expected)
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update_all, req, self.id, expected)
+
+ def test_update_all_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ body = {'metadata': {'key10': 'value10'}}
+ req.body = jsonutils.dumps(body)
+
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.update_all, req, '100', body)
+
+ def test_update_item(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res_dict = self.controller.update(req, self.id, 'key1', body)
+ expected = {'meta': {'key1': 'value1'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_update_item_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(
+ '/v1.1/fake/snapshots/asdf/metadata/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.update, req, self.id, 'key1', body)
+
+ def test_update_item_empty_body(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'key1', None)
+
+ def test_update_item_empty_key(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, '', body)
+
+ def test_update_item_key_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {("a" * 260): "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.update,
+ req, self.id, ("a" * 260), body)
+
+ def test_update_item_value_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": ("a" * 260)}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.update,
+ req, self.id, "key1", body)
+
+ def test_update_item_too_many_keys(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1", "key2": "value2"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'key1', body)
+
+ def test_update_item_body_uri_mismatch(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/bad')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'bad', body)
+
+ def test_invalid_metadata_items_on_create(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'POST'
+ req.headers["content-type"] = "application/json"
+
+ #test for long key
+ data = {"metadata": {"a" * 260: "value1"}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.create, req, self.id, data)
+
+ #test for long value
+ data = {"metadata": {"key": "v" * 260}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.create, req, self.id, data)
+
+ #test for empty key.
+ data = {"metadata": {"": "value1"}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, data)
'display_description': 'Default description', }
-def stub_snapshot_create(self, context, volume_id, name, description):
+def stub_snapshot_create(self, context,
+ volume_id, name,
+ description, metadata):
snapshot = _get_default_snapshot_param()
snapshot['volume_id'] = volume_id
snapshot['display_name'] = name
snapshot['display_description'] = description
+ snapshot['metadata'] = metadata
return snapshot
'created_at': None,
'display_name': 'Updated Test Name',
'display_description': 'Default description',
+ 'metadata': {},
}}
self.assertEquals(expected, res_dict)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import webob
+
+from cinder.api import extensions
+from cinder.api.v2 import snapshot_metadata
+from cinder.api.v2 import snapshots
+import cinder.db
+from cinder import exception
+from cinder.openstack.common import cfg
+from cinder.openstack.common import jsonutils
+from cinder import test
+from cinder.tests.api import fakes
+
+
+CONF = cfg.CONF
+
+
+def return_create_snapshot_metadata_max(context,
+ snapshot_id,
+ metadata,
+ delete):
+ return stub_max_snapshot_metadata()
+
+
+def return_create_snapshot_metadata(context, snapshot_id, metadata, delete):
+ return stub_snapshot_metadata()
+
+
+def return_snapshot_metadata(context, snapshot_id):
+ if not isinstance(snapshot_id, str) or not len(snapshot_id) == 36:
+ msg = 'id %s must be a uuid in return snapshot metadata' % snapshot_id
+ raise Exception(msg)
+ return stub_snapshot_metadata()
+
+
+def return_empty_snapshot_metadata(context, snapshot_id):
+ return {}
+
+
+def delete_snapshot_metadata(context, snapshot_id, key):
+ pass
+
+
+def stub_snapshot_metadata():
+ metadata = {
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ }
+ return metadata
+
+
+def stub_max_snapshot_metadata():
+ metadata = {"metadata": {}}
+ for num in range(CONF.quota_metadata_items):
+ metadata['metadata']['key%i' % num] = "blah"
+ return metadata
+
+
+def return_snapshot(context, snapshot_id):
+ return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
+ 'name': 'fake',
+ 'status': 'available',
+ 'metadata': {}}
+
+
+def return_volume(context, volume_id):
+ return {'id': 'fake-vol-id',
+ 'size': 100,
+ 'name': 'fake',
+ 'host': 'fake-host',
+ 'status': 'available',
+ 'metadata': {}}
+
+
+def return_snapshot_nonexistent(context, snapshot_id):
+ raise exception.SnapshotNotFound('bogus test message')
+
+
+def fake_update_snapshot_metadata(self, context, snapshot, diff):
+ pass
+
+
+class SnapshotMetaDataTest(test.TestCase):
+
+ def setUp(self):
+ super(SnapshotMetaDataTest, self).setUp()
+ self.volume_api = cinder.volume.api.API()
+ fakes.stub_out_key_pair_funcs(self.stubs)
+ self.stubs.Set(cinder.db, 'volume_get', return_volume)
+ self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+
+ self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
+ fake_update_snapshot_metadata)
+
+ self.ext_mgr = extensions.ExtensionManager()
+ self.ext_mgr.extensions = {}
+ self.snapshot_controller = snapshots.SnapshotsController(self.ext_mgr)
+ self.controller = snapshot_metadata.Controller()
+ self.id = str(uuid.uuid4())
+ self.url = '/v2/fake/snapshots/%s/metadata' % self.id
+
+ snap = {"volume_size": 100,
+ "volume_id": "fake-vol-id",
+ "display_name": "Volume Test Name",
+ "display_description": "Volume Test Desc",
+ "availability_zone": "zone1:host1",
+ "host": "fake-host",
+ "metadata": {}}
+ body = {"snapshot": snap}
+ req = fakes.HTTPRequest.blank('/v2/snapshots')
+ self.snapshot_controller.create(req, body)
+
+ def test_index(self):
+ req = fakes.HTTPRequest.blank(self.url)
+ res_dict = self.controller.index(req, self.id)
+
+ expected = {
+ 'metadata': {
+ 'key1': 'value1',
+ 'key2': 'value2',
+ 'key3': 'value3',
+ },
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_index_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url)
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.index, req, self.url)
+
+ def test_index_no_data(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ res_dict = self.controller.index(req, self.id)
+ expected = {'metadata': {}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show(self):
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ res_dict = self.controller.show(req, self.id, 'key2')
+ expected = {'meta': {'key2': 'value2'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.show, req, self.id, 'key2')
+
+ def test_show_meta_not_found(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key6')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.show, req, self.id, 'key6')
+
+ def test_delete(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
+ delete_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key2')
+ req.method = 'DELETE'
+ res = self.controller.delete(req, self.id, 'key2')
+
+ self.assertEqual(200, res.status_int)
+
+ def test_delete_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'DELETE'
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.delete, req, self.id, 'key1')
+
+ def test_delete_meta_not_found(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key6')
+ req.method = 'DELETE'
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.delete, req, self.id, 'key6')
+
+ def test_create(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_empty_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+
+ req = fakes.HTTPRequest.blank('/v2/snapshot_metadata')
+ req.method = 'POST'
+ req.content_type = "application/json"
+ body = {"metadata": {"key9": "value9"}}
+ req.body = jsonutils.dumps(body)
+ res_dict = self.controller.create(req, self.id, body)
+ self.assertEqual(body, res_dict)
+
+ def test_create_empty_body(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'POST'
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, None)
+
+ def test_create_item_empty_key(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, body)
+
+ def test_create_item_key_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {("a" * 260): "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create,
+ req, self.id, body)
+
+ def test_create_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_get',
+ return_snapshot_metadata)
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+
+ req = fakes.HTTPRequest.blank('/v2/snapshot_metadata')
+ req.method = 'POST'
+ req.content_type = "application/json"
+ body = {"metadata": {"key9": "value9"}}
+ req.body = jsonutils.dumps(body)
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.create, req, self.id, body)
+
+ def test_update_all(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {
+ 'metadata': {
+ 'key10': 'value10',
+ 'key99': 'value99',
+ },
+ }
+ req.body = jsonutils.dumps(expected)
+ res_dict = self.controller.update_all(req, self.id, expected)
+
+ self.assertEqual(expected, res_dict)
+
+ def test_update_all_empty_container(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'metadata': {}}
+ req.body = jsonutils.dumps(expected)
+ res_dict = self.controller.update_all(req, self.id, expected)
+
+ self.assertEqual(expected, res_dict)
+
+ def test_update_all_malformed_container(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'meta': {}}
+ req.body = jsonutils.dumps(expected)
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update_all, req, self.id, expected)
+
+ def test_update_all_malformed_data(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ expected = {'metadata': ['asdf']}
+ req.body = jsonutils.dumps(expected)
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update_all, req, self.id, expected)
+
+ def test_update_all_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ body = {'metadata': {'key10': 'value10'}}
+ req.body = jsonutils.dumps(body)
+
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.update_all, req, '100', body)
+
+ def test_update_item(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res_dict = self.controller.update(req, self.id, 'key1', body)
+ expected = {'meta': {'key1': 'value1'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_update_item_nonexistent_snapshot(self):
+ self.stubs.Set(cinder.db, 'snapshot_get',
+ return_snapshot_nonexistent)
+ req = fakes.HTTPRequest.blank(
+ '/v2/fake/snapshots/asdf/metadata/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.update, req, self.id, 'key1', body)
+
+ def test_update_item_empty_body(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'key1', None)
+
+ def test_update_item_empty_key(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, '', body)
+
+ def test_update_item_key_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {("a" * 260): "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.update,
+ req, self.id, ("a" * 260), body)
+
+ def test_update_item_value_too_long(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": ("a" * 260)}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.update,
+ req, self.id, "key1", body)
+
+ def test_update_item_too_many_keys(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/key1')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1", "key2": "value2"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'key1', body)
+
+ def test_update_item_body_uri_mismatch(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url + '/bad')
+ req.method = 'PUT'
+ body = {"meta": {"key1": "value1"}}
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.update, req, self.id, 'bad', body)
+
+ def test_invalid_metadata_items_on_create(self):
+ self.stubs.Set(cinder.db, 'snapshot_metadata_update',
+ return_create_snapshot_metadata)
+ req = fakes.HTTPRequest.blank(self.url)
+ req.method = 'POST'
+ req.headers["content-type"] = "application/json"
+
+ #test for long key
+ data = {"metadata": {"a" * 260: "value1"}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.create, req, self.id, data)
+
+ #test for long value
+ data = {"metadata": {"key": "v" * 260}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
+ self.controller.create, req, self.id, data)
+
+ #test for empty key.
+ data = {"metadata": {"": "value1"}}
+ req.body = jsonutils.dumps(data)
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller.create, req, self.id, data)
}
-def stub_snapshot_create(self, context, volume_id, name, description):
+def stub_snapshot_create(self, context,
+ volume_id, name,
+ description, metadata):
snapshot = _get_default_snapshot_param()
snapshot['volume_id'] = volume_id
snapshot['display_name'] = name
snapshot['display_description'] = description
+ snapshot['metadata'] = metadata
return snapshot
'created_at': None,
'name': 'Updated Test Name',
'description': 'Default description',
+ 'metadata': {},
}
}
self.assertEquals(expected, res_dict)
self.assertFalse(engine.dialect.has_table(engine.connect(),
"backups"))
+
+ def test_migration_009(self):
+ """Test adding snapshot_metadata table works correctly."""
+ for (key, engine) in self.engines.items():
+ migration_api.version_control(engine,
+ TestMigrations.REPOSITORY,
+ migration.INIT_VERSION)
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 8)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 9)
+
+ self.assertTrue(engine.dialect.has_table(engine.connect(),
+ "snapshot_metadata"))
+ snapshot_metadata = sqlalchemy.Table('snapshot_metadata',
+ metadata,
+ autoload=True)
+
+ self.assertTrue(isinstance(snapshot_metadata.c.created_at.type,
+ sqlalchemy.types.DATETIME))
+ self.assertTrue(isinstance(snapshot_metadata.c.updated_at.type,
+ sqlalchemy.types.DATETIME))
+ self.assertTrue(isinstance(snapshot_metadata.c.deleted_at.type,
+ sqlalchemy.types.DATETIME))
+ self.assertTrue(isinstance(snapshot_metadata.c.deleted.type,
+ sqlalchemy.types.BOOLEAN))
+ self.assertTrue(isinstance(snapshot_metadata.c.deleted.type,
+ sqlalchemy.types.BOOLEAN))
+ self.assertTrue(isinstance(snapshot_metadata.c.id.type,
+ sqlalchemy.types.INTEGER))
+ self.assertTrue(isinstance(snapshot_metadata.c.snapshot_id.type,
+ sqlalchemy.types.VARCHAR))
+ self.assertTrue(isinstance(snapshot_metadata.c.key.type,
+ sqlalchemy.types.VARCHAR))
+ self.assertTrue(isinstance(snapshot_metadata.c.value.type,
+ sqlalchemy.types.VARCHAR))
+
+ migration_api.downgrade(engine, TestMigrations.REPOSITORY, 8)
+
+ self.assertFalse(engine.dialect.has_table(engine.connect(),
+ "snapshot_metadata"))
from oslo.config import cfg
from cinder.db import base
-from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import flags
from cinder.image import glance
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
-from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
import cinder.policy
from cinder import quota
connector,
force)
- def _create_snapshot(self, context, volume, name, description,
- force=False):
+ def _create_snapshot(self, context,
+ volume, name, description,
+ force=False, metadata=None):
check_policy(context, 'create_snapshot', volume)
if ((not force) and (volume['status'] != "available")):
msg = _("must be available")
raise exception.InvalidVolume(reason=msg)
+ self._check_metadata_properties(context, metadata)
options = {'volume_id': volume['id'],
'user_id': context.user_id,
'project_id': context.project_id,
'progress': '0%',
'volume_size': volume['size'],
'display_name': name,
- 'display_description': description}
+ 'display_description': description,
+ 'metadata': metadata}
snapshot = self.db.snapshot_create(context, options)
self.volume_rpcapi.create_snapshot(context, volume, snapshot)
return snapshot
- def create_snapshot(self, context, volume, name, description):
+ def create_snapshot(self, context,
+ volume, name,
+ description, metadata=None):
return self._create_snapshot(context, volume, name, description,
- False)
+ False, metadata)
- def create_snapshot_force(self, context, volume, name, description):
+ def create_snapshot_force(self, context,
+ volume, name,
+ description, metadata=None):
return self._create_snapshot(context, volume, name, description,
- True)
+ True, metadata)
@wrap_check_policy
def delete_snapshot(self, context, snapshot, force=False):
return i['value']
return None
+ def get_snapshot_metadata(self, context, snapshot):
+ """Get all metadata associated with a snapshot."""
+ rv = self.db.snapshot_metadata_get(context, snapshot['id'])
+ return dict(rv.iteritems())
+
+ def delete_snapshot_metadata(self, context, snapshot, key):
+ """Delete the given metadata item from a snapshot."""
+ self.db.snapshot_metadata_delete(context, snapshot['id'], key)
+
+ def update_snapshot_metadata(self, context,
+ snapshot, metadata,
+ delete=False):
+ """Updates or creates snapshot metadata.
+
+ If delete is True, metadata items that are not specified in the
+ `metadata` argument will be deleted.
+
+ """
+ orig_meta = self.get_snapshot_metadata(context, snapshot)
+ if delete:
+ _metadata = metadata
+ else:
+ _metadata = orig_meta.copy()
+ _metadata.update(metadata)
+
+ self._check_metadata_properties(context, _metadata)
+
+ self.db.snapshot_metadata_update(context,
+ snapshot['id'],
+ _metadata,
+ True)
+
+ # TODO(jdg): Implement an RPC call for drivers that may use this info
+
+ return _metadata
+
+ def get_snapshot_metadata_value(self, snapshot, key):
+ pass
+
@wrap_check_policy
def get_volume_image_metadata(self, context, volume):
db_data = self.db.volume_glance_metadata_get(context, volume['id'])