def image_volume_cache_get_all_for_host(context, host):
"""Query for all image volume cache entry for a host."""
return IMPL.image_volume_cache_get_all_for_host(context, host)
+
+
+###################
+
+
+def get_model_for_versioned_object(versioned_object):
+ return IMPL.get_model_for_versioned_object(versioned_object)
+
+
+def get_by_id(context, model, id, *args, **kwargs):
+ return IMPL.get_by_id(context, model, id, *args, **kwargs)
import datetime as dt
import functools
+import re
import sys
import threading
import time
expected_fields=expected_fields)
+def _volume_type_get_full(context, id):
+ """Return dict for a specific volume_type with extra_specs and projects."""
+ return _volume_type_get(context, id, session=None, inactive=False,
+ expected_fields=('extra_specs', 'projects'))
+
+
@require_context
def _volume_type_ref_get(context, id, session=None, inactive=False):
read_deleted = "yes" if inactive else "no"
filter_by(host=host).\
order_by(desc(models.ImageVolumeCacheEntry.last_used)).\
all()
+
+
+###############################
+
+
+def get_model_for_versioned_object(versioned_object):
+ # Exceptions to model mapping, in general Versioned Objects have the same
+ # name as their ORM models counterparts, but there are some that diverge
+ VO_TO_MODEL_EXCEPTIONS = {
+ 'BackupImport': models.Backup,
+ 'VolumeType': models.VolumeTypes,
+ 'CGSnapshot': models.Cgsnapshot,
+ }
+
+ model_name = versioned_object.obj_name()
+ return (VO_TO_MODEL_EXCEPTIONS.get(model_name) or
+ getattr(models, model_name))
+
+
+def _get_get_method(model):
+ # Exceptions to model to get methods, in general method names are a simple
+ # conversion changing ORM name from camel case to snake format and adding
+ # _get to the string
+ GET_EXCEPTIONS = {
+ models.ConsistencyGroup: consistencygroup_get,
+ models.VolumeTypes: _volume_type_get_full,
+ }
+
+ if model in GET_EXCEPTIONS:
+ return GET_EXCEPTIONS[model]
+
+ # General conversion
+ # Convert camel cased model name to snake format
+ s = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', model.__name__)
+ # Get method must be snake formatted model name concatenated with _get
+ method_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s).lower() + '_get'
+ return globals().get(method_name)
+
+
+_GET_METHODS = {}
+
+
+@require_context
+def get_by_id(context, model, id, *args, **kwargs):
+ # Add get method to cache dictionary if it's not already there
+ if not _GET_METHODS.get(model):
+ _GET_METHODS[model] = _get_get_method(model)
+
+ return _GET_METHODS[model](context, id, *args, **kwargs)
backup.obj_reset_changes()
return backup
- @base.remotable_classmethod
- def get_by_id(cls, context, id, read_deleted=None, project_only=None):
- db_backup = db.backup_get(context, id, read_deleted=read_deleted,
- project_only=project_only)
- return cls._from_db_object(context, cls(context), db_backup)
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
+from cinder import db
from cinder import exception
+from cinder.i18n import _
from cinder import objects
# Return modified dict
return changes
+ @base.remotable_classmethod
+ def get_by_id(cls, context, id, *args, **kwargs):
+ # To get by id we need to have a model and for the model to
+ # have an id field
+ if 'id' not in cls.fields:
+ msg = (_('VersionedObject %s cannot retrieve object by id.') %
+ (cls.obj_name()))
+ raise NotImplementedError(msg)
+
+ model = db.get_model_for_versioned_object(cls)
+ orm_obj = db.get_by_id(context, model, id, *args, **kwargs)
+ kargs = {}
+ if hasattr(cls, 'DEFAULT_EXPECTED_ATTR'):
+ kargs = {'expected_attrs': getattr(cls, 'DEFAULT_EXPECTED_ATTR')}
+ return cls._from_db_object(context, cls(context), orm_obj, **kargs)
+
class CinderObjectDictCompat(base.VersionedObjectDictCompat):
"""Mix-in to provide dictionary key access compat.
cgsnapshot.obj_reset_changes()
return cgsnapshot
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_cgsnapshots = db.cgsnapshot_get(context, id)
- return cls._from_db_object(context, cls(context), db_cgsnapshots)
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
consistencygroup.obj_reset_changes()
return consistencygroup
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_consistencygroup = db.consistencygroup_get(context, id)
- return cls._from_db_object(context, cls(context),
- db_consistencygroup)
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
service.obj_reset_changes()
return service
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_service = db.service_get(context, id)
- return cls._from_db_object(context, cls(context), db_service)
-
@base.remotable_classmethod
def get_by_host_and_topic(cls, context, host, topic):
db_service = db.service_get_by_host_and_topic(context, host, topic)
# Version 1.0: Initial version
VERSION = '1.0'
+ DEFAULT_EXPECTED_ATTR = ('metadata',)
+
fields = {
'id': fields.UUIDField(),
snapshot.obj_reset_changes()
return snapshot
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_snapshot = db.snapshot_get(context, id)
- return cls._from_db_object(context, cls(context), db_snapshot,
- expected_attrs=['metadata'])
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
# volume_type
VERSION = '1.1'
+ DEFAULT_EXPECTED_ATTR = ('admin_metadata', 'metadata')
+
fields = {
'id': fields.UUIDField(),
'_name_id': fields.UUIDField(nullable=True),
volume.obj_reset_changes()
return volume
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_volume = db.volume_get(context, id)
- expected_attrs = ['admin_metadata', 'metadata']
- return cls._from_db_object(context, cls(context), db_volume,
- expected_attrs=expected_attrs)
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
attachment.obj_reset_changes()
return attachment
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_attach = db.volume_attachment_get(context, id)
- return cls._from_db_object(context, cls(context), db_attach)
-
@base.remotable
def save(self):
updates = self.cinder_obj_get_changes()
# Version 1.0: Initial version
VERSION = '1.0'
+ DEFAULT_EXPECTED_ATTR = ('extra_specs', 'projects')
+
fields = {
'id': fields.UUIDField(),
'name': fields.StringField(nullable=True),
type.obj_reset_changes()
return type
- @base.remotable_classmethod
- def get_by_id(cls, context, id):
- db_volume_type = volume_types.get_volume_type(
- context, id, expected_fields=['extra_specs', 'projects'])
- expected_attrs = ['extra_specs', 'projects']
- return cls._from_db_object(context, cls(context), db_volume_type,
- expected_attrs=expected_attrs)
-
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
self._disable_osprofiler()
+ # NOTE(geguileo): This is required because common get_by_id method in
+ # cinder.db.sqlalchemy.api caches get methods and if we use a mocked
+ # get method in one test it would carry on to the next test. So we
+ # clear out the cache.
+ sqla_api._GET_METHODS = {}
+
def _restore_obj_registry(self):
objects_base.CinderObjectRegistry._registry._obj_classes = \
self._base_test_obj_backup
from cinder.tests.unit.api.v2 import stubs
+def stub_snapshot_get(context, snapshot_id):
+ snapshot = stubs.stub_snapshot(snapshot_id)
+ if snapshot_id == 3:
+ snapshot['status'] = 'error'
+ elif snapshot_id == 1:
+ snapshot['status'] = 'creating'
+ elif snapshot_id == 7:
+ snapshot['status'] = 'available'
+ else:
+ snapshot['status'] = 'creating'
+
+ return snapshot
+
+
class SnapshotActionsTest(test.TestCase):
def setUp(self):
super(SnapshotActionsTest, self).setUp()
+ @mock.patch('cinder.db.snapshot_update', autospec=True)
+ @mock.patch('cinder.db.sqlalchemy.api._snapshot_get',
+ side_effect=stub_snapshot_get)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
- def test_update_snapshot_status(self, metadata_get):
- self.stubs.Set(db, 'snapshot_get', stub_snapshot_get)
- self.stubs.Set(db, 'snapshot_update', stub_snapshot_update)
-
+ def test_update_snapshot_status(self, metadata_get, *args):
body = {'os-update_snapshot_status': {'status': 'available'}}
req = webob.Request.blank('/v2/fake/snapshots/1/action')
req.method = "POST"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(202, res.status_int)
+ @mock.patch('cinder.db.sqlalchemy.api._snapshot_get',
+ side_effect=stub_snapshot_get)
@mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
- def test_update_snapshot_status_invalid_status(self, metadata_get):
- self.stubs.Set(db, 'snapshot_get', stub_snapshot_get)
+ def test_update_snapshot_status_invalid_status(self, metadata_get, *args):
body = {'os-update_snapshot_status': {'status': 'in-use'}}
req = webob.Request.blank('/v2/fake/snapshots/1/action')
req.method = "POST"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(400, res.status_int)
-
-
-def stub_snapshot_get(context, snapshot_id):
- snapshot = stubs.stub_snapshot(snapshot_id)
- if snapshot_id == 3:
- snapshot['status'] = 'error'
- elif snapshot_id == 1:
- snapshot['status'] = 'creating'
- elif snapshot_id == 7:
- snapshot['status'] = 'available'
- else:
- snapshot['status'] = 'creating'
-
- return snapshot
-
-
-def stub_snapshot_update(self, context, id, **kwargs):
- pass
self.controller.update_all, req, self.req_id,
expected)
- def test_update_all_malformed_data(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_update',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.sqlalchemy.api._snapshot_get')
+ @mock.patch('cinder.db.snapshot_metadata_update', autospec=True)
+ def test_update_all_malformed_data(self, metadata_update, snapshot_get):
+ snapshot_get.return_value = stub_get
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
self.controller.update, req, self.req_id, 'key1',
None)
- def test_update_item_empty_key(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_update',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.sqlalchemy.api._snapshot_get')
+ @mock.patch('cinder.db.snapshot_metadata_update', autospec=True)
+ def test_update_item_empty_key(self, metadata_update, snapshot_get):
+ snapshot_get.return_value = stub_get
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
def test_volume_create(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
vol = {"size": 100,
"display_name": "Volume Test Name",
def test_volume_create_with_image_id(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
test_id = "c905cedb-7281-47e4-8a62-f26bc5fc4c77"
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
- @mock.patch.object(db, 'volume_type_get',
+ @mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get)
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
return_value={"qos_max_iops": 2000,
"readonly": "False",
"attached_mode": "rw"})
- @mock.patch.object(db, 'volume_type_get',
+ @mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get)
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes')
res_dict = self.controller.index(req)
def test_volume_list_detail(self, *args):
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail')
res_dict = self.controller.index(req)
'readonly': 'False'})
@mock.patch.object(volume_api.API, 'get',
side_effect=stubs.stub_volume_api_get, autospec=True)
- @mock.patch.object(db, 'volume_type_get',
+ @mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
side_effect=stubs.stub_volume_type_get, autospec=True)
def test_volume_show(self, *args):
req = fakes.HTTPRequest.blank('/v1/volumes/1')
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
self.stubs.Set(db, 'volume_get_all_by_project',
stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2\
&offset=1',
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, 1)
def test_volume_show_with_unencrypted_volume(self):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, 1)
self.assertEqual(False, res_dict['volume']['encrypted'])
def test_volume_delete(self):
- self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ self.stubs.Set(db.sqlalchemy.api, 'volume_get',
+ stubs.stub_volume_get_db)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
resp = self.controller.delete(req, 1)
def test_admin_list_volumes_limited_to_project(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes',
use_admin_context=True)
self.assertEqual(1, len(res['volumes']))
def test_admin_list_volumes_all_tenants(self):
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1')
res = self.controller.index(req)
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes')
res = self.controller.index(req)
self.controller.update, req, self.req_id, 'key1',
None)
- def test_update_item_empty_key(self):
- self.stubs.Set(cinder.db, 'snapshot_metadata_update',
- return_create_snapshot_metadata)
+ @mock.patch('cinder.db.sqlalchemy.api._snapshot_get')
+ @mock.patch('cinder.db.snapshot_metadata_update', autospec=True)
+ def test_update_item_empty_key(self, metadata_update, snapshot_get):
+ snapshot_get.return_value = stub_get
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
def test_volume_create(self, mock_validate):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
vol = self._vol_in_request_body()
body = {"volume": vol}
self.stubs.Set(volume_api.API, 'get_all',
lambda *args, **kwargs:
objects.VolumeList(objects=[vol_obj]))
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ # NOTE(geguileo): This is required because common get_by_id method in
+ # cinder.db.sqlalchemy.api caches the real get method.
+ db.sqlalchemy.api._GET_METHODS = {}
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail')
res_dict = self.controller.detail(req)
self.assertTrue(mock_validate.called)
'multiattach': False,
}
- @mock.patch.object(db, 'volume_type_get', autospec=True)
+ @mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
+ autospec=True)
@mock.patch.object(volume_api.API, 'get_snapshot', autospec=True)
@mock.patch.object(volume_api.API, 'create', autospec=True)
def test_volume_creation_from_snapshot(self, create, get_snapshot,
get_snapshot.assert_called_once_with(self.controller.volume_api,
context, snapshot_id)
- @mock.patch.object(db, 'volume_type_get', autospec=True)
+ @mock.patch.object(db.sqlalchemy.api, '_volume_type_get_full',
+ autospec=True)
@mock.patch.object(volume_api.API, 'get_volume', autospec=True)
@mock.patch.object(volume_api.API, 'create', autospec=True)
def test_volume_creation_from_source_volume(self, create, get_volume,
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_ref(self, mock_validate):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = self._vol_in_request_body(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_id(self, mock_validate):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = self._vol_in_request_body(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_name(self, mock_validate):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
self.stubs.Set(fake_image._FakeImageService,
"detail",
stubs.stub_image_service_detail)
def test_volume_update(self, mock_validate):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
updates = {
"name": "Updated Test Name",
def test_volume_update_deprecation(self, mock_validate):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
updates = {
"display_name": "Updated Test Name",
"""Test current update keys have priority over deprecated keys."""
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
updates = {
"name": "New Name",
def test_volume_update_metadata(self, mock_validate):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
updates = {
"metadata": {"qos_max_iops": 2000}
def test_volume_list_summary(self):
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes')
res_dict = self.controller.index(req)
def test_volume_list_detail(self):
self.stubs.Set(volume_api.API, 'get_all',
stubs.stub_volume_api_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail')
res_dict = self.controller.detail(req)
]
self.stubs.Set(db, 'volume_get_all_by_project',
stub_volume_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1')
res_dict = self.controller.detail(req)
def test_volume_detail_limit(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=1')
res_dict = self.controller.detail(req)
def test_volume_detail_limit_marker(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1&limit=1')
res_dict = self.controller.detail(req)
def test_volume_show(self):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, '1')
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
self.stubs.Set(db, 'volume_admin_metadata_get',
stub_volume_admin_metadata_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, '1')
return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, 1)
def test_volume_show_with_unencrypted_volume(self):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
- self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
+ self.stubs.Set(db.sqlalchemy.api, '_volume_type_get_full',
+ stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, 1)
import mock
+from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import objects
from cinder.tests.unit import fake_volume
class TestBackup(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.backup_get', return_value=fake_backup)
+ @mock.patch('cinder.db.get_by_id', return_value=fake_backup)
def test_get_by_id(self, backup_get):
backup = objects.Backup.get_by_id(self.context, 1)
self._compare(self, fake_backup, backup)
+ backup_get.assert_called_once_with(self.context, models.Backup, 1)
+
+ @mock.patch('cinder.db.sqlalchemy.api.model_query')
+ def test_get_by_id_no_existing_id(self, model_query):
+ query = mock.Mock()
+ filter_by = mock.Mock()
+ filter_by.first.return_value = None
+ query.filter_by.return_value = filter_by
+ model_query.return_value = query
+ self.assertRaises(exception.BackupNotFound, objects.Backup.get_by_id,
+ self.context, 123)
@mock.patch('cinder.db.backup_create', return_value=fake_backup)
def test_create(self, backup_create):
class TestCGSnapshot(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.cgsnapshot_get',
+ @mock.patch('cinder.db.sqlalchemy.api.cgsnapshot_get',
return_value=fake_cgsnapshot)
def test_get_by_id(self, cgsnapshot_get):
cgsnapshot = objects.CGSnapshot.get_by_id(self.context, 1)
class TestConsistencyGroup(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.consistencygroup_get',
+ @mock.patch('cinder.db.sqlalchemy.api.consistencygroup_get',
return_value=fake_consistencygroup)
def test_get_by_id(self, consistencygroup_get):
consistencygroup = objects.ConsistencyGroup.get_by_id(self.context, 1)
self._compare(self, fake_consistencygroup, consistencygroup)
+ consistencygroup_get.assert_called_once_with(self.context, 1)
@mock.patch('cinder.db.sqlalchemy.api.model_query')
def test_get_by_id_no_existing_id(self, model_query):
- query = mock.Mock()
- filter_by = mock.Mock()
- filter_by.first.return_value = None
- query.filter_by.return_value = filter_by
- model_query.return_value = query
+ model_query().filter_by().first.return_value = None
self.assertRaises(exception.ConsistencyGroupNotFound,
objects.ConsistencyGroup.get_by_id, self.context,
123)
# NOTE: The hashes in this list should only be changed if they come with a
# corresponding version bump in the affected objects.
object_data = {
- 'Backup': '1.1-f2e7befd20d3bb388700f17c4f386b28',
- 'BackupImport': '1.1-f2e7befd20d3bb388700f17c4f386b28',
- 'BackupList': '1.0-db44728c8d21bb23bba601a5499550f8',
- 'CGSnapshot': '1.0-d50e9480cee2abcb2222997f2bb85656',
- 'CGSnapshotList': '1.0-3361be608f396c5ae045b6d94f901346',
- 'ConsistencyGroup': '1.0-98714c3d8f83914fd7a17317c3c29e01',
- 'ConsistencyGroupList': '1.0-a906318d3e69d847f31df561d12540b3',
- 'Service': '1.0-b81a04373ce0ad2d07de525eb534afd6',
- 'ServiceList': '1.0-1911175eadd43fb6eafbefd18c802f2c',
- 'Snapshot': '1.0-54a2726a282cbdb47ddd326107e821ce',
- 'SnapshotList': '1.0-46abf2a1e65ef55dad4f36fe787f9a78',
- 'Volume': '1.1-adc26d52b646723bd0633b0771ad2598',
- 'VolumeAttachment': '1.0-4fd93dbfa57d048a4859f5bb1ca66bed',
- 'VolumeAttachmentList': '1.0-829c18b1d929ea1f8a451b3c4e0a0289',
- 'VolumeList': '1.1-d41f3a850be5fbaa94eb4cc955c7ca60',
- 'VolumeType': '1.0-8cb7daad27570133543c2c359d85c658',
- 'VolumeTypeList': '1.0-980f0b518aed9df0beb55cc533eff632'
+ 'Backup': '1.1-cd077ec037f5ad1f5409fd660bd59f53',
+ 'BackupImport': '1.1-cd077ec037f5ad1f5409fd660bd59f53',
+ 'BackupList': '1.0-24591dabe26d920ce0756fe64cd5f3aa',
+ 'CGSnapshot': '1.0-190da2a2aa9457edc771d888f7d225c4',
+ 'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
+ 'ConsistencyGroup': '1.0-b9bad093daee0b259edddb3993c60c31',
+ 'ConsistencyGroupList': '1.0-09d0aad5491e762ecfdf66bef02ceb8d',
+ 'Service': '1.0-64baeb4911dbab1153064dd1c87edb9f',
+ 'ServiceList': '1.0-d242d3384b68e5a5a534e090ff1d5161',
+ 'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a',
+ 'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1',
+ 'Volume': '1.1-6037de222b09c330b33b419a0c1b10c6',
+ 'VolumeAttachment': '1.0-f14a7c03ffc5b93701d496251a5263aa',
+ 'VolumeAttachmentList': '1.0-307d2b6c8dd55ef854f6386898e9e98e',
+ 'VolumeList': '1.1-03ba6cb8c546683e64e15c50042cb1a3',
+ 'VolumeType': '1.0-bf8abbbea2e852ed2e9bac5a9f5f70f2',
+ 'VolumeTypeList': '1.0-09b01f4526266c1a58cb206ba509d6d2',
}
class TestService(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.service_get')
+ @mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_get_by_id(self, service_get):
db_service = fake_service.fake_db_service()
service_get.return_value = db_service
from oslo_log import log as logging
+from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import objects
from cinder.tests.unit import fake_snapshot
class TestSnapshot(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.snapshot_get', return_value=fake_db_snapshot)
+ @mock.patch('cinder.db.get_by_id', return_value=fake_db_snapshot)
def test_get_by_id(self, snapshot_get):
snapshot = objects.Snapshot.get_by_id(self.context, 1)
self._compare(self, fake_snapshot_obj, snapshot)
+ snapshot_get.assert_called_once_with(self.context, models.Snapshot, 1)
+
+ @mock.patch('cinder.db.sqlalchemy.api.model_query')
+ def test_get_by_id_no_existing_id(self, model_query):
+ query = model_query().options().options().filter_by().first
+ query.return_value = None
+ self.assertRaises(exception.SnapshotNotFound,
+ objects.Snapshot.get_by_id, self.context, 123)
def test_reset_changes(self):
snapshot = objects.Snapshot()
import mock
from cinder import context
+from cinder import exception
from cinder import objects
from cinder.tests.unit import fake_volume
from cinder.tests.unit import objects as test_objects
class TestVolume(test_objects.BaseObjectsTestCase):
@mock.patch('cinder.db.volume_glance_metadata_get', return_value={})
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_get_by_id(self, volume_get, volume_glance_metadata_get):
db_volume = fake_volume.fake_db_volume()
volume_get.return_value = db_volume
volume = objects.Volume.get_by_id(self.context, 1)
self._compare(self, db_volume, volume)
+ volume_get.assert_called_once_with(self.context, 1)
+
+ @mock.patch('cinder.db.sqlalchemy.api.model_query')
+ def test_get_by_id_no_existing_id(self, model_query):
+ pf = model_query().options().options().options().options().options()
+ pf.filter_by().first.return_value = None
+ self.assertRaises(exception.VolumeNotFound,
+ objects.Volume.get_by_id, self.context, 123)
@mock.patch('cinder.db.volume_create')
def test_create(self, volume_create):
class TestVolumeAttachment(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.volume_attachment_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_attachment_get')
def test_get_by_id(self, volume_attachment_get):
db_attachment = fake_volume.fake_db_volume_attachment()
volume_attachment_get.return_value = db_attachment
class TestVolumeType(test_objects.BaseObjectsTestCase):
- @mock.patch('cinder.db.volume_type_get')
+ @mock.patch('cinder.db.sqlalchemy.api._volume_type_get_full')
def test_get_by_id(self, volume_type_get):
db_volume_type = fake_volume.fake_db_volume_type()
volume_type_get.return_value = db_volume_type
serializer=object_serializer())
self.assertEqual(mock_rpc_client, rpc_client)
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
@mock.patch('cinder.context.get_admin_context')
@mock.patch('cinder.rpc.get_client')
@mock.patch('cinder.rpc.init')
volume=volume_obj)
@mock.patch('cinder.db.volume_destroy')
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
@mock.patch('cinder.context.get_admin_context')
@mock.patch('cinder.rpc.init')
def test_volume_commands_delete_no_host(self, rpc_init, get_admin_context,
self.assertEqual(expected_out, fake_out.getvalue())
@mock.patch('cinder.db.volume_destroy')
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
@mock.patch('cinder.context.get_admin_context')
@mock.patch('cinder.rpc.init')
def test_volume_commands_delete_volume_in_use(self, rpc_init,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
- with mock.patch.object(objects.service, 'db') as mock_db:
+ with mock.patch.object(objects.service, 'db') as mock_db,\
+ mock.patch('cinder.db.sqlalchemy.api.get_by_id') as get_by_id:
mock_db.service_get_by_args.side_effect = exception.NotFound()
mock_db.service_create.return_value = service_ref
- mock_db.service_get.return_value = service_ref
+ get_by_id.return_value = service_ref
serv = service.Service(
self.host,
self.assertRaises(exception.NotFound, db.volume_get,
self.context, volume['id'])
- @mock.patch.object(db, 'volume_get', side_effect=exception.VolumeNotFound(
- volume_id='12345678-1234-5678-1234-567812345678'))
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get',
+ side_effect=exception.VolumeNotFound(
+ volume_id='12345678-1234-5678-1234-567812345678'))
def test_delete_volume_not_found(self, mock_get_volume):
"""Test delete volume moves on if the volume does not exist."""
volume_id = '12345678-1234-5678-1234-567812345678'
@mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
'_get_target_chap_auth')
@mock.patch.object(db, 'volume_admin_metadata_get')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
@mock.patch.object(db, 'volume_update')
def test_initialize_connection_fetchqos(self,
_mock_volume_update,
self.assertIsNone(conn_info['data']['qos_specs'])
@mock.patch.object(fake_driver.FakeISCSIDriver, 'create_export')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
@mock.patch.object(db, 'volume_update')
def test_initialize_connection_export_failure(self,
_mock_volume_update,
'_get_target_chap_auth')
@mock.patch.object(db, 'volume_admin_metadata_get')
@mock.patch.object(db, 'volume_update')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
@mock.patch.object(fake_driver.FakeISCSIDriver, 'initialize_connection')
@mock.patch.object(db, 'driver_initiator_data_get')
@mock.patch.object(db, 'driver_initiator_data_update')
self.assertEqual("detached", vol['attach_status'])
@mock.patch.object(cinder.volume.api.API, 'update')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_reserve_volume_success(self, volume_get, volume_update):
fake_volume = {
'id': self.FAKE_UUID,
'status': status
}
- with mock.patch.object(db, 'volume_get') as mock_volume_get:
- mock_volume_get.return_value = fake_volume
+ with mock.patch.object(db.sqlalchemy.api, 'volume_get') as mock_get:
+ mock_get.return_value = fake_volume
self.assertRaises(exception.InvalidVolume,
cinder.volume.api.API().reserve_volume,
self.context,
fake_volume)
- self.assertTrue(mock_volume_get.called)
+ self.assertTrue(mock_get.called)
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
@mock.patch.object(db, 'volume_attachment_get_used_by_volume_id')
@mock.patch.object(cinder.volume.api.API, 'update')
def test_unreserve_volume_success(self, volume_get,
'name',
'description')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_begin_detaching_fails_available(self, volume_get):
volume_api = cinder.volume.api.API()
volume = tests_utils.create_volume(self.context, **self.volume_params)
@mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
'migrate_volume_completion')
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_migrate_volume_generic(self, volume_get,
migrate_volume_completion,
nova_api):
@mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
'migrate_volume_completion')
- @mock.patch('cinder.db.volume_get')
+ @mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_migrate_volume_generic_attached_volume(self, volume_get,
migrate_volume_completion,
nova_api):
with mock.patch.object(self.volume.driver, 'retype') as _retype,\
mock.patch.object(volume_types, 'volume_types_diff') as _diff,\
mock.patch.object(self.volume, 'migrate_volume') as _mig,\
- mock.patch.object(db, 'volume_get') as get_volume:
- get_volume.return_value = volume
+ mock.patch.object(db.sqlalchemy.api, 'volume_get') as mock_get:
+ mock_get.return_value = volume
_retype.return_value = driver
_diff.return_value = ({}, diff_equal)
if migrate_exc:
@mock.patch('six.moves.builtins.open')
@mock.patch.object(os_brick.initiator.connector,
'get_connector_properties')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_backup_volume_available(self, mock_volume_get,
mock_get_connector_properties,
mock_file_open,
@mock.patch('six.moves.builtins.open')
@mock.patch.object(os_brick.initiator.connector,
'get_connector_properties')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_backup_volume_inuse_temp_volume(self, mock_volume_get,
mock_get_connector_properties,
mock_file_open,
@mock.patch.object(os_brick.initiator.connector,
'get_connector_properties',
return_value={})
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_backup_volume_inuse_temp_snapshot(self, mock_volume_get,
mock_get_connector_properties,
mock_check_device,
self.stubs.Set(self.volume.driver.vg, 'get_volume',
self._get_manage_existing_lvs)
- @mock.patch.object(db, 'volume_get', side_effect=exception.VolumeNotFound(
- volume_id='d8cd1feb-2dcc-404d-9b15-b86fe3bec0a1'))
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get',
+ side_effect=exception.VolumeNotFound(
+ volume_id='d8cd1feb-2dcc-404d-9b15-b86fe3bec0a1'))
def test_lvm_manage_existing_not_found(self, mock_vol_get):
self._setup_stubs_for_manage_existing()
model_update = self.volume.driver.manage_existing(vol, ref)
self.assertIsNone(model_update)
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_lvm_manage_existing_already_managed(self, mock_conf):
self._setup_stubs_for_manage_existing()
@mock.patch('six.moves.builtins.open')
@mock.patch.object(os_brick.initiator.connector,
'get_connector_properties')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_backup_volume(self, mock_volume_get,
mock_get_connector_properties,
mock_file_open,
@mock.patch('six.moves.builtins.open')
@mock.patch.object(os_brick.initiator.connector,
'get_connector_properties')
- @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db.sqlalchemy.api, 'volume_get')
def test_backup_volume_inuse(self, mock_volume_get,
mock_get_connector_properties,
mock_file_open,
self.snapshot = fake_snapshot.fake_snapshot_obj(
ctx, **{'volume': self.fake_volume})
- self.mock_object(db, 'volume_get', self.return_fake_volume)
+ self.mock_object(db.sqlalchemy.api, 'volume_get',
+ self.return_fake_volume)
snap_vol_id = self.snapshot.volume_id
self.volume_name_2x_enc = urllib.parse.quote(