# NOTE(danms): You must make sure your object gets imported in this
# function in order for it to be registered by services that may
# need to receive it via RPC.
- __import__('cinder.objects.volume')
- __import__('cinder.objects.service')
- __import__('cinder.objects.snapshot')
__import__('cinder.objects.backup')
__import__('cinder.objects.consistencygroup')
+ __import__('cinder.objects.service')
+ __import__('cinder.objects.snapshot')
+ __import__('cinder.objects.volume')
+ __import__('cinder.objects.volume_attachment')
+ __import__('cinder.objects.volume_type')
class CinderComparableObject(base.ComparableVersionedObject):
- pass
+ def __eq__(self, obj):
+ if hasattr(obj, 'obj_to_primitive'):
+ return self.obj_to_primitive() == obj.obj_to_primitive()
+ return False
class ObjectListBase(base.ObjectListBase):
from cinder import utils
CONF = cfg.CONF
-OPTIONAL_FIELDS = []
+OPTIONAL_FIELDS = ['metadata', 'admin_metadata',
+ 'volume_type', 'volume_attachment']
LOG = logging.getLogger(__name__)
@base.CinderObjectRegistry.register
class Volume(base.CinderPersistentObject, base.CinderObject,
- base.CinderObjectDictCompat):
+ base.CinderObjectDictCompat, base.CinderComparableObject):
# Version 1.0: Initial version
- VERSION = '1.0'
+ # Version 1.1: Added metadata, admin_metadata, volume_attachment, and
+ # volume_type
+ VERSION = '1.1'
fields = {
'id': fields.UUIDField(),
'deleted': fields.BooleanField(default=False),
'bootable': fields.BooleanField(default=False),
+ 'multiattach': fields.BooleanField(default=False),
'replication_status': fields.StringField(nullable=True),
'replication_extended_status': fields.StringField(nullable=True),
'replication_driver_data': fields.StringField(nullable=True),
'previous_status': fields.StringField(nullable=True),
+
+ 'metadata': fields.DictOfStringsField(nullable=True),
+ 'admin_metadata': fields.DictOfStringsField(nullable=True),
+ 'volume_type': fields.ObjectField('VolumeType', nullable=True),
+ 'volume_attachment': fields.ListOfObjectsField('VolumeAttachment',
+ nullable=True),
}
# NOTE(thangp): obj_extra_fields is used to hold properties that are not
def name(self):
return CONF.volume_name_template % self.name_id
+ def __init__(self, *args, **kwargs):
+ super(Volume, self).__init__(*args, **kwargs)
+ self._orig_metadata = {}
+ self._orig_admin_metadata = {}
+
+ self._reset_metadata_tracking()
+
+ def obj_reset_changes(self, fields=None):
+ super(Volume, self).obj_reset_changes(fields)
+ self._reset_metadata_tracking(fields=fields)
+
+ def _reset_metadata_tracking(self, fields=None):
+ if fields is None or 'metadata' in fields:
+ self._orig_metadata = (dict(self.metadata)
+ if 'metadata' in self else {})
+ if fields is None or 'admin_metadata' in fields:
+ self._orig_admin_metadata = (dict(self.admin_metadata)
+ if 'admin_metadata' in self
+ else {})
+
+ def obj_what_changed(self):
+ changes = super(Volume, self).obj_what_changed()
+ if 'metadata' in self and self.metadata != self._orig_metadata:
+ changes.add('metadata')
+ if ('admin_metadata' in self and
+ self.admin_metadata != self._orig_admin_metadata):
+ changes.add('admin_metadata')
+
+ return changes
+
def obj_make_compatible(self, primitive, target_version):
"""Make an object representation compatible with a target version."""
super(Volume, self).obj_make_compatible(primitive, target_version)
target_version = utils.convert_version_to_tuple(target_version)
@staticmethod
- def _from_db_object(context, volume, db_volume):
+ def _from_db_object(context, volume, db_volume, expected_attrs=None):
+ if expected_attrs is None:
+ expected_attrs = []
for name, field in volume.fields.items():
- value = db_volume[name]
+ if name in OPTIONAL_FIELDS:
+ continue
+ value = db_volume.get(name)
if isinstance(field, fields.IntegerField):
value = value or 0
volume[name] = value
+ # Get data from db_volume object that was queried by joined query
+ # from DB
+ if 'metadata' in expected_attrs:
+ volume.metadata = {}
+ metadata = db_volume.get('volume_metadata', [])
+ if metadata:
+ volume.metadata = {item['key']: item['value']
+ for item in metadata}
+ if 'admin_metadata' in expected_attrs:
+ volume.admin_metadata = {}
+ metadata = db_volume.get('volume_admin_metadata', [])
+ if metadata:
+ volume.admin_metadata = {item['key']: item['value']
+ for item in metadata}
+ if 'volume_type' in expected_attrs:
+ db_volume_type = db_volume.get('volume_type')
+ if db_volume_type:
+ volume.volume_type = objects.VolumeType._from_db_object(
+ context, objects.VolumeType(), db_volume_type,
+ expected_attrs='extra_specs')
+ if 'volume_attachment' in expected_attrs:
+ attachments = base.obj_make_list(
+ context, objects.VolumeAttachmentList(context),
+ objects.VolumeAttachment,
+ db_volume.get('volume_attachment'))
+ volume.volume_attachment = attachments.objects
+
volume._context = context
volume.obj_reset_changes()
return volume
@base.remotable_classmethod
def get_by_id(cls, context, id):
db_volume = db.volume_get(context, id)
- return cls._from_db_object(context, cls(context), db_volume)
+ expected_attrs = ['admin_metadata', 'metadata']
+ return cls._from_db_object(context, cls(context), db_volume,
+ expected_attrs=expected_attrs)
@base.remotable
def create(self):
def save(self):
updates = self.cinder_obj_get_changes()
if updates:
- db.volume_update(self._context, self.id, updates)
+ if 'metadata' in updates:
+ # Metadata items that are not specified in the
+ # self.metadata will be deleted
+ metadata = updates.pop('metadata', None)
+ self.metadata = db.volume_metadata_update(self._context,
+ self.id, metadata,
+ True)
+ if self._context.is_admin and 'admin_metadata' in updates:
+ metadata = updates.pop('admin_metadata', None)
+ self.admin_metadata = db.volume_admin_metadata_update(
+ self._context, self.id, metadata, True)
- self.obj_reset_changes()
+ db.volume_update(self._context, self.id, updates)
+ self.obj_reset_changes()
@base.remotable
def destroy(self):
- db.volume_destroy(self._context, self.id)
+ with self.obj_as_admin():
+ db.volume_destroy(self._context, self.id)
+
+ def obj_load_attr(self, attrname):
+ if attrname not in OPTIONAL_FIELDS:
+ raise exception.ObjectActionError(
+ action='obj_load_attr',
+ reason=_('attribute %s not lazy-loadable') % attrname)
+ if not self._context:
+ raise exception.OrphanedObjectError(method='obj_load_attr',
+ objtype=self.obj_name())
+
+ if attrname == 'metadata':
+ self.metadata = db.volume_metadata_get(self._context, self.id)
+ elif attrname == 'admin_metadata':
+ self.admin_metadata = {}
+ if self._context.is_admin:
+ self.admin_metadata = db.volume_admin_metadata_get(
+ self._context, self.id)
+ elif attrname == 'volume_type':
+ self.volume_type = objects.VolumeType.get_by_id(
+ self._context, self.volume_type_id)
+ elif attrname == 'volume_attachment':
+ attachments = (
+ objects.VolumeAttachmentList.get_all_by_volume_id(
+ self._context, self.id))
+ self.volume_attachment = attachments.objects
+
+ self.obj_reset_changes(fields=[attrname])
+
+ def delete_metadata_key(self, key):
+ db.volume_metadata_delete(self._context, self.id, key)
+ md_was_changed = 'metadata' in self.obj_what_changed()
+
+ del self.metadata[key]
+ self._orig_metadata.pop(key, None)
+
+ if not md_was_changed:
+ self.obj_reset_changes(['metadata'])
@base.CinderObjectRegistry.register
class VolumeList(base.ObjectListBase, base.CinderObject):
- VERSION = '1.0'
+ VERSION = '1.1'
fields = {
'objects': fields.ListOfObjectsField('Volume'),
}
+
child_versions = {
- '1.0': '1.0'
+ '1.0': '1.0',
+ '1.1': '1.1',
}
@base.remotable_classmethod
- def get_all(cls, context, marker, limit, sort_key, sort_dir,
- filters=None):
- volumes = db.volume_get_all(context, marker, limit, sort_key,
- sort_dir, filters=filters)
+ def get_all(cls, context, marker, limit, sort_keys=None, sort_dirs=None,
+ filters=None, offset=None):
+ volumes = db.volume_get_all(context, marker, limit,
+ sort_keys=sort_keys, sort_dirs=sort_dirs,
+ filters=filters, offset=offset)
+ expected_attrs = ['admin_metadata', 'metadata']
+ return base.obj_make_list(context, cls(context), objects.Volume,
+ volumes, expected_attrs=expected_attrs)
+
+ @base.remotable_classmethod
+ def get_all_by_host(cls, context, host, filters=None):
+ volumes = db.volume_get_all_by_host(context, host, filters)
+ expected_attrs = ['admin_metadata', 'metadata']
+ return base.obj_make_list(context, cls(context), objects.Volume,
+ volumes, expected_attrs=expected_attrs)
+
+ @base.remotable_classmethod
+ def get_all_by_group(cls, context, group_id, filters=None):
+ volumes = db.volume_get_all_by_group(context, group_id, filters)
+ expected_attrs = ['admin_metadata', 'metadata']
+ return base.obj_make_list(context, cls(context), objects.Volume,
+ volumes, expected_attrs=expected_attrs)
+
+ @base.remotable_classmethod
+ def get_all_by_project(cls, context, project_id, marker, limit,
+ sort_keys=None, sort_dirs=None, filters=None,
+ offset=None):
+ volumes = db.volume_get_all_by_project(context, project_id, marker,
+ limit, sort_keys=sort_keys,
+ sort_dirs=sort_dirs,
+ filters=filters, offset=offset)
+ expected_attrs = ['admin_metadata', 'metadata']
return base.obj_make_list(context, cls(context), objects.Volume,
- volumes)
+ volumes, expected_attrs=expected_attrs)
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_versionedobjects import fields
+
+from cinder import db
+from cinder import objects
+from cinder.objects import base
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+@base.CinderObjectRegistry.register
+class VolumeAttachment(base.CinderPersistentObject, base.CinderObject,
+ base.CinderObjectDictCompat,
+ base.CinderComparableObject):
+ # Version 1.0: Initial version
+ VERSION = '1.0'
+
+ fields = {
+ 'id': fields.UUIDField(),
+ 'volume_id': fields.UUIDField(),
+ 'instance_uuid': fields.UUIDField(nullable=True),
+ 'attached_host': fields.StringField(nullable=True),
+ 'mountpoint': fields.StringField(nullable=True),
+
+ 'attach_time': fields.DateTimeField(nullable=True),
+ 'detach_time': fields.DateTimeField(nullable=True),
+
+ 'attach_status': fields.StringField(nullable=True),
+ 'attach_mode': fields.StringField(nullable=True),
+ }
+
+ @staticmethod
+ def _from_db_object(context, attachment, db_attachment):
+ for name, field in attachment.fields.items():
+ value = db_attachment.get(name)
+ if isinstance(field, fields.IntegerField):
+ value = value or 0
+ attachment[name] = value
+
+ attachment._context = context
+ attachment.obj_reset_changes()
+ return attachment
+
+ @base.remotable_classmethod
+ def get_by_id(cls, context, id):
+ db_attach = db.volume_attachment_get(context, id)
+ return cls._from_db_object(context, cls(context), db_attach)
+
+ @base.remotable
+ def save(self):
+ updates = self.cinder_obj_get_changes()
+ if updates:
+ db.volume_attachment_update(self._context, self.id, updates)
+ self.obj_reset_changes()
+
+
+@base.CinderObjectRegistry.register
+class VolumeAttachmentList(base.ObjectListBase, base.CinderObject):
+ VERSION = '1.0'
+
+ fields = {
+ 'objects': fields.ListOfObjectsField('VolumeAttachment'),
+ }
+
+ child_versions = {
+ '1.0': '1.0',
+ }
+
+ @base.remotable_classmethod
+ def get_all_by_volume_id(cls, context, volume_id):
+ attachments = db.volume_attachment_get_used_by_volume_id(context,
+ volume_id)
+ return base.obj_make_list(context, cls(context),
+ objects.VolumeAttachment, attachments)
+
+ @base.remotable_classmethod
+ def get_all_by_host(cls, context, volume_id, host):
+ attachments = db.volume_attachment_get_by_host(context, volume_id,
+ host)
+ return base.obj_make_list(context, cls(context),
+ objects.VolumeAttachment, attachments)
+
+ @base.remotable_classmethod
+ def get_all_by_instance_uuid(cls, context, volume_id, instance_uuid):
+ attachments = db.volume_attachment_get_by_instance_uuid(
+ context, volume_id, instance_uuid)
+ return base.obj_make_list(context, cls(context),
+ objects.VolumeAttachment, attachments)
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_versionedobjects import fields
+
+from cinder import exception
+from cinder.i18n import _
+from cinder import objects
+from cinder.objects import base
+from cinder.volume import volume_types
+
+CONF = cfg.CONF
+OPTIONAL_FIELDS = ['extra_specs', 'projects']
+LOG = logging.getLogger(__name__)
+
+
+@base.CinderObjectRegistry.register
+class VolumeType(base.CinderPersistentObject, base.CinderObject,
+ base.CinderObjectDictCompat, base.CinderComparableObject):
+ # Version 1.0: Initial version
+ VERSION = '1.0'
+
+ fields = {
+ 'id': fields.UUIDField(),
+ 'name': fields.StringField(nullable=True),
+ 'description': fields.StringField(nullable=True),
+ 'is_public': fields.BooleanField(default=True),
+ 'projects': fields.ListOfStringsField(nullable=True),
+ 'extra_specs': fields.DictOfStringsField(nullable=True),
+ }
+
+ @staticmethod
+ def _from_db_object(context, type, db_type, expected_attrs=None):
+ if expected_attrs is None:
+ expected_attrs = []
+ for name, field in type.fields.items():
+ if name in OPTIONAL_FIELDS:
+ continue
+ value = db_type[name]
+ if isinstance(field, fields.IntegerField):
+ value = value or 0
+ type[name] = value
+
+ # Get data from db_type object that was queried by joined query
+ # from DB
+ if 'extra_specs' in expected_attrs:
+ type.extra_specs = {}
+ specs = db_type.get('extra_specs')
+ if specs and isinstance(specs, list):
+ type.extra_specs = {item['key']: item['value']
+ for item in specs}
+ elif specs and isinstance(specs, dict):
+ type.extra_specs = specs
+ if 'projects' in expected_attrs:
+ type.projects = db_type.get('projects', [])
+
+ type._context = context
+ type.obj_reset_changes()
+ return type
+
+ @base.remotable_classmethod
+ def get_by_id(cls, context, id):
+ db_volume_type = volume_types.get_volume_type(
+ context, id, expected_fields=['extra_specs', 'projects'])
+ expected_attrs = ['extra_specs', 'projects']
+ return cls._from_db_object(context, cls(context), db_volume_type,
+ expected_attrs=expected_attrs)
+
+ @base.remotable
+ def create(self):
+ if self.obj_attr_is_set('id'):
+ raise exception.ObjectActionError(action='create',
+ reason=_('already created'))
+ db_volume_type = volume_types.create(self._context, self.name,
+ self.extra_specs,
+ self.is_public, self.projects,
+ self.description)
+ self._from_db_object(self._context, self, db_volume_type)
+
+ @base.remotable
+ def save(self):
+ updates = self.cinder_obj_get_changes()
+ if updates:
+ volume_types.update(self._context, self.id, self.name,
+ self.description)
+ self.obj_reset_changes()
+
+ @base.remotable
+ def destroy(self):
+ with self.obj_as_admin():
+ volume_types.destroy(self._context, self.id)
+
+
+@base.CinderObjectRegistry.register
+class VolumeTypeList(base.ObjectListBase, base.CinderObject):
+ VERSION = '1.0'
+
+ fields = {
+ 'objects': fields.ListOfObjectsField('VolumeType'),
+ }
+
+ child_versions = {
+ '1.0': '1.0',
+ }
+
+ @base.remotable_classmethod
+ def get_all(cls, context, inactive=0, search_opts=None):
+ types = volume_types.get_all_types(context, inactive, search_opts)
+ expected_attrs = ['extra_specs', 'projects']
+ return base.obj_make_list(context, cls(context),
+ objects.VolumeType, types,
+ expected_attrs=expected_attrs)
'availability_zone': 'fake_availability_zone',
'status': 'available',
'attach_status': 'detached',
- 'previous_status': None
+ 'previous_status': None,
+ 'metadata': {},
+ 'admin_metadata': {},
+ 'volume_attachment': [],
+ 'volume_metadata': [],
+ 'volume_admin_metadata': [],
}
for name, field in objects.Volume.fields.items():
elif field.default != fields.UnspecifiedDefault:
db_volume[name] = field.default
else:
- raise Exception('fake_db_volume needs help with %s' % name)
+ raise Exception('fake_db_volume needs help with %s.' % name)
if updates:
db_volume.update(updates)
return db_volume
+def fake_db_volume_type(**updates):
+ db_volume_type = {
+ 'id': '1',
+ 'name': 'type-1',
+ 'description': 'A fake volume type',
+ 'is_public': True,
+ 'projects': [],
+ 'extra_specs': {},
+ }
+
+ for name, field in objects.VolumeType.fields.items():
+ if name in db_volume_type:
+ continue
+ if field.nullable:
+ db_volume_type[name] = None
+ elif field.default != fields.UnspecifiedDefault:
+ db_volume_type[name] = field.default
+ else:
+ raise Exception('fake_db_volume_type needs help with %s.' % name)
+
+ if updates:
+ db_volume_type.update(updates)
+
+ return db_volume_type
+
+
+def fake_db_volume_attachment(**updates):
+ db_volume_attachment = {
+ 'id': '1',
+ 'volume_id': '1',
+ }
+
+ for name, field in objects.VolumeAttachment.fields.items():
+ if name in db_volume_attachment:
+ continue
+ if field.nullable:
+ db_volume_attachment[name] = None
+ elif field.default != fields.UnspecifiedDefault:
+ db_volume_attachment[name] = field.default
+ else:
+ raise Exception(
+ 'fake_db_volume_attachment needs help with %s.' % name)
+
+ if updates:
+ db_volume_attachment.update(updates)
+
+ return db_volume_attachment
+
+
def fake_volume_obj(context, **updates):
- return objects.Volume._from_db_object(context, objects.Volume(),
- fake_db_volume(**updates))
+ expected_attrs = updates.pop('expected_attrs',
+ ['metadata', 'admin_metadata'])
+ vol = objects.Volume._from_db_object(context, objects.Volume(),
+ fake_db_volume(**updates),
+ expected_attrs=expected_attrs)
+ return vol
+
+
+def fake_volume_type_obj(context, **updates):
+ return objects.VolumeType._from_db_object(
+ context, objects.VolumeType(), fake_db_volume_type(**updates))
+
+
+def fake_volume_attachment_obj(context, **updates):
+ return objects.VolumeAttachment._from_db_object(
+ context, objects.VolumeAttachment(),
+ fake_db_volume_attachment(**updates))
import uuid
from iso8601 import iso8601
+from oslo_versionedobjects import fields
from cinder.objects import base
from cinder.tests.unit import objects as test_objects
self.obj.scheduled_at = now_tz
self.assertDictEqual({'scheduled_at': now},
self.obj.cinder_obj_get_changes())
+
+
+class TestCinderComparableObject(test_objects.BaseObjectsTestCase):
+ def test_comparable_objects(self):
+ @base.CinderObjectRegistry.register
+ class MyComparableObj(base.CinderObject,
+ base.CinderObjectDictCompat,
+ base.CinderComparableObject):
+ fields = {'foo': fields.Field(fields.Integer())}
+
+ class NonVersionedObject(object):
+ pass
+
+ obj1 = MyComparableObj(foo=1)
+ obj2 = MyComparableObj(foo=1)
+ obj3 = MyComparableObj(foo=2)
+ obj4 = NonVersionedObject()
+ self.assertTrue(obj1 == obj2)
+ self.assertFalse(obj1 == obj3)
+ self.assertFalse(obj1 == obj4)
+ self.assertNotEqual(obj1, None)
import mock
+from cinder import context
from cinder import objects
from cinder.tests.unit import fake_volume
from cinder.tests.unit import objects as test_objects
class TestVolume(test_objects.BaseObjectsTestCase):
+ def setUp(self):
+ super(TestVolume, self).setUp()
+ # NOTE (e0ne): base tests contains original RequestContext from
+ # oslo_context. We change it to our RequestContext implementation
+ # to have 'elevated' method
+ self.context = context.RequestContext(self.user_id, self.project_id,
+ is_admin=False)
+
@staticmethod
def _compare(test, db, obj):
for field, value in db.items():
+ if not hasattr(obj, field):
+ continue
+
test.assertEqual(db[field], obj[field])
+ @mock.patch('cinder.db.volume_glance_metadata_get', return_value={})
@mock.patch('cinder.db.volume_get')
- def test_get_by_id(self, volume_get):
+ def test_get_by_id(self, volume_get, volume_glance_metadata_get):
db_volume = fake_volume.fake_db_volume()
volume_get.return_value = db_volume
volume = objects.Volume.get_by_id(self.context, 1)
volume_update.assert_called_once_with(self.context, volume.id,
{'display_name': 'foobar'})
+ @mock.patch('cinder.db.volume_metadata_update',
+ return_value={'key1': 'value1'})
+ @mock.patch('cinder.db.volume_update')
+ def test_save_with_metadata(self, volume_update, metadata_update):
+ db_volume = fake_volume.fake_db_volume()
+ volume = objects.Volume._from_db_object(self.context,
+ objects.Volume(), db_volume)
+ volume.display_name = 'foobar'
+ volume.metadata = {'key1': 'value1'}
+ self.assertEqual({'display_name': 'foobar',
+ 'metadata': {'key1': 'value1'}},
+ volume.obj_get_changes())
+ volume.save()
+ volume_update.assert_called_once_with(self.context, volume.id,
+ {'display_name': 'foobar'})
+ metadata_update.assert_called_once_with(self.context, volume.id,
+ {'key1': 'value1'}, True)
+
+ @mock.patch('cinder.db.volume_admin_metadata_update',
+ return_value={'key1': 'value1'})
+ @mock.patch('cinder.db.volume_update')
+ def test_save_with_admin_metadata(self, volume_update,
+ admin_metadata_update):
+ # Test with no admin context
+ db_volume = fake_volume.fake_db_volume()
+ volume = objects.Volume._from_db_object(self.context,
+ objects.Volume(), db_volume)
+ volume.admin_metadata = {'key1': 'value1'}
+ volume.save()
+ self.assertFalse(admin_metadata_update.called)
+
+ # Test with admin context
+ admin_context = context.RequestContext(self.user_id, self.project_id,
+ is_admin=True)
+ volume = objects.Volume._from_db_object(admin_context,
+ objects.Volume(), db_volume)
+ volume.admin_metadata = {'key1': 'value1'}
+ volume.save()
+ admin_metadata_update.assert_called_once_with(
+ admin_context, volume.id, {'key1': 'value1'}, True)
+
@mock.patch('cinder.db.volume_destroy')
def test_destroy(self, volume_destroy):
db_volume = fake_volume.fake_db_volume()
volume = objects.Volume._from_db_object(self.context,
objects.Volume(), db_volume)
volume.destroy()
- volume_destroy.assert_called_once_with(self.context, '1')
+ self.assertTrue(volume_destroy.called)
+ admin_context = volume_destroy.call_args[0][0]
+ self.assertTrue(admin_context.is_admin)
def test_obj_fields(self):
volume = objects.Volume(context=self.context, id=2, _name_id=2)
previous_status='backing-up')
self.assertEqual('backing-up', volume.previous_status)
+ @mock.patch('cinder.db.volume_metadata_delete')
+ def test_delete_metadata_key(self, metadata_delete):
+ volume = objects.Volume(self.context, id=1)
+ volume.metadata = {'key1': 'value1', 'key2': 'value2'}
+ self.assertEqual({}, volume._orig_metadata)
+ volume.delete_metadata_key('key2')
+ self.assertEqual({'key1': 'value1'}, volume.metadata)
+ metadata_delete.assert_called_once_with(self.context, '1', 'key2')
+
class TestVolumeList(test_objects.BaseObjectsTestCase):
+ @mock.patch('cinder.db.volume_glance_metadata_get', return_value={})
@mock.patch('cinder.db.volume_get_all')
- def test_get_all(self, volume_get_all):
+ def test_get_all(self, volume_get_all, volume_glance_metadata_get):
db_volume = fake_volume.fake_db_volume()
volume_get_all.return_value = [db_volume]
mock.sentinel.sort_dir)
self.assertEqual(1, len(volumes))
TestVolume._compare(self, db_volume, volumes[0])
+
+ @mock.patch('cinder.db.volume_get_all_by_host')
+ def test_get_by_host(self, get_all_by_host):
+ db_volume = fake_volume.fake_db_volume()
+ get_all_by_host.return_value = [db_volume]
+
+ volumes = objects.VolumeList.get_all_by_host(
+ self.context, 'fake-host')
+ self.assertEqual(1, len(volumes))
+ TestVolume._compare(self, db_volume, volumes[0])
+
+ @mock.patch('cinder.db.volume_get_all_by_group')
+ def test_get_by_group(self, get_all_by_group):
+ db_volume = fake_volume.fake_db_volume()
+ get_all_by_group.return_value = [db_volume]
+
+ volumes = objects.VolumeList.get_all_by_group(
+ self.context, 'fake-host')
+ self.assertEqual(1, len(volumes))
+ TestVolume._compare(self, db_volume, volumes[0])
+
+ @mock.patch('cinder.db.volume_get_all_by_project')
+ def test_get_by_project(self, get_all_by_project):
+ db_volume = fake_volume.fake_db_volume()
+ get_all_by_project.return_value = [db_volume]
+
+ volumes = objects.VolumeList.get_all_by_project(
+ self.context, mock.sentinel.project_id, mock.sentinel.marker,
+ mock.sentinel.limit, mock.sentinel.sorted_keys,
+ mock.sentinel.sorted_dirs, mock.sentinel.filters)
+ self.assertEqual(1, len(volumes))
+ TestVolume._compare(self, db_volume, volumes[0])
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from cinder import objects
+from cinder.tests.unit import fake_volume
+from cinder.tests.unit import objects as test_objects
+
+
+class TestVolumeAttachment(test_objects.BaseObjectsTestCase):
+ @staticmethod
+ def _compare(test, db, obj):
+ for field, value in db.items():
+ test.assertEqual(db[field], obj[field])
+
+ @mock.patch('cinder.db.volume_attachment_get')
+ def test_get_by_id(self, volume_attachment_get):
+ db_attachment = fake_volume.fake_db_volume_attachment()
+ volume_attachment_get.return_value = db_attachment
+ attachment = objects.VolumeAttachment.get_by_id(self.context, '1')
+ self._compare(self, db_attachment, attachment)
+
+ @mock.patch('cinder.db.volume_attachment_update')
+ def test_save(self, volume_attachment_update):
+ attachment = fake_volume.fake_volume_attachment_obj(self.context)
+ attachment.attach_status = 'attaching'
+ attachment.save()
+ volume_attachment_update.assert_called_once_with(
+ self.context, attachment.id, {'attach_status': 'attaching'})
+
+
+class TestVolumeAttachmentList(test_objects.BaseObjectsTestCase):
+ @mock.patch('cinder.db.volume_attachment_get_used_by_volume_id')
+ def test_get_all_by_volume_id(self, get_used_by_volume_id):
+ db_attachment = fake_volume.fake_db_volume_attachment()
+ get_used_by_volume_id.return_value = [db_attachment]
+
+ attachments = objects.VolumeAttachmentList.get_all_by_volume_id(
+ self.context, mock.sentinel.volume_id)
+ self.assertEqual(1, len(attachments))
+ TestVolumeAttachment._compare(self, db_attachment, attachments[0])
+
+ @mock.patch('cinder.db.volume_attachment_get_by_host')
+ def test_get_all_by_host(self, get_by_host):
+ db_attachment = fake_volume.fake_db_volume_attachment()
+ get_by_host.return_value = [db_attachment]
+
+ attachments = objects.VolumeAttachmentList.get_all_by_host(
+ self.context, mock.sentinel.volume_id, mock.sentinel.host)
+ self.assertEqual(1, len(attachments))
+ TestVolumeAttachment._compare(self, db_attachment, attachments[0])
+
+ @mock.patch('cinder.db.volume_attachment_get_by_instance_uuid')
+ def test_get_all_by_instance_uuid(self, get_by_instance_uuid):
+ db_attachment = fake_volume.fake_db_volume_attachment()
+ get_by_instance_uuid.return_value = [db_attachment]
+
+ attachments = objects.VolumeAttachmentList.get_all_by_instance_uuid(
+ self.context, mock.sentinel.volume_id, mock.sentinel.uuid)
+ self.assertEqual(1, len(attachments))
+ TestVolumeAttachment._compare(self, db_attachment, attachments[0])
--- /dev/null
+# Copyright 2015 SimpliVity Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from cinder import context
+from cinder import objects
+from cinder.tests.unit import fake_volume
+from cinder.tests.unit import objects as test_objects
+
+
+class TestVolumeType(test_objects.BaseObjectsTestCase):
+ def setUp(self):
+ super(TestVolumeType, self).setUp()
+ # NOTE (e0ne): base tests contains original RequestContext from
+ # oslo_context. We change it to our RequestContext implementation
+ # to have 'elevated' method
+ self.context = context.RequestContext(self.user_id, self.project_id,
+ is_admin=False)
+
+ @staticmethod
+ def _compare(test, db, obj):
+ for field, value in db.items():
+ test.assertEqual(db[field], obj[field])
+
+ @mock.patch('cinder.db.volume_type_get')
+ def test_get_by_id(self, volume_type_get):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ volume_type_get.return_value = db_volume_type
+ volume_type = objects.VolumeType.get_by_id(self.context, '1')
+ self._compare(self, db_volume_type, volume_type)
+
+ @mock.patch('cinder.volume.volume_types.create')
+ def test_create(self, volume_type_create):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ volume_type_create.return_value = db_volume_type
+
+ volume_type = objects.VolumeType(context=self.context)
+ volume_type.name = db_volume_type['name']
+ volume_type.extra_specs = db_volume_type['extra_specs']
+ volume_type.is_public = db_volume_type['is_public']
+ volume_type.projects = db_volume_type['projects']
+ volume_type.description = db_volume_type['description']
+ volume_type.create()
+
+ volume_type_create.assert_called_once_with(
+ self.context, db_volume_type['name'],
+ db_volume_type['extra_specs'], db_volume_type['is_public'],
+ db_volume_type['projects'], db_volume_type['description'])
+
+ @mock.patch('cinder.volume.volume_types.update')
+ def test_save(self, volume_type_update):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ volume_type = objects.VolumeType._from_db_object(self.context,
+ objects.VolumeType(),
+ db_volume_type)
+ volume_type.description = 'foobar'
+ volume_type.save()
+ volume_type_update.assert_called_once_with(self.context,
+ volume_type.id,
+ volume_type.name,
+ volume_type.description)
+
+ @mock.patch('cinder.volume.volume_types.destroy')
+ def test_destroy(self, volume_type_destroy):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ volume_type = objects.VolumeType._from_db_object(self.context,
+ objects.VolumeType(),
+ db_volume_type)
+ volume_type.destroy()
+ self.assertTrue(volume_type_destroy.called)
+ admin_context = volume_type_destroy.call_args[0][0]
+ self.assertTrue(admin_context.is_admin)
+
+
+class TestVolumeTypeList(test_objects.BaseObjectsTestCase):
+ @mock.patch('cinder.volume.volume_types.get_all_types')
+ def test_get_all(self, get_all_types):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ get_all_types.return_value = [db_volume_type]
+
+ volume_types = objects.VolumeTypeList.get_all(self.context)
+ self.assertEqual(1, len(volume_types))
+ TestVolumeType._compare(self, db_volume_type, volume_types[0])