Abstract volumes and volume snapshots into objects.
Get, create, and delete snapshot APIs were changed to
use new snapshot objects. A skeleton volume object was
created, but cinder internals were not changed to use
the volume object, although volume is referenced and
used by the snapshot object. The internals will be
changed to use volume object in a subsequent patch.
Change-Id: I387018e80c8539565e99454db65d976030002c0f
Implements: blueprint cinder-objects
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
- if snapshot.get('snapshot_metadata'):
- metadata = snapshot.get('snapshot_metadata')
- d['metadata'] = dict((item['key'], item['value']) for item in metadata)
- # avoid circular ref when vol is a Volume instance
- elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
- dict):
+ if snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
+ dict):
d['metadata'] = snapshot['metadata']
else:
d['metadata'] = {}
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
- if snapshot.get('snapshot_metadata'):
- metadata = snapshot.get('snapshot_metadata')
- d['metadata'] = dict((item['key'], item['value']) for item in metadata)
- # avoid circular ref when vol is a Volume instance
- elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
- dict):
+ if snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
+ dict):
d['metadata'] = snapshot['metadata']
else:
d['metadata'] = {}
# NOTE(danms): You must make sure your object gets imported in this
# function in order for it to be registered by services that may
# need to receive it via RPC.
- pass
+ __import__('cinder.objects.volume')
+ __import__('cinder.objects.snapshot')
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+
+from cinder import db
+from cinder import exception
+from cinder.i18n import _
+from cinder import objects
+from cinder.objects import base
+from cinder.objects import fields
+from cinder.openstack.common import log as logging
+from cinder import utils
+
+CONF = cfg.CONF
+# NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They are
+# typically the relationship in the sqlalchemy object.
+OPTIONAL_FIELDS = ['volume', 'metadata']
+LOG = logging.getLogger(__name__)
+
+
+class Snapshot(base.CinderPersistentObject, base.CinderObject,
+ base.CinderObjectDictCompat):
+ # Version 1.0: Initial version
+ VERSION = '1.0'
+
+ fields = {
+ 'id': fields.UUIDField(),
+ 'name': fields.StringField(nullable=True),
+ 'volume_name': fields.StringField(nullable=True),
+
+ 'user_id': fields.UUIDField(nullable=True),
+ 'project_id': fields.UUIDField(nullable=True),
+
+ 'volume_id': fields.UUIDField(),
+ 'cgsnapshot_id': fields.UUIDField(nullable=True),
+ 'status': fields.StringField(nullable=True),
+ 'progress': fields.StringField(nullable=True),
+ 'volume_size': fields.IntegerField(),
+
+ 'display_name': fields.StringField(nullable=True),
+ 'display_description': fields.StringField(nullable=True),
+
+ 'encryption_key_id': fields.UUIDField(nullable=True),
+ 'volume_type_id': fields.UUIDField(nullable=True),
+
+ 'provider_location': fields.StringField(nullable=True),
+ 'metadata': fields.DictOfStringsField(),
+
+ 'volume': fields.ObjectField('Volume', nullable=True),
+ }
+
+ @property
+ def name(self):
+ return CONF.snapshot_name_template % self.id
+
+ def __init__(self, *args, **kwargs):
+ super(Snapshot, self).__init__(*args, **kwargs)
+ self._orig_metadata = {}
+
+ self._reset_metadata_tracking()
+
+ def obj_reset_changes(self, fields=None):
+ super(Snapshot, self).obj_reset_changes(fields)
+ self._reset_metadata_tracking(fields=fields)
+
+ def _reset_metadata_tracking(self, fields=None):
+ if fields is None or 'metadata' in fields:
+ self._orig_metadata = (dict(self.metadata)
+ if 'metadata' in self else {})
+
+ def obj_what_changed(self):
+ changes = super(Snapshot, self).obj_what_changed()
+ if 'metadata' in self and self.metadata != self._orig_metadata:
+ changes.add('metadata')
+
+ return changes
+
+ def obj_make_compatible(self, primitive, target_version):
+ """Make an object representation compatible with a target version."""
+ target_version = utils.convert_version_to_tuple(target_version)
+
+ @staticmethod
+ def _from_db_object(context, snapshot, db_snapshot, expected_attrs=None):
+ if expected_attrs is None:
+ expected_attrs = []
+ for name, field in snapshot.fields.items():
+ if name in OPTIONAL_FIELDS:
+ continue
+ value = db_snapshot.get(name)
+ if isinstance(field, fields.IntegerField):
+ value = value if value is not None else 0
+ snapshot[name] = value
+
+ if 'volume' in expected_attrs:
+ volume = objects.Volume(context)
+ volume._from_db_object(context, volume, db_snapshot['volume'])
+ snapshot.volume = volume
+ if 'metadata' in expected_attrs:
+ snapshot.metadata = db.snapshot_metadata_get(context,
+ db_snapshot['id'])
+
+ snapshot._context = context
+ snapshot.obj_reset_changes()
+ return snapshot
+
+ @base.remotable_classmethod
+ def get_by_id(cls, context, id):
+ db_snapshot = db.snapshot_get(context, id)
+ return cls._from_db_object(context, cls(context), db_snapshot,
+ expected_attrs=['metadata'])
+
+ @base.remotable
+ def create(self, context):
+ if self.obj_attr_is_set('id'):
+ raise exception.ObjectActionError(action='create',
+ reason=_('already created'))
+ updates = self.obj_get_changes()
+
+ if 'volume' in updates:
+ raise exception.ObjectActionError(action='create',
+ reason=_('volume assigned'))
+
+ db_snapshot = db.snapshot_create(context, updates)
+ self._from_db_object(context, self, db_snapshot)
+
+ @base.remotable
+ def save(self, context):
+ updates = self.obj_get_changes()
+ if updates:
+ if 'volume' in updates:
+ raise exception.ObjectActionError(action='save',
+ reason=_('volume changed'))
+
+ if 'metadata' in updates:
+ # Metadata items that are not specified in the
+ # self.metadata will be deleted
+ metadata = updates.pop('metadata', None)
+ self.metadata = db.snapshot_metadata_update(context, self.id,
+ metadata, True)
+
+ db.snapshot_update(context, self.id, updates)
+
+ self.obj_reset_changes()
+
+ @base.remotable
+ def destroy(self, context):
+ db.snapshot_destroy(context, self.id)
+
+ def obj_load_attr(self, attrname):
+ if attrname not in OPTIONAL_FIELDS:
+ raise exception.ObjectActionError(
+ action='obj_load_attr',
+ reason=_('attribute %s not lazy-loadable') % attrname)
+ if not self._context:
+ raise exception.OrphanedObjectError(method='obj_load_attr',
+ objtype=self.obj_name())
+
+ self.volume = objects.Volume.get_by_id(self._context, self.volume_id)
+ self.obj_reset_changes(fields=['volume'])
+
+ def delete_metadata_key(self, context, key):
+ db.snapshot_metadata_delete(context, self.id, key)
+ md_was_changed = 'metadata' in self.obj_what_changed()
+
+ del self.metadata[key]
+ self._orig_metadata.pop(key, None)
+
+ if not md_was_changed:
+ self.obj_reset_changes(['metadata'])
+
+
+class SnapshotList(base.ObjectListBase, base.CinderObject):
+ VERSION = '1.0'
+
+ fields = {
+ 'objects': fields.ListOfObjectsField('Snapshot'),
+ }
+ child_versions = {
+ '1.0': '1.0'
+ }
+
+ @base.remotable_classmethod
+ def get_all(cls, context):
+ snapshots = db.snapshot_get_all(context)
+ return base.obj_make_list(context, cls(), objects.Snapshot,
+ snapshots,
+ expected_attrs=['metadata'])
+
+ @base.remotable_classmethod
+ def get_all_by_project(cls, context, project_id):
+ snapshots = db.snapshot_get_all_by_project(context, project_id)
+ return base.obj_make_list(context, cls(context), objects.Snapshot,
+ snapshots, expected_attrs=['metadata'])
+
+ @base.remotable_classmethod
+ def get_all_for_volume(cls, context, volume_id):
+ snapshots = db.snapshot_get_all_for_volume(context, volume_id)
+ return base.obj_make_list(context, cls(context), objects.Snapshot,
+ snapshots, expected_attrs=['metadata'])
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+
+from cinder import db
+from cinder import exception
+from cinder.i18n import _
+from cinder import objects
+from cinder.objects import base
+from cinder.objects import fields
+from cinder.openstack.common import log as logging
+from cinder import utils
+
+CONF = cfg.CONF
+OPTIONAL_FIELDS = []
+LOG = logging.getLogger(__name__)
+
+
+class Volume(base.CinderPersistentObject, base.CinderObject,
+ base.CinderObjectDictCompat):
+ # Version 1.0: Initial version
+ VERSION = '1.0'
+
+ fields = {
+ 'id': fields.UUIDField(),
+ '_name_id': fields.UUIDField(nullable=True),
+ 'ec2_id': fields.UUIDField(nullable=True),
+ 'user_id': fields.UUIDField(nullable=True),
+ 'project_id': fields.UUIDField(nullable=True),
+
+ 'snapshot_id': fields.UUIDField(nullable=True),
+
+ 'host': fields.StringField(nullable=True),
+ 'size': fields.IntegerField(),
+ 'availability_zone': fields.StringField(),
+ 'instance_uuid': fields.UUIDField(nullable=True),
+ 'attached_host': fields.StringField(nullable=True),
+ 'mountpoint': fields.StringField(nullable=True),
+ 'attach_time': fields.StringField(nullable=True),
+ 'status': fields.StringField(),
+ 'attach_status': fields.StringField(),
+ 'migration_status': fields.StringField(nullable=True),
+
+ 'scheduled_at': fields.DateTimeField(nullable=True),
+ 'launched_at': fields.DateTimeField(nullable=True),
+ 'terminated_at': fields.DateTimeField(nullable=True),
+
+ 'display_name': fields.StringField(nullable=True),
+ 'display_description': fields.StringField(nullable=True),
+
+ 'provider_location': fields.StringField(nullable=True),
+ 'provider_auth': fields.StringField(nullable=True),
+ 'provider_geometry': fields.StringField(nullable=True),
+
+ 'volume_type_id': fields.UUIDField(nullable=True),
+ 'source_volid': fields.UUIDField(nullable=True),
+ 'encryption_key_id': fields.UUIDField(nullable=True),
+
+ 'consistencygroup_id': fields.UUIDField(nullable=True),
+
+ 'deleted': fields.BooleanField(default=False),
+ 'bootable': fields.BooleanField(default=False),
+
+ 'replication_status': fields.StringField(nullable=True),
+ 'replication_extended_status': fields.StringField(nullable=True),
+ 'replication_driver_data': fields.StringField(nullable=True),
+ }
+
+ @property
+ def name_id(self):
+ return self.id if not self._name_id else self._name_id
+
+ @name_id.setter
+ def name_id(self, value):
+ self._name_id = value
+
+ @property
+ def name(self):
+ return CONF.volume_name_template % self.name_id
+
+ def __init__(self, *args, **kwargs):
+ super(Volume, self).__init__(*args, **kwargs)
+
+ def obj_make_compatible(self, primitive, target_version):
+ """Make an object representation compatible with a target version."""
+ target_version = utils.convert_version_to_tuple(target_version)
+
+ @staticmethod
+ def _from_db_object(context, volume, db_volume):
+ for name, field in volume.fields.items():
+ value = db_volume[name]
+ if isinstance(field, fields.IntegerField):
+ value = value or 0
+ volume[name] = value
+
+ volume._context = context
+ volume.obj_reset_changes()
+ return volume
+
+ @base.remotable_classmethod
+ def get_by_id(cls, context, id):
+ db_volume = db.volume_get(context, id)
+ return cls._from_db_object(context, cls(context), db_volume)
+
+ @base.remotable
+ def create(self, context):
+ if self.obj_attr_is_set('id'):
+ raise exception.ObjectActionError(action='create',
+ reason=_('already created'))
+ updates = self.obj_get_changes()
+ db_volume = db.volume_create(context, updates)
+ self._from_db_object(context, self, db_volume)
+
+ def save(self):
+ context = self._context
+ updates = self.obj_get_changes()
+ if updates:
+ db.volume_update(context, self.id, updates)
+
+ self.obj_reset_changes()
+
+ @base.remotable
+ def destroy(self, context):
+ db.volume_destroy(context, self.id)
+
+
+class VolumeList(base.ObjectListBase, base.CinderObject):
+ VERSION = '1.0'
+
+ fields = {
+ 'objects': fields.ListOfObjectsField('Volume'),
+ }
+ child_versions = {
+ '1.0': '1.0'
+ }
+
+ @base.remotable_classmethod
+ def get_all(cls, context, marker, limit, sort_key, sort_dir,
+ filters=None):
+ volumes = db.volume_get_all(context, marker, limit, sort_key,
+ sort_dir, filters=filters)
+ return base.obj_make_list(context, cls(context), objects.Volume,
+ volumes)
from cinder.db import migration
from cinder.db.sqlalchemy import api as sqla_api
from cinder import i18n
+from cinder import objects
from cinder.openstack.common import log as oslo_logging
from cinder import rpc
from cinder import service
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
+ # Import cinder objects for test cases
+ objects.register_all()
+
# Unit tests do not need to use lazy gettext
i18n.enable_lazy(False)
import ast
import fixtures
+import mock
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.tests import cast_as_call
+from cinder.tests import fake_snapshot
from cinder.volume import api as volume_api
from cinder.volume.targets import tgt
# volume is deleted
self.assertRaises(exception.NotFound, db.volume_get, ctx, volume['id'])
- def test_force_delete_snapshot(self):
+ @mock.patch.object(volume_api.API, 'delete_snapshot', return_value=True)
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ @mock.patch.object(db, 'snapshot_get')
+ @mock.patch.object(db, 'volume_get')
+ def test_force_delete_snapshot(self, volume_get, snapshot_get, get_by_id,
+ delete_snapshot):
ctx = context.RequestContext('admin', 'fake', True)
- snapshot = stubs.stub_snapshot(1, host='foo')
- self.stubs.Set(db, 'volume_get', lambda x, y: snapshot)
- self.stubs.Set(db, 'snapshot_get', lambda x, y: snapshot)
- self.stubs.Set(volume_api.API, 'delete_snapshot', lambda *x, **y: True)
+ volume = stubs.stub_volume(1)
+ snapshot = stubs.stub_snapshot(1)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ volume_get.return_value = volume
+ snapshot_get.return_value = snapshot
+ get_by_id.return_value = snapshot_obj
+
path = '/v2/fake/snapshots/%s/action' % snapshot['id']
req = webob.Request.blank(path)
req.method = 'POST'
from lxml import etree
+import mock
from oslo_serialization import jsonutils
import webob
from cinder.api.contrib import extended_snapshot_attributes
+from cinder import context
from cinder import test
from cinder.tests.api import fakes
-from cinder import volume
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
UUID1 = '00000000-0000-0000-0000-000000000001'
'display_name': 'Default name',
'display_description': 'Default description',
'project_id': 'fake',
- 'progress': '0%'}
+ 'progress': '0%',
+ 'expected_attrs': ['metadata']}
def fake_snapshot_get(self, context, snapshot_id):
def setUp(self):
super(ExtendedSnapshotAttributesTest, self).setUp()
- self.stubs.Set(volume.api.API, 'get_snapshot', fake_snapshot_get)
- self.stubs.Set(volume.api.API, 'get_all_snapshots',
- fake_snapshot_get_all)
def _make_request(self, url):
req = webob.Request.blank(url)
project_id)
self.assertEqual(snapshot.get('%sprogress' % self.prefix), progress)
- def test_show(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get):
+ ctx = context.RequestContext('fake', 'fake', auth_token=True)
+ snapshot = _get_default_snapshot_param()
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
url = '/v2/fake/snapshots/%s' % UUID1
res = self._make_request(url)
import uuid
+import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
import webob
from cinder.api import extensions
from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
+from cinder import context
import cinder.db
from cinder import exception
from cinder import test
from cinder.tests.api import fakes
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
CONF = cfg.CONF
req = fakes.HTTPRequest.blank('/v1/snapshots')
self.snapshot_controller.create(req, body)
- def test_index(self):
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key1': 'value1',
+ 'key2': 'value2',
+ 'key3': 'value3'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
}
self.assertEqual(expected, res_dict)
- def test_index_nonexistent_snapshot(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_nonexistent)
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index_nonexistent_snapshot(self, snapshot_get_by_id):
+ snapshot_get_by_id.side_effect = \
+ exception.SnapshotNotFound(snapshot_id=self.req_id)
+
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.index, req, self.url)
- def test_index_no_data(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index_no_data(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
- def test_show(self):
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key2': 'value2'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, self.req_id, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
- def test_show_nonexistent_snapshot(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_nonexistent)
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show_nonexistent_snapshot(self, snapshot_get_by_id):
+ snapshot_get_by_id.side_effect = \
+ exception.SnapshotNotFound(snapshot_id=self.req_id)
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key2')
- def test_show_meta_not_found(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show_meta_not_found(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key6')
- def test_delete(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_metadata)
- self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
- delete_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_delete')
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key2': 'value2'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_delete(self, snapshot_get_by_id, snapshot_metadata_get,
+ snapshot_metadata_delete):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, self.req_id, 'key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key1')
- def test_delete_meta_not_found(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_delete_meta_not_found(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key6')
- def test_create(self):
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
res_dict = self.controller.create(req, self.req_id, body)
self.assertEqual(body, res_dict)
- def test_create_with_keys_in_uppercase_and_lowercase(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create_with_keys_in_uppercase_and_lowercase(
+ self, snapshot_get_by_id, snapshot_update, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
# if the keys in uppercase_and_lowercase, should return the one
# which server added
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata_insensitive)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.create, req, self.req_id, body)
- def test_update_all(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all(self, snapshot_get_by_id, snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': []
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_new_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
self.assertEqual(expected, res_dict)
- def test_update_all_with_keys_in_uppercase_and_lowercase(self):
+ @mock.patch('cinder.db.snapshot_update',
+ return_value={'key10': 'value10',
+ 'key99': 'value99',
+ 'KEY20': 'value20'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all_with_keys_in_uppercase_and_lowercase(
+ self, snapshot_get_by_id, snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_create_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
self.assertEqual(expected, res_dict)
- def test_update_all_empty_container(self):
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all_empty_container(self, snapshot_get_by_id,
+ snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': []
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_empty_container_metadata)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update_all, req, '100', body)
- def test_update_item(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_update',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_update', return_value=dict())
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item(self, snapshot_get_by_id, snapshot_metadata_get,
+ snapshot_update, snapshot_metadata_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, '', body)
- def test_update_item_key_too_long(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item_key_too_long(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
self.controller.update,
req, self.req_id, ("a" * 260), body)
- def test_update_item_value_too_long(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item_value_too_long(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
self.controller.update, req, self.req_id, 'bad',
body)
- def test_invalid_metadata_items_on_create(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_invalid_metadata_items_on_create(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
import datetime
from lxml import etree
+import mock
import webob
from cinder.api.v1 import snapshots
+from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v1 import stubs
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
from cinder import volume
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, body)
- def test_snapshot_update(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
- self.stubs.Set(volume.api.API, "update_snapshot",
- stubs.stub_snapshot_update)
+ @mock.patch.object(volume.api.API, "update_snapshot",
+ side_effect=stubs.stub_snapshot_update)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_update(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get, update_snapshot):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
updates = {"display_name": "Updated Test Name", }
body = {"snapshot": updates}
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
res_dict = self.controller.update(req, UUID, body)
expected = {'snapshot': {
'id': UUID,
- 'volume_id': 12,
- 'status': 'available',
+ 'volume_id': '1',
+ 'status': u'available',
'size': 100,
'created_at': None,
- 'display_name': 'Updated Test Name',
- 'display_description': 'Default description',
+ 'display_name': u'Updated Test Name',
+ 'display_description': u'Default description',
'metadata': {},
}}
self.assertEqual(expected, res_dict)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.update, req,
'not-the-uuid', body)
- def test_snapshot_delete(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
- self.stubs.Set(volume.api.API, "delete_snapshot", stub_snapshot_delete)
+ @mock.patch.object(volume.api.API, "delete_snapshot",
+ side_effect=stubs.stub_snapshot_update)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get, delete_snapshot):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
snapshot_id = UUID
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % snapshot_id)
req,
snapshot_id)
- def test_snapshot_show(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
resp_dict = self.controller.show(req, UUID)
resp_snapshot = resp_snapshots.pop()
self.assertEqual(resp_snapshot['id'], UUID)
- def test_snapshot_list_by_status(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_snapshot_list_by_status(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, display_name='backup1',
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 0)
- def test_snapshot_list_by_volume(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_snapshot_list_by_volume(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, volume_id='vol1', status='creating'),
self.assertEqual(resp['snapshots'][0]['volume_id'], 'vol1')
self.assertEqual(resp['snapshots'][0]['status'], 'available')
- def test_snapshot_list_by_name(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_snapshot_list_by_name(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, display_name='backup1'),
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 0)
- def test_admin_list_snapshots_limited_to_project(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_admin_list_snapshots_limited_to_project(self,
+ snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots',
use_admin_context=True)
res = self.controller.index(req)
# non_admin case
list_snapshots_with_limit_and_offset(is_admin=False)
- def test_admin_list_snapshots_all_tenants(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
- def test_all_tenants_non_admin_gets_all_tenants(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_all_tenants_non_admin_gets_all_tenants(self,
+ snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots?all_tenants=1')
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
- def test_non_admin_get_by_project(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_non_admin_get_by_project(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots')
res = self.controller.index(req)
self.assertIn('snapshots', res)
import uuid
+import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
import webob
from cinder.api import extensions
from cinder.api.v2 import snapshot_metadata
from cinder.api.v2 import snapshots
+from cinder import context
import cinder.db
from cinder import exception
from cinder import test
from cinder.tests.api import fakes
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
CONF = cfg.CONF
req = fakes.HTTPRequest.blank('/v2/snapshots')
self.snapshot_controller.create(req, body)
- def test_index(self):
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key1': 'value1',
+ 'key2': 'value2',
+ 'key3': 'value3'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
}
self.assertEqual(expected, res_dict)
- def test_index_nonexistent_snapshot(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_nonexistent)
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index_nonexistent_snapshot(self, snapshot_get_by_id):
+ snapshot_get_by_id.side_effect = \
+ exception.SnapshotNotFound(snapshot_id=self.req_id)
+
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.index, req, self.url)
- def test_index_no_data(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_index_no_data(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
- def test_show(self):
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key2': 'value2'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show(self, snapshot_get_by_id, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, self.req_id, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
- def test_show_nonexistent_snapshot(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_nonexistent)
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show_nonexistent_snapshot(self, snapshot_get_by_id):
+ snapshot_get_by_id.side_effect = \
+ exception.SnapshotNotFound(snapshot_id=self.req_id)
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key2')
- def test_show_meta_not_found(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_show_meta_not_found(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key6')
- def test_delete(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_snapshot_metadata)
- self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
- delete_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_delete')
+ @mock.patch('cinder.db.snapshot_metadata_get',
+ return_value={'key2': 'value2'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_delete(self, snapshot_get_by_id, snapshot_metadata_get,
+ snapshot_metadata_delete):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, self.req_id, 'key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key1')
- def test_delete_meta_not_found(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_delete_meta_not_found(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key6')
- def test_create(self):
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
res_dict = self.controller.create(req, self.req_id, body)
self.assertEqual(body, res_dict)
- def test_create_with_keys_in_uppercase_and_lowercase(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create_with_keys_in_uppercase_and_lowercase(
+ self, snapshot_get_by_id, snapshot_update, snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
# if the keys in uppercase_and_lowercase, should return the one
# which server added
- self.stubs.Set(cinder.db, 'snapshot_metadata_get',
- return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata_insensitive)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.create, req, self.req_id, body)
- def test_update_all(self):
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all(self, snapshot_get_by_id, snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': []
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_new_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
self.assertEqual(expected, res_dict)
- def test_update_all_with_keys_in_uppercase_and_lowercase(self):
+ @mock.patch('cinder.db.snapshot_update',
+ return_value={'key10': 'value10',
+ 'key99': 'value99',
+ 'KEY20': 'value20'})
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all_with_keys_in_uppercase_and_lowercase(
+ self, snapshot_get_by_id, snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_create_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
self.assertEqual(expected, res_dict)
- def test_update_all_empty_container(self):
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_all_empty_container(self, snapshot_get_by_id,
+ snapshot_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': []
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_empty_container_metadata)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update_all, req, '100', body)
- def test_update_item(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_update',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.snapshot_metadata_update', return_value=dict())
+ @mock.patch('cinder.db.snapshot_update')
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item(self, snapshot_get_by_id, snapshot_metadata_get,
+ snapshot_update, snapshot_metadata_update):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, '', body)
- def test_update_item_key_too_long(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item_key_too_long(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
self.controller.update,
req, self.req_id, ("a" * 260), body)
- def test_update_item_value_too_long(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_update_item_value_too_long(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
self.controller.update, req, self.req_id, 'bad',
body)
- def test_invalid_metadata_items_on_create(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_invalid_metadata_items_on_create(self, snapshot_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': self.req_id,
+ 'expected_attrs': ['metadata']
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_get_by_id.return_value = snapshot_obj
+
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
import datetime
from lxml import etree
+import mock
import webob
from cinder.api.v2 import snapshots
+from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
from cinder import volume
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, body)
- def test_snapshot_update(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
- self.stubs.Set(volume.api.API, "update_snapshot",
- stubs.stub_snapshot_update)
+ @mock.patch.object(volume.api.API, "update_snapshot",
+ side_effect=stubs.stub_snapshot_update)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_update(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get, update_snapshot):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
updates = {
"name": "Updated Test Name",
}
expected = {
'snapshot': {
'id': UUID,
- 'volume_id': 12,
- 'status': 'available',
+ 'volume_id': '1',
+ 'status': u'available',
'size': 100,
'created_at': None,
- 'name': 'Updated Test Name',
- 'description': 'Default description',
+ 'name': u'Updated Test Name',
+ 'description': u'Default description',
'metadata': {},
}
}
self.assertRaises(webob.exc.HTTPNotFound, self.controller.update, req,
'not-the-uuid', body)
- def test_snapshot_delete(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
- self.stubs.Set(volume.api.API, "delete_snapshot", stub_snapshot_delete)
+ @mock.patch.object(volume.api.API, "delete_snapshot",
+ side_effect=stubs.stub_snapshot_update)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_delete(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get, delete_snapshot):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
snapshot_id = UUID
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % snapshot_id)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
req, snapshot_id)
- def test_snapshot_show(self):
- self.stubs.Set(volume.api.API, "get_snapshot", stub_snapshot_get)
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_snapshot_show(self, snapshot_get_by_id, volume_get_by_id,
+ snapshot_metadata_get):
+ snapshot = {
+ 'id': UUID,
+ 'volume_id': 1,
+ 'status': 'available',
+ 'volume_size': 100,
+ 'display_name': 'Default name',
+ 'display_description': 'Default description',
+ 'expected_attrs': ['metadata'],
+ }
+ ctx = context.RequestContext('admin', 'fake', True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ fake_volume_obj = fake_volume.fake_volume_obj(ctx)
+ snapshot_get_by_id.return_value = snapshot_obj
+ volume_get_by_id.return_value = fake_volume_obj
+
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
resp_dict = self.controller.show(req, UUID)
resp_snapshot = resp_snapshots.pop()
self.assertEqual(resp_snapshot['id'], UUID)
- def test_snapshot_list_by_status(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_snapshot_list_by_status(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, display_name='backup1',
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 0)
- def test_snapshot_list_by_volume(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ def test_snapshot_list_by_volume(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, volume_id='vol1', status='creating'),
self.assertEqual(resp['snapshots'][0]['volume_id'], 'vol1')
self.assertEqual(resp['snapshots'][0]['status'], 'available')
- def test_snapshot_list_by_name(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ def test_snapshot_list_by_name(self, snapshot_metadata_get):
def stub_snapshot_get_all_by_project(context, project_id):
return [
stubs.stub_snapshot(1, display_name='backup1'),
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 0)
- def test_admin_list_snapshots_limited_to_project(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_admin_list_snapshots_limited_to_project(self,
+ snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots',
use_admin_context=True)
res = self.controller.index(req)
# non_admin case
list_snapshots_with_limit_and_offset(is_admin=False)
- def test_admin_list_snapshots_all_tenants(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_admin_list_snapshots_all_tenants(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
- def test_all_tenants_non_admin_gets_all_tenants(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_all_tenants_non_admin_gets_all_tenants(self,
+ snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1')
res = self.controller.index(req)
self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
- def test_non_admin_get_by_project(self):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
+ def test_non_admin_get_by_project(self, snapshot_metadata_get):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots')
res = self.controller.index(req)
self.assertIn('snapshots', res)
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder import objects
+from cinder.objects import fields
+
+
+def fake_db_volume(**updates):
+ db_volume = {
+ 'id': 1,
+ 'size': 1,
+ 'name': 'fake',
+ 'availability_zone': 'fake_availability_zone',
+ 'status': 'available',
+ 'attach_status': 'detached',
+ }
+
+ for name, field in objects.Volume.fields.items():
+ if name in db_volume:
+ continue
+ if field.nullable:
+ db_volume[name] = None
+ elif field.default != fields.UnspecifiedDefault:
+ db_volume[name] = field.default
+ else:
+ raise Exception('fake_db_volume needs help with %s' % name)
+
+ if updates:
+ db_volume.update(updates)
+
+ return db_volume
+
+
+def fake_db_snapshot(**updates):
+ db_snapshot = {
+ 'id': 1,
+ 'volume_id': 'fake_id',
+ 'status': "creating",
+ 'progress': '0%',
+ 'volume_size': 1,
+ 'display_name': 'fake_name',
+ 'display_description': 'fake_description',
+ 'metadata': {},
+ 'snapshot_metadata': {},
+ }
+
+ for name, field in objects.Snapshot.fields.items():
+ if name in db_snapshot:
+ continue
+ if field.nullable:
+ db_snapshot[name] = None
+ elif field.default != fields.UnspecifiedDefault:
+ db_snapshot[name] = field.default
+ else:
+ raise Exception('fake_db_snapshot needs help with %s' % name)
+
+ if updates:
+ db_snapshot.update(updates)
+
+ return db_snapshot
+
+
+def fake_snapshot_obj(context, **updates):
+ expected_attrs = updates.pop('expected_attrs', None)
+ return objects.Snapshot._from_db_object(context, objects.Snapshot(),
+ fake_db_snapshot(**updates),
+ expected_attrs=expected_attrs)
+
+
+def fake_volume_obj(context, **updates):
+ expected_attrs = updates.pop('expected_attrs', None)
+ return objects.Volume._from_db_object(context, objects.Volume(),
+ fake_db_volume(**updates),
+ expected_attrs=expected_attrs)
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder import objects
+from cinder.objects import fields
+
+
+def fake_db_volume(**updates):
+ db_volume = {
+ 'id': 1,
+ 'size': 1,
+ 'name': 'fake',
+ 'availability_zone': 'fake_availability_zone',
+ 'status': 'available',
+ 'attach_status': 'detached',
+ }
+
+ for name, field in objects.Volume.fields.items():
+ if name in db_volume:
+ continue
+ if field.nullable:
+ db_volume[name] = None
+ elif field.default != fields.UnspecifiedDefault:
+ db_volume[name] = field.default
+ else:
+ raise Exception('fake_db_volume needs help with %s' % name)
+
+ if updates:
+ db_volume.update(updates)
+
+ return db_volume
+
+
+def fake_volume_obj(context, **updates):
+ return objects.Volume._from_db_object(context, objects.Volume(),
+ fake_db_volume(**updates))
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from cinder.objects import snapshot as snapshot_obj
+from cinder.tests import fake_volume
+from cinder.tests.objects import test_objects
+
+fake_snapshot = {
+ 'id': '1',
+ 'volume_id': 'fake_id',
+ 'status': "creating",
+ 'progress': '0%',
+ 'volume_size': 1,
+ 'display_name': 'fake_name',
+ 'display_description': 'fake_description',
+}
+
+
+class TestSnapshot(test_objects._LocalTest):
+ @staticmethod
+ def _compare(test, db, obj):
+ for field, value in db.items():
+ test.assertEqual(db[field], obj[field])
+
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ @mock.patch('cinder.db.snapshot_get', return_value=fake_snapshot)
+ def test_get_by_id(self, snapshot_get, snapshot_metadata_get):
+ snapshot = snapshot_obj.Snapshot.get_by_id(self.context, 1)
+ self._compare(self, fake_snapshot, snapshot)
+
+ def test_reset_changes(self):
+ snapshot = snapshot_obj.Snapshot()
+ snapshot.metadata = {'key1': 'value1'}
+ self.assertEqual({}, snapshot._orig_metadata)
+ snapshot.obj_reset_changes(['metadata'])
+ self.assertEqual({'key1': 'value1'}, snapshot._orig_metadata)
+
+ @mock.patch('cinder.db.snapshot_create', return_value=fake_snapshot)
+ def test_create(self, snapshot_create):
+ snapshot = snapshot_obj.Snapshot(context=self.context)
+ snapshot.create()
+ self.assertEqual(fake_snapshot['id'], snapshot.id)
+ self.assertEqual(fake_snapshot['volume_id'], snapshot.volume_id)
+
+ @mock.patch('cinder.db.snapshot_update')
+ def test_save(self, snapshot_update):
+ snapshot = snapshot_obj.Snapshot._from_db_object(
+ self.context, snapshot_obj.Snapshot(), fake_snapshot)
+ snapshot.display_name = 'foobar'
+ snapshot.save(self.context)
+ snapshot_update.assert_called_once_with(self.context, snapshot.id,
+ {'display_name': 'foobar'})
+
+ @mock.patch('cinder.db.snapshot_metadata_update',
+ return_value={'key1': 'value1'})
+ @mock.patch('cinder.db.snapshot_update')
+ def test_save_with_metadata(self, snapshot_update,
+ snapshot_metadata_update):
+ snapshot = snapshot_obj.Snapshot._from_db_object(
+ self.context, snapshot_obj.Snapshot(), fake_snapshot)
+ snapshot.display_name = 'foobar'
+ snapshot.metadata = {'key1': 'value1'}
+ self.assertEqual({'display_name': 'foobar',
+ 'metadata': {'key1': 'value1'}},
+ snapshot.obj_get_changes())
+ snapshot.save(self.context)
+ snapshot_update.assert_called_once_with(self.context, snapshot.id,
+ {'display_name': 'foobar'})
+ snapshot_metadata_update.assert_called_once_with(self.context, '1',
+ {'key1': 'value1'},
+ True)
+
+ @mock.patch('cinder.db.snapshot_destroy')
+ def test_destroy(self, snapshot_destroy):
+ snapshot = snapshot_obj.Snapshot(context=self.context, id=1)
+ snapshot.destroy()
+ snapshot_destroy.assert_called_once_with(self.context, '1')
+
+ @mock.patch('cinder.db.snapshot_metadata_delete')
+ def test_delete_metadata_key(self, snapshot_metadata_delete):
+ snapshot = snapshot_obj.Snapshot(self.context, id=1)
+ snapshot.metadata = {'key1': 'value1', 'key2': 'value2'}
+ self.assertEqual({}, snapshot._orig_metadata)
+ snapshot.delete_metadata_key(self.context, 'key2')
+ self.assertEqual({'key1': 'value1'}, snapshot.metadata)
+ snapshot_metadata_delete.assert_called_once_with(self.context, '1',
+ 'key2')
+
+
+class TestSnapshotList(test_objects._LocalTest):
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.db.snapshot_get_all', return_value=[fake_snapshot])
+ def test_get_all(self, snapshot_get_all, volume_get_by_id,
+ snapshot_metadata_get):
+ fake_volume_obj = fake_volume.fake_volume_obj(self.context)
+ volume_get_by_id.return_value = fake_volume_obj
+
+ snapshots = snapshot_obj.SnapshotList.get_all(self.context)
+ self.assertEqual(1, len(snapshots))
+ TestSnapshot._compare(self, fake_snapshot, snapshots[0])
+
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.db.snapshot_get_all_by_project',
+ return_value=[fake_snapshot])
+ def test_get_all_by_project(self, get_all_by_project, volume_get_by_id,
+ snapshot_metadata_get):
+ fake_volume_obj = fake_volume.fake_volume_obj(self.context)
+ volume_get_by_id.return_value = fake_volume_obj
+
+ snapshots = snapshot_obj.SnapshotList.get_all_by_project(
+ self.context, self.context.project_id)
+ self.assertEqual(1, len(snapshots))
+ TestSnapshot._compare(self, fake_snapshot, snapshots[0])
+
+ @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ @mock.patch('cinder.db.snapshot_get_all_for_volume',
+ return_value=[fake_snapshot])
+ def test_get_all_for_volume(self, get_all_for_volume, volume_get_by_id,
+ snapshot_metadata_get):
+ fake_volume_obj = fake_volume.fake_volume_obj(self.context)
+ volume_get_by_id.return_value = fake_volume_obj
+
+ snapshots = snapshot_obj.SnapshotList.get_all_for_volume(
+ self.context, fake_volume_obj.id)
+ self.assertEqual(1, len(snapshots))
+ TestSnapshot._compare(self, fake_snapshot, snapshots[0])
from cinder import exception
from cinder.image import image_utils
from cinder import keymgr
+from cinder import objects
from cinder.openstack.common import fileutils
from cinder.openstack.common import log as logging
import cinder.policy
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- snapshot_id = self._create_snapshot(volume_src['id'])['id']
+ snapshot_id = self._create_snapshot(volume_src['id'],
+ size=volume_src['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.volume.create_snapshot(self.context, volume_src['id'],
- snapshot_id)
+ snapshot_obj)
volume_dst = tests_utils.create_volume(self.context,
snapshot_id=snapshot_id,
**self.volume_params)
volume_dst['id']).snapshot_id)
self.volume.delete_volume(self.context, volume_dst['id'])
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume_src['id'])
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- snapshot_id = self._create_snapshot(volume_src['id'])['id']
+ snapshot_id = self._create_snapshot(volume_src['id'],
+ size=volume_src['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
# NOTE(flaper87): Set initialized to False
self.volume.driver._initialized = False
self.assertRaises(exception.DriverNotInitialized,
self.volume.create_snapshot,
- self.context, volume_src['id'],
- snapshot_id)
+ self.context, volume_src['id'], snapshot_obj)
# NOTE(flaper87): The volume status should be error.
snapshot = db.snapshot_get(context.get_admin_context(), snapshot_id)
# NOTE(flaper87): Set initialized to True,
# lets cleanup the mess
self.volume.driver._initialized = True
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume_src['id'])
def _mock_synchronized(self, name, *s_args, **s_kwargs):
# no lock
self.volume.create_volume(self.context, src_vol_id)
- snap_id = self._create_snapshot(src_vol_id)['id']
+ snap_id = self._create_snapshot(src_vol_id,
+ size=src_vol['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
# no lock
- self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+ self.volume.create_snapshot(self.context, src_vol_id, snapshot_obj)
dst_vol = tests_utils.create_volume(self.context,
snapshot_id=snap_id,
self.assertEqual(len(self.called), 4)
# locked
- self.volume.delete_snapshot(self.context, snap_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.assertEqual(len(self.called), 6)
# locked
# create volume from snapshot
snapshot_id = self._create_snapshot(src_vol['id'])['id']
- self.volume.create_snapshot(self.context, src_vol['id'], snapshot_id)
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, src_vol['id'], snapshot_obj)
# ensure that status of snapshot is 'available'
snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
# create snapshot of volume
snapshot_id = self._create_snapshot(volume['id'])['id']
- self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
# ensure that status of snapshot is 'available'
snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
self.volume.create_volume(self.context, src_vol_id)
# create snapshot
- snap_id = self._create_snapshot(src_vol_id)['id']
+ snap_id = self._create_snapshot(src_vol_id,
+ size=src_vol['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
# no lock
- self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+ self.volume.create_snapshot(self.context, src_vol_id, snapshot_obj)
# create vol from snapshot...
dst_vol = tests_utils.create_volume(self.context,
self.stubs.Set(self.context, 'elevated', mock_elevated)
# locked
- self.volume.delete_snapshot(self.context, snap_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
# we expect the volume create to fail with the following err since the
# snapshot was deleted while the create was locked. Note that the
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
snapshot = self._create_snapshot(volume_src['id'])
+ snapshot_obj = objects.Snapshot.get_by_id(self.context,
+ snapshot['id'])
self.volume.create_snapshot(self.context, volume_src['id'],
- snapshot['id'])
+ snapshot_obj)
snapshot = db.snapshot_get(self.context, snapshot['id'])
volume_dst = volume_api.create(self.context,
self.assertFalse(fake_notifier.NOTIFICATIONS[2])
self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)
- snapshot_id = self._create_snapshot(volume['id'])['id']
- self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ snapshot_id = self._create_snapshot(volume['id'],
+ size=volume['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
'tenant_id': 'fake',
'user_id': 'fake',
'volume_id': volume['id'],
- 'volume_size': 0,
+ 'volume_size': 1,
'availability_zone': 'nova'
}
self.assertDictMatch(msg['payload'], expected)
self.assertEqual(len(fake_notifier.NOTIFICATIONS), 4)
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
msg = fake_notifier.NOTIFICATIONS[4]
self.assertEqual(msg['event_type'], 'snapshot.delete.start')
expected['status'] = 'available'
"""Test snapshot can be created with metadata and deleted."""
test_meta = {'fake_key': 'fake_value'}
volume = tests_utils.create_volume(self.context, **self.volume_params)
- snapshot = self._create_snapshot(volume['id'], metadata=test_meta)
+ snapshot = self._create_snapshot(volume['id'], size=volume['size'],
+ metadata=test_meta)
snapshot_id = snapshot['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
snap = db.snapshot_get(context.get_admin_context(), snapshot_id)
result_dict = dict(snap.iteritems())
result_dict['snapshot_metadata'][0].key:
result_dict['snapshot_metadata'][0].value}
self.assertEqual(result_meta, test_meta)
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
"""Test volume can't be deleted with dependent snapshots."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot_id = self._create_snapshot(volume['id'])['id']
- self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ snapshot_id = self._create_snapshot(volume['id'],
+ size=volume['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
volume_api.delete,
self.context,
volume)
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume['id'])
def test_delete_volume_in_consistency_group(self):
"""Test snapshot can be created and deleted."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot_id = self._create_snapshot(volume['id'])['id']
- self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ snapshot_id = self._create_snapshot(volume['id'],
+ size=volume['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
snapshot = db.snapshot_get(context.get_admin_context(),
snapshot_id)
snapshot)
snapshot['status'] = 'error'
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume['id'])
def test_create_snapshot_force(self):
# create snapshot from bootable volume
snap_id = self._create_snapshot(volume_id)['id']
- self.volume.create_snapshot(ctxt, volume_id, snap_id)
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
+ self.volume.create_snapshot(ctxt, volume_id, snapshot_obj)
# get snapshot's volume_glance_metadata
snap_glance_meta = db.volume_snapshot_glance_metadata_get(
snap = self._create_snapshot(volume_id)
snap_id = snap['id']
snap_stat = snap['status']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
self.assertTrue(snap_id)
self.assertTrue(snap_stat)
self.volume.create_snapshot,
ctxt,
volume_id,
- snap_id)
+ snapshot_obj)
# get snapshot's volume_glance_metadata
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_update(self.context, volume_id, {'bootable': True})
snapshot_id = self._create_snapshot(volume['id'])['id']
- self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_snapshot_glance_metadata_get,
self.context, snapshot_id)
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
- snapshot_id = self._create_snapshot(volume_id)['id']
- self.volume.create_snapshot(self.context, volume_id, snapshot_id)
+ snapshot_id = self._create_snapshot(volume_id,
+ size=volume['size'])['id']
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume_id, snapshot_obj)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
snapshot_ref = db.snapshot_get(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual("available", snapshot_ref.status)
self.mox.UnsetStubs()
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume_id)
@test.testtools.skipIf(sys.platform == "darwin", "SKIP on OSX")
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
snapshot_id = self._create_snapshot(volume_id)['id']
- self.volume.create_snapshot(self.context, volume_id, snapshot_id)
+ snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ self.volume.create_snapshot(self.context, volume_id, snapshot_obj)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
- self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot_obj)
snapshot_ref = db.snapshot_get(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual("available", snapshot_ref.status)
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.delete_snapshot,
self.context,
- snapshot_id)
+ snapshot_obj)
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.delete_volume,
self.context,
# create raw snapshot
volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'])
+ snapshot_obj = objects.Snapshot.get_by_id(self.context,
+ snapshot['id'])
self.assertIsNone(snapshot['display_name'])
# use volume.api to update name
volume_api = cinder.volume.api.API()
update_dict = {'display_name': 'test update name'}
- volume_api.update_snapshot(self.context, snapshot, update_dict)
+ volume_api.update_snapshot(self.context, snapshot_obj, update_dict)
# read changes from db
snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
self.assertEqual(snap['display_name'], 'test update name')
from cinder import context
from cinder import db
+from cinder import objects
from cinder import test
+from cinder.tests import fake_snapshot
from cinder.volume import rpcapi as volume_rpcapi
volume = db.volume_create(self.context, vol)
snpshot = {
+ 'id': 1,
'volume_id': 'fake_id',
'status': "creating",
'progress': '0%',
self.fake_volume = jsonutils.to_primitive(volume)
self.fake_volume_metadata = volume["volume_metadata"]
self.fake_snapshot = jsonutils.to_primitive(snapshot)
+ self.fake_snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
+ **snpshot)
self.fake_reservations = ["RESERVATION"]
def test_serialized_volume_has_id(self):
snapshot = expected_msg['snapshot']
del expected_msg['snapshot']
expected_msg['snapshot_id'] = snapshot['id']
+ expected_msg['snapshot'] = snapshot
if 'host' in expected_msg:
del expected_msg['host']
if 'dest_host' in expected_msg:
self.assertEqual(arg, expected_arg)
for kwarg, value in self.fake_kwargs.items():
- self.assertEqual(value, expected_msg[kwarg])
+ if isinstance(value, objects.Snapshot):
+ expected_snapshot = expected_msg[kwarg].obj_to_primitive()
+ snapshot = value.obj_to_primitive()
+ self.assertEqual(expected_snapshot, snapshot)
+ else:
+ self.assertEqual(expected_msg[kwarg], value)
def test_create_volume(self):
self._test_volume_api('create_volume',
self._test_volume_api('create_snapshot',
rpc_method='cast',
volume=self.fake_volume,
- snapshot=self.fake_snapshot)
+ snapshot=self.fake_snapshot_obj)
def test_delete_snapshot(self):
self._test_volume_api('delete_snapshot',
rpc_method='cast',
- snapshot=self.fake_snapshot,
+ snapshot=self.fake_snapshot_obj,
host='fake_host')
def test_attach_volume_to_instance(self):
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import glance
from cinder import keymgr
+from cinder import objects
+from cinder.objects import base as objects_base
from cinder.openstack.common import log as logging
import cinder.policy
from cinder import quota
'project_id': context.project_id,
'user_id': context.user_id,
}
- target.update(target_obj or {})
+
+ if isinstance(target_obj, objects_base.CinderObject):
+ # Turn object into dict so target.update can work
+ target.update(objects_base.obj_to_primitive(target_obj) or {})
+ else:
+ target.update(target_obj or {})
+
_action = 'volume:%s' % action
cinder.policy.enforce(context, _action, target)
return volumes
def get_snapshot(self, context, snapshot_id):
- check_policy(context, 'get_snapshot')
- rv = self.db.snapshot_get(context, snapshot_id)
- return dict(rv.iteritems())
+ return objects.Snapshot.get_by_id(context, snapshot_id)
def get_volume(self, context, volume_id):
check_policy(context, 'get_volume')
allowed=quotas[over])
self._check_metadata_properties(metadata)
- options = {'volume_id': volume['id'],
- 'cgsnapshot_id': cgsnapshot_id,
- 'user_id': context.user_id,
- 'project_id': context.project_id,
- 'status': "creating",
- 'progress': '0%',
- 'volume_size': volume['size'],
- 'display_name': name,
- 'display_description': description,
- 'volume_type_id': volume['volume_type_id'],
- 'encryption_key_id': volume['encryption_key_id'],
- 'metadata': metadata}
snapshot = None
try:
- snapshot = self.db.snapshot_create(context, options)
+ kwargs = {
+ 'volume_id': volume['id'],
+ 'cgsnapshot_id': cgsnapshot_id,
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'status': 'creating',
+ 'progress': '0%',
+ 'volume_size': volume['size'],
+ 'display_name': name,
+ 'display_description': description,
+ 'volume_type_id': volume['volume_type_id'],
+ 'encryption_key_id': volume['encryption_key_id'],
+ 'metadata': metadata or {}
+ }
+ snapshot = objects.Snapshot(context=context, **kwargs)
+ snapshot.create()
+
QUOTAS.commit(context, reservations)
except Exception:
with excutils.save_and_reraise_exception():
try:
- if snapshot:
- self.db.snapshot_destroy(context, snapshot['id'])
+ if hasattr(snapshot, 'id'):
+ snapshot.destroy()
finally:
QUOTAS.rollback(context, reservations)
'consistency group.') % snapshot['id']
LOG.error(msg)
raise exception.InvalidSnapshot(reason=msg)
- self.db.snapshot_update(context, snapshot['id'],
- {'status': 'deleting'})
- volume = self.db.volume_get(context, snapshot['volume_id'])
- self.volume_rpcapi.delete_snapshot(context, snapshot, volume['host'])
+
+ snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ snapshot_obj.status = 'deleting'
+ snapshot_obj.save(context)
+
+ volume = self.db.volume_get(context, snapshot_obj.volume_id)
+ self.volume_rpcapi.delete_snapshot(context, snapshot_obj,
+ volume['host'])
LOG.info(_LI('Succesfully issued request to '
- 'delete snapshot: %s.'), snapshot['id'])
+ 'delete snapshot: %s'), snapshot_obj.id)
@wrap_check_policy
def update_snapshot(self, context, snapshot, fields):
- self.db.snapshot_update(context, snapshot['id'], fields)
+ snapshot.update(fields)
+ snapshot.save(context)
@wrap_check_policy
def get_volume_metadata(self, context, volume):
def get_snapshot_metadata(self, context, snapshot):
"""Get all metadata associated with a snapshot."""
- rv = self.db.snapshot_metadata_get(context, snapshot['id'])
- return dict(rv.iteritems())
+ snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ return snapshot_obj.metadata
def delete_snapshot_metadata(self, context, snapshot, key):
"""Delete the given metadata item from a snapshot."""
- self.db.snapshot_metadata_delete(context, snapshot['id'], key)
+ snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ snapshot_obj.delete_metadata_key(context, key)
def update_snapshot_metadata(self, context,
snapshot, metadata,
if delete:
_metadata = metadata
else:
- orig_meta = self.get_snapshot_metadata(context, snapshot)
+ orig_meta = snapshot.metadata
_metadata = orig_meta.copy()
_metadata.update(metadata)
self._check_metadata_properties(_metadata)
- db_meta = self.db.snapshot_metadata_update(context,
- snapshot['id'],
- _metadata,
- True)
+ snapshot.metadata = _metadata
+ snapshot.save(context)
# TODO(jdg): Implement an RPC call for drivers that may use this info
- return db_meta
+ return snapshot.metadata
def get_snapshot_metadata_value(self, snapshot, key):
pass
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
- size_in_g = volume.get('size', volume.get('volume_size', None))
+ size_in_g = volume.get('volume_size') or volume.get('size')
if size_in_g is None:
msg = (_LE("Size for volume: %s not found, "
"cannot secure delete.") % volume['id'])
snapshot e.g. delete SnapA while create volume VolA from SnapA is in
progress.
"""
- def lso_inner1(inst, context, snapshot_id, **kwargs):
- @utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True)
+ def lso_inner1(inst, context, snapshot, **kwargs):
+ @utils.synchronized("%s-%s" % (snapshot.id, f.__name__), external=True)
def lso_inner2(*_args, **_kwargs):
return f(*_args, **_kwargs)
- return lso_inner2(inst, context, snapshot_id, **kwargs)
+ return lso_inner2(inst, context, snapshot, **kwargs)
return lso_inner1
return True
- def create_snapshot(self, context, volume_id, snapshot_id):
+ def create_snapshot(self, context, volume_id, snapshot):
"""Creates and exports the snapshot."""
- caller_context = context
context = context.elevated()
- snapshot_ref = self.db.snapshot_get(context, snapshot_id)
- LOG.info(_LI("snapshot %s: creating"), snapshot_ref['id'])
+ LOG.info(_LI("snapshot %s: creating"), snapshot.id)
self._notify_about_snapshot_usage(
- context, snapshot_ref, "create.start")
+ context, snapshot, "create.start")
try:
# NOTE(flaper87): Verify the driver is enabled
utils.require_driver_initialized(self.driver)
LOG.debug("snapshot %(snap_id)s: creating",
- {'snap_id': snapshot_ref['id']})
+ {'snap_id': snapshot.id})
# Pass context so that drivers that want to use it, can,
# but it is not a requirement for all drivers.
- snapshot_ref['context'] = caller_context
+ snapshot.context = context
- model_update = self.driver.create_snapshot(snapshot_ref)
+ model_update = self.driver.create_snapshot(snapshot)
if model_update:
- self.db.snapshot_update(context, snapshot_ref['id'],
- model_update)
+ snapshot.update(model_update)
+ snapshot.save(context)
except Exception:
with excutils.save_and_reraise_exception():
- self.db.snapshot_update(context,
- snapshot_ref['id'],
- {'status': 'error'})
+ snapshot.status = 'error'
+ snapshot.save(context)
vol_ref = self.db.volume_get(context, volume_id)
if vol_ref.bootable:
try:
self.db.volume_glance_metadata_copy_to_snapshot(
- context, snapshot_ref['id'], volume_id)
+ context, snapshot.id, volume_id)
except exception.GlanceMetadataNotFound:
# If volume is not created from image, No glance metadata
# would be available for that volume in
" metadata using the provided volumes"
" %(volume_id)s metadata") %
{'volume_id': volume_id,
- 'snapshot_id': snapshot_id})
- self.db.snapshot_update(context,
- snapshot_ref['id'],
- {'status': 'error'})
+ 'snapshot_id': snapshot.id})
+ snapshot.status = 'error'
+ snapshot.save(context)
raise exception.MetadataCopyFailure(reason=ex)
- snapshot_ref = self.db.snapshot_update(context,
- snapshot_ref['id'],
- {'status': 'available',
- 'progress': '100%'})
+ snapshot.status = 'available'
+ snapshot.progress = '100%'
+ snapshot.save(context)
- LOG.info(_LI("snapshot %s: created successfully"), snapshot_ref['id'])
- self._notify_about_snapshot_usage(context, snapshot_ref, "create.end")
- return snapshot_id
+ LOG.info(_("snapshot %s: created successfully"), snapshot.id)
+ self._notify_about_snapshot_usage(context, snapshot, "create.end")
+ return snapshot.id
@locked_snapshot_operation
- def delete_snapshot(self, context, snapshot_id):
+ def delete_snapshot(self, context, snapshot):
"""Deletes and unexports snapshot."""
- caller_context = context
context = context.elevated()
- snapshot_ref = self.db.snapshot_get(context, snapshot_id)
- project_id = snapshot_ref['project_id']
+ project_id = snapshot.project_id
- LOG.info(_LI("snapshot %s: deleting"), snapshot_ref['id'])
+ LOG.info(_("snapshot %s: deleting"), snapshot.id)
self._notify_about_snapshot_usage(
- context, snapshot_ref, "delete.start")
+ context, snapshot, "delete.start")
try:
# NOTE(flaper87): Verify the driver is enabled
# and the snapshot status updated.
utils.require_driver_initialized(self.driver)
- LOG.debug("snapshot %s: deleting", snapshot_ref['id'])
+ LOG.debug("snapshot %s: deleting", snapshot.id)
# Pass context so that drivers that want to use it, can,
# but it is not a requirement for all drivers.
- snapshot_ref['context'] = caller_context
+ snapshot.context = context
+ snapshot.save()
- self.driver.delete_snapshot(snapshot_ref)
+ self.driver.delete_snapshot(snapshot)
except exception.SnapshotIsBusy:
LOG.error(_LE("Cannot delete snapshot %s: snapshot is busy"),
- snapshot_ref['id'])
- self.db.snapshot_update(context,
- snapshot_ref['id'],
- {'status': 'available'})
+ snapshot.id)
+ snapshot.status = 'available'
+ snapshot.save()
return True
except Exception:
with excutils.save_and_reraise_exception():
- self.db.snapshot_update(context,
- snapshot_ref['id'],
- {'status': 'error_deleting'})
+ snapshot.status = 'error_deleting'
+ snapshot.save()
# Get reservations
try:
else:
reserve_opts = {
'snapshots': -1,
- 'gigabytes': -snapshot_ref['volume_size'],
+ 'gigabytes': -snapshot.volume_size,
}
- volume_ref = self.db.volume_get(context, snapshot_ref['volume_id'])
+ volume_ref = self.db.volume_get(context, snapshot.volume_id)
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume_ref.get('volume_type_id'))
except Exception:
reservations = None
LOG.exception(_LE("Failed to update usages deleting snapshot"))
- self.db.volume_glance_metadata_delete_by_snapshot(context, snapshot_id)
- self.db.snapshot_destroy(context, snapshot_id)
- LOG.info(_LI("snapshot %s: deleted successfully"), snapshot_ref['id'])
- self._notify_about_snapshot_usage(context, snapshot_ref, "delete.end")
+ self.db.volume_glance_metadata_delete_by_snapshot(context, snapshot.id)
+ snapshot.destroy(context)
+ LOG.info(_LI("snapshot %s: deleted successfully"), snapshot.id)
+ self._notify_about_snapshot_usage(context, snapshot, "delete.end")
# Commit the reservations
if reservations:
create_cgsnapshot, and delete_cgsnapshot. Also adds
the consistencygroup_id parameter in create_volume.
1.19 - Adds update_migrated_volume
+ 1.20 - Adds support for sending objects over RPC in create_snapshot()
+ and delete_snapshot()
'''
BASE_RPC_API_VERSION = '1.0'
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
- self.client = rpc.get_client(target, '1.19', serializer=serializer)
+ self.client = rpc.get_client(target, '1.20', serializer=serializer)
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host)
cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
- snapshot_id=snapshot['id'])
+ snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host)
- cctxt.cast(ctxt, 'delete_snapshot', snapshot_id=snapshot['id'])
+ cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot)
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
# checked elsewhere. We also ignore cinder.tests for now due to high false
# positive rate.
ignore_modules = ["cinder/openstack/common/", "cinder/tests/"]
-# Note(thangp): E1101 should be ignored for only cinder.object modules.
-# E1101 is error code related to accessing a non-existent member of an
-# object, but should be ignored because the object member is created
-# dynamically.
-objects_ignore_codes = ["E1101"]
+
+# Note(thangp): E0213, E1101, and E1102 should be ignored for only
+# cinder.object modules. E0213 and E1102 are error codes related to
+# the first argument of a method, but should be ignored because the method
+# is a remotable class method. E1101 is error code related to accessing a
+# non-existent member of an object, but should be ignored because the object
+# member is created dynamically.
+objects_ignore_codes = ["E0213", "E1101", "E1102"]
+# Note(thangp): The error messages are for codes [E1120, E1101] appearing in
+# the cinder code base using objects. E1120 is an error code related no value
+# passed for a parameter in function call, but should be ignored because it is
+# reporting false positives. E1101 is error code related to accessing a
+# non-existent member of an object, but should be ignored because the object
+# member is created dynamically.
+objects_ignore_messages = [
+ "No value passed for parameter 'id' in function call",
+ "Module 'cinder.objects' has no 'Snapshot' member",
+]
objects_ignore_modules = ["cinder/objects/"]
KNOWN_PYLINT_EXCEPTIONS_FILE = "tools/pylint_exceptions"
return True
if any(self.filename.startswith(name) for name in ignore_modules):
return True
- if any(msg in self.message for msg in ignore_messages):
+ if any(msg in self.message for msg in
+ (ignore_messages + objects_ignore_messages)):
+ return True
+ if (self.code in objects_ignore_codes and
+ any(self.filename.startswith(name)
+ for name in objects_ignore_modules)):
return True
if (self.code in objects_ignore_codes and
any(self.filename.startswith(name)