msg = _("Service not found.")
raise exc.HTTPNotFound(explanation=msg)
- new_volume = dict(new_volume)
utils.add_visible_admin_metadata(new_volume)
return self._view_builder.detail(req, new_volume)
def _translate_attachment_summary_view(_context, vol):
"""Maps keys for attachment summary view."""
d = []
- attachments = vol.get('volume_attachment', [])
+ attachments = vol.volume_attachment
for attachment in attachments:
if attachment.get('attach_status') == 'attached':
a = {'id': attachment.get('volume_id'),
LOG.info(_LI("vol=%s"), vol, context=context)
- if vol.get('volume_metadata'):
- metadata = vol.get('volume_metadata')
- d['metadata'] = {item['key']: item['value'] for item in metadata}
- # avoid circular ref when vol is a Volume instance
- elif vol.get('metadata') and isinstance(vol.get('metadata'), dict):
- d['metadata'] = vol['metadata']
+ if vol.metadata:
+ d['metadata'] = vol.metadata
else:
d['metadata'] = {}
filters=search_opts,
viewable_admin_meta=True)
- volumes = [dict(vol) for vol in volumes]
-
for volume in volumes:
utils.add_visible_admin_metadata(volume)
- limited_list = common.limited(volumes, req)
+ limited_list = common.limited(volumes.objects, req)
req.cache_db_volumes(limited_list)
res = [entity_maker(context, vol) for vol in limited_list]
# under the License.
from oslo_log import log as logging
+import six
from cinder.api import common
'metadata': self._get_volume_metadata(volume),
'links': self._get_links(request, volume['id']),
'user_id': volume.get('user_id'),
- 'bootable': str(volume.get('bootable')).lower(),
+ 'bootable': six.text_type(volume.get('bootable')).lower(),
'encrypted': self._is_volume_encrypted(volume),
'replication_status': volume.get('replication_status'),
'consistencygroup_id': volume.get('consistencygroup_id'),
attachments = []
if volume['attach_status'] == 'attached':
- attaches = volume.get('volume_attachment', [])
+ attaches = volume.volume_attachment
for attachment in attaches:
if attachment.get('attach_status') == 'attached':
a = {'id': attachment.get('volume_id'),
def _get_volume_metadata(self, volume):
"""Retrieve the metadata of the volume object."""
- if volume.get('volume_metadata'):
- metadata = volume.get('volume_metadata')
- return {item['key']: item['value'] for item in metadata}
- # avoid circular ref when vol is a Volume instance
- elif volume.get('metadata') and isinstance(volume.get('metadata'),
- dict):
- return volume['metadata']
- return {}
+ return volume.metadata
def _get_volume_type(self, volume):
"""Retrieve the type the volume object."""
viewable_admin_meta=True,
offset=offset)
- volumes = [dict(vol) for vol in volumes]
-
for volume in volumes:
utils.add_visible_admin_metadata(volume)
- req.cache_db_volumes(volumes)
+ req.cache_db_volumes(volumes.objects)
if is_detail:
volumes = self._view_builder.detail_list(req, volumes)
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import timeutils
-from oslo_utils import uuidutils
from cinder import i18n
i18n.enable_lazy()
return _decorator
-def param2id(object_id):
- """Helper function to convert various id types to internal id.
-
- :param object_id: e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
- """
- if uuidutils.is_uuid_like(object_id):
- return object_id
- elif '-' in object_id:
- # FIXME(ja): mapping occurs in nova?
- pass
- else:
- try:
- return int(object_id)
- except ValueError:
- return object_id
-
-
class ShellCommands(object):
def bpython(self):
"""Runs a bpython shell.
def delete(self, volume_id):
"""Delete a volume, bypassing the check that it must be available."""
ctxt = context.get_admin_context()
- volume = db.volume_get(ctxt, param2id(volume_id))
- host = vutils.extract_host(volume['host']) if volume['host'] else None
+ volume = objects.Volume.get_by_id(ctxt, volume_id)
+ host = vutils.extract_host(volume.host) if volume.host else None
if not host:
print(_("Volume not yet assigned to host."))
print(_("Deleting volume from database and skipping rpc."))
- db.volume_destroy(ctxt, param2id(volume_id))
+ volume.destroy()
return
- if volume['status'] == 'in-use':
+ if volume.status == 'in-use':
print(_("Volume is in-use."))
print(_("Detach volume from instance and then try again."))
return
cctxt = self._rpc_client().prepare(server=host)
- cctxt.cast(ctxt, "delete_volume", volume_id=volume['id'])
+ cctxt.cast(ctxt, "delete_volume", volume_id=volume.id, volume=volume)
@args('--currenthost', required=True, help='Existing volume host name')
@args('--newhost', required=True, help='New volume host name')
resp = req.get_response(app())
return resp
+ def _create_volume(self, context, updates=None):
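+ """Create and persist a test Volume object, applying any overrides."""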
+ db_volume = {'status': 'available',
+ 'host': 'test',
+ 'availability_zone': 'fake_zone',
+ 'attach_status': 'detached'}
+ if updates:
+ db_volume.update(updates)
+
+ volume = objects.Volume(context=context, **db_volume)
+ volume.create()
+ return volume
+
def test_valid_updates(self):
vac = admin_actions.VolumeAdminController()
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
- volume = db.volume_create(ctx, {'size': 1})
+ volume = self._create_volume(ctx, {'size': 1, 'host': None})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
# request is accepted
self.assertEqual(202, resp.status_int)
# volume is deleted
- self.assertRaises(exception.NotFound, db.volume_get, ctx, volume['id'])
+ self.assertRaises(exception.NotFound, objects.Volume.get_by_id, ctx,
+ volume.id)
@mock.patch.object(volume_api.API, 'delete_snapshot', return_value=True)
@mock.patch('cinder.objects.Snapshot.get_by_id')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
connector = {}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
"""Test that attaching volume reserved for another instance fails."""
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
self.volume_api.reserve_volume(ctx, volume)
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
'topic': CONF.volume_topic,
'created_at': timeutils.utcnow()})
# current status is available
- volume = db.volume_create(admin_ctx,
- {'status': 'available',
- 'host': 'test',
- 'provider_location': '',
- 'attach_status': ''})
+ volume = self._create_volume(admin_ctx)
return volume
def _migrate_volume_exec(self, ctx, volume, host, expected_status,
ctx = context.RequestContext('admin', 'fake', True)
volume = self._migrate_volume_prep()
# current status is available
- volume = db.volume_create(ctx,
- {'status': 'available',
- 'host': 'test',
- 'provider_location': '',
- 'attach_status': '',
- 'replication_status': 'active'})
+ volume = self._create_volume(ctx, {'provider_location': '',
+ 'attach_status': '',
+ 'replication_status': 'active'})
volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
def test_migrate_volume_as_non_admin(self):
def test_migrate_volume_comp_no_mig_status(self):
admin_ctx = context.get_admin_context()
- volume1 = db.volume_create(admin_ctx, {'id': 'fake1',
- 'migration_status': 'foo'})
- volume2 = db.volume_create(admin_ctx, {'id': 'fake2',
- 'migration_status': None})
+ volume1 = self._create_volume(admin_ctx, {'migration_status': 'foo'})
+ volume2 = self._create_volume(admin_ctx, {'migration_status': None})
+
expected_status = 400
expected_id = None
ctx = context.RequestContext('admin', 'fake', True)
def test_migrate_volume_comp_bad_mig_status(self):
admin_ctx = context.get_admin_context()
- volume1 = db.volume_create(admin_ctx,
- {'id': 'fake1',
- 'migration_status': 'migrating'})
- volume2 = db.volume_create(admin_ctx,
- {'id': 'fake2',
- 'migration_status': 'target:foo'})
+ volume1 = self._create_volume(admin_ctx,
+ {'migration_status': 'migrating'})
+ volume2 = self._create_volume(admin_ctx,
+ {'migration_status': 'target:foo'})
expected_status = 400
expected_id = None
ctx = context.RequestContext('admin', 'fake', True)
def test_migrate_volume_comp_from_nova(self):
admin_ctx = context.get_admin_context()
- volume = db.volume_create(admin_ctx,
- {'id': 'fake1',
- 'status': 'in-use',
- 'host': 'test',
- 'migration_status': None,
- 'attach_status': 'attached'})
- new_volume = db.volume_create(admin_ctx,
- {'id': 'fake2',
- 'status': 'available',
- 'host': 'test',
- 'migration_status': None,
- 'attach_status': 'detached'})
+ volume = self._create_volume(admin_ctx, {'status': 'in-use',
+ 'migration_status': None,
+ 'attach_status': 'attached'})
+ new_volume = self._create_volume(admin_ctx,
+ {'migration_status': None,
+ 'attach_status': 'detached'})
expected_status = 200
- expected_id = 'fake2'
+ expected_id = new_volume.id
ctx = context.RequestContext('admin', 'fake', True)
self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
expected_status, expected_id)
# under the License.
import datetime
+import iso8601
import json
import uuid
import webob
from cinder.api.contrib import volume_actions
+from cinder import context
from cinder import exception
from cinder.image import glance
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import stubs
+from cinder.tests.unit import fake_volume
from cinder import volume
from cinder.volume import api as volume_api
from cinder.volume import rpcapi as volume_rpcapi
def setUp(self):
super(VolumeActionsTest, self).setUp()
+ self.context = context.RequestContext('fake', 'fake', is_admin=False)
self.UUID = uuid.uuid4()
self.controller = volume_actions.VolumeActionsController()
self.api_patchers = {}
self.addCleanup(self.api_patchers[_meth].stop)
self.api_patchers[_meth].return_value = True
- vol = {'id': 'fake', 'host': 'fake', 'status': 'available', 'size': 1,
- 'migration_status': None, 'volume_type_id': 'fake',
- 'project_id': 'project_id'}
+ db_vol = {'id': 'fake', 'host': 'fake', 'status': 'available',
+ 'size': 1, 'migration_status': None,
+ 'volume_type_id': 'fake', 'project_id': 'project_id'}
+ vol = fake_volume.fake_volume_obj(self.context, **db_vol)
self.get_patcher = mock.patch('cinder.volume.API.get')
self.mock_volume_get = self.get_patcher.start()
self.addCleanup(self.get_patcher.stop)
expected_res = {
'os-volume_upload_image': {
'id': id,
- 'updated_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'updated_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'status': 'uploading',
'display_description': 'displaydesc',
'size': 1,
expected_res = {
'os-volume_upload_image': {
'id': id,
- 'updated_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'updated_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'status': 'uploading',
'display_description': 'displaydesc',
'size': 1,
expected_res = {
'os-volume_upload_image': {
'id': id,
- 'updated_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'updated_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'status': 'uploading',
'display_description': 'displaydesc',
'size': 1,
expected_res = {
'os-volume_upload_image': {
'id': id,
- 'updated_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'updated_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'status': 'uploading',
'display_description': 'displaydesc',
'size': 1,
from cinder import context
from cinder import db
+from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_volume
from cinder import volume
-def fake_volume_get(*args, **kwargs):
+def fake_db_volume_get(*args, **kwargs):
return {
'id': 'fake',
'host': 'host001',
'project_id': 'fake',
'migration_status': None,
'_name_id': 'fake2',
+ 'attach_status': 'detached',
}
+def fake_volume_api_get(*args, **kwargs):
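+ # Stand-in for volume.API.get: wrap the fake DB record in a Volume object.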
+ ctx = context.RequestContext('admin', 'fake', True)
+ db_volume = fake_db_volume_get()
+ return fake_volume.fake_volume_obj(ctx, **db_volume)
+
+
def fake_volume_get_all(*args, **kwargs):
- return [fake_volume_get()]
+ return objects.VolumeList(objects=[fake_volume_api_get()])
def app():
def setUp(self):
super(VolumeHostAttributeTest, self).setUp()
- self.stubs.Set(volume.API, 'get', fake_volume_get)
+ self.stubs.Set(volume.API, 'get', fake_volume_api_get)
self.stubs.Set(volume.API, 'get_all', fake_volume_get_all)
- self.stubs.Set(db, 'volume_get', fake_volume_get)
+ self.stubs.Set(db, 'volume_get', fake_db_volume_get)
self.UUID = uuid.uuid4()
from cinder import context
from cinder import db
from cinder import exception
+from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_volume
from cinder import volume
-def fake_volume_get(*args, **kwargs):
+def fake_db_volume_get(*args, **kwargs):
return {
'id': 'fake',
'host': 'host001',
'volume_type_id': None,
'snapshot_id': None,
'project_id': 'fake',
+ 'migration_status': None,
+ '_name_id': 'fake2',
+ 'attach_status': 'detached',
}
+def fake_volume_api_get(*args, **kwargs):
+ ctx = context.RequestContext('admin', 'fake', True)
+ db_volume = fake_db_volume_get()
+ return fake_volume.fake_volume_obj(ctx, **db_volume)
+
+
def fake_volume_get_all(*args, **kwargs):
- return [fake_volume_get()]
+ return objects.VolumeList(objects=[fake_volume_api_get()])
fake_image_metadata = {
def setUp(self):
super(VolumeImageMetadataTest, self).setUp()
- self.stubs.Set(volume.API, 'get', fake_volume_get)
+ self.stubs.Set(volume.API, 'get', fake_volume_api_get)
self.stubs.Set(volume.API, 'get_all', fake_volume_get_all)
self.stubs.Set(volume.API, 'get_volume_image_metadata',
fake_get_volume_image_metadata)
self.stubs.Set(volume.API, 'get_volumes_image_metadata',
fake_get_volumes_image_metadata)
- self.stubs.Set(db, 'volume_get', fake_volume_get)
self.UUID = uuid.uuid4()
self.controller = (volume_image_metadata.
VolumeImageMetadataController())
from cinder import exception
from cinder import test
from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_volume
def app():
Note that we don't try to replicate any passed-in information (e.g. name,
volume type) in the returned structure.
"""
+ ctx = context.RequestContext('admin', 'fake', True)
vol = {
'status': 'creating',
'display_name': 'fake_name',
'availability_zone': 'nova',
'tenant_id': 'fake',
- 'created_at': 'DONTCARE',
'id': 'ffffffff-0000-ffff-0000-ffffffffffff',
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
- 'launched_at': 'DONTCARE',
'size': 0,
'attach_status': 'detached',
'volume_type_id': None}
- return vol
+ return fake_volume.fake_volume_obj(ctx, **vol)
@mock.patch('cinder.db.service_get_by_host_and_topic',
import webob
from cinder import context
+from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_volume
from cinder import volume
-def fake_volume_get(*args, **kwargs):
+def fake_db_volume_get(*args, **kwargs):
return {
'id': 'fake',
'host': 'host001',
'size': 5,
'availability_zone': 'somewhere',
'created_at': timeutils.utcnow(),
- 'attach_status': None,
+ 'attach_status': 'detached',
'display_name': 'anothervolume',
'display_description': 'Just another volume!',
'volume_type_id': None,
}
+def fake_volume_api_get(*args, **kwargs):
+ ctx = context.RequestContext('admin', 'fake', True)
+ db_volume = fake_db_volume_get()
+ return fake_volume.fake_volume_obj(ctx, **db_volume)
+
+
def fake_volume_get_all(*args, **kwargs):
- return [fake_volume_get()]
+ return objects.VolumeList(objects=[fake_volume_api_get()])
def app():
def setUp(self):
super(VolumeMigStatusAttributeTest, self).setUp()
- self.stubs.Set(volume.API, 'get', fake_volume_get)
+ self.stubs.Set(volume.API, 'get', fake_volume_api_get)
self.stubs.Set(volume.API, 'get_all', fake_volume_get_all)
self.UUID = uuid.uuid4()
import uuid
from lxml import etree
-from oslo_utils import timeutils
import webob
from cinder import context
+from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_volume
from cinder import volume
def fake_volume_get(*args, **kwargs):
- return {
+ ctx = context.RequestContext('non-admin', 'fake', False)
+ vol = {
'id': 'fake',
- 'host': 'host001',
- 'status': 'available',
- 'size': 5,
- 'availability_zone': 'somewhere',
- 'created_at': timeutils.utcnow(),
- 'attach_status': None,
- 'display_name': 'anothervolume',
- 'display_description': 'Just another volume!',
- 'volume_type_id': None,
- 'snapshot_id': None,
'project_id': PROJECT_ID,
- 'migration_status': None,
- '_name_id': 'fake2',
}
+ return fake_volume.fake_volume_obj(ctx, **vol)
def fake_volume_get_all(*args, **kwargs):
- return [fake_volume_get()]
+ return objects.VolumeList(objects=[fake_volume_get()])
def app():
vol['display_name'] = display_name
vol['display_description'] = display_description
vol['attach_status'] = status
+ vol['availability_zone'] = 'fake_zone'
return db.volume_create(context.get_admin_context(), vol)['id']
def test_show_transfer(self):
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_snapshot
+from cinder.tests.unit import fake_volume
# This list of fake volumes is used by our tests. Each is configured in a
if not vol:
raise exception.VolumeNotFound(volume_id)
- return vol
+ return fake_volume.fake_volume_obj(context, **vol)
def db_snapshot_get_all_for_volume(context, volume_id):
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
+from cinder import volume
CONF = cfg.CONF
'metadata': {}}
-def return_volume(context, volume_id):
- return {'id': 'fake-vol-id',
- 'size': 100,
- 'name': 'fake',
- 'host': 'fake-host',
- 'status': 'available',
- 'encryption_key_id': None,
- 'volume_type_id': None,
- 'migration_status': None,
- 'metadata': {},
- 'project_id': context.project_id}
+def stub_get(context, volume_id, *args, **kwargs):
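+ # volume.API.get stub used by these tests; returns a Volume object
+ # rather than the bare dict the old db.volume_get stub provided.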
+ vol = {'id': volume_id,
+ 'size': 100,
+ 'name': 'fake',
+ 'host': 'fake-host',
+ 'status': 'available',
+ 'encryption_key_id': None,
+ 'volume_type_id': None,
+ 'migration_status': None,
+ 'availability_zone': 'zone1:host1',
+ 'attach_status': 'detached'}
+ return fake_volume.fake_volume_obj(context, **vol)
def return_snapshot_nonexistent(context, snapshot_id):
def setUp(self):
super(SnapshotMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
- self.stubs.Set(cinder.db, 'volume_get', return_volume)
+ self.stubs.Set(volume.API, 'get', stub_get)
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v1 import stubs
+from cinder.tests.unit import fake_volume
+from cinder import volume
CONF = cfg.CONF
def return_volume_metadata(context, volume_id):
- if not isinstance(volume_id, str) or not len(volume_id) == 36:
- msg = 'id %s must be a uuid in return volume metadata' % volume_id
- raise Exception(msg)
return stub_volume_metadata()
return metadata
-def return_volume(context, volume_id):
- return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
- 'name': 'fake',
- 'metadata': {},
- 'project_id': context.project_id}
+def get_volume(*args, **kwargs):
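+ # Stub for volume.API.get that wraps a minimal record in a Volume object.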
+ vol = {'id': args[1],
+ 'size': 100,
+ 'name': 'fake',
+ 'host': 'fake-host',
+ 'status': 'available',
+ 'encryption_key_id': None,
+ 'volume_type_id': None,
+ 'migration_status': None,
+ 'availability_zone': 'zone1:host1',
+ 'attach_status': 'detached'}
+ return fake_volume.fake_volume_obj(args[0], **vol)
def return_volume_nonexistent(*args, **kwargs):
def setUp(self):
super(volumeMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
- self.stubs.Set(cinder.db, 'volume_get', return_volume)
+ self.stubs.Set(volume.API, 'get', get_volume)
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(cinder.db, 'service_get_all_by_topic',
req, self.req_id, body)
def test_create_nonexistent_volume(self):
- self.stubs.Set(cinder.db, 'volume_get',
- return_volume_nonexistent)
+ self.stubs.Set(volume.API, 'get', return_volume_nonexistent)
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(cinder.db, 'volume_metadata_update',
# under the License.
import datetime
+import iso8601
from lxml import etree
import mock
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import stubs
from cinder.tests.unit import fake_notifier
+from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.volume import api as volume_api
self.stubs.Set(volume_api.API, 'delete', stubs.stub_volume_delete)
def test_volume_create(self):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
vol = {"size": 100,
"display_name": "Volume Test Name",
'source_volid': None,
'metadata': {},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 100,
'encrypted': False}}
self.assertEqual(expected, res_dict)
req, body)
def test_volume_create_with_image_id(self):
- self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
test_id = "c905cedb-7281-47e4-8a62-f26bc5fc4c77"
vol = {"size": '1',
'source_volid': None,
'metadata': {},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
- 'size': '1'}}
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
+ 'size': 1}}
body = {"volume": vol}
req = fakes.HTTPRequest.blank('/v1/volumes')
res_dict = self.controller.create(req, body)
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
- @mock.patch.object(db, 'volume_get', side_effect=stubs.stub_volume_get_db)
+ @mock.patch.object(db, 'volume_type_get',
+ side_effect=stubs.stub_volume_type_get)
+ @mock.patch.object(volume_api.API, 'get',
+ side_effect=stubs.stub_volume_api_get, autospec=True)
@mock.patch.object(volume_api.API, 'update',
- side_effect=stubs.stub_volume_update)
+ side_effect=stubs.stub_volume_update, autospec=True)
def test_volume_update(self, *args):
updates = {
"display_name": "Updated Test Name",
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
return_value={"qos_max_iops": 2000,
"readonly": "False",
"attached_mode": "rw"})
- @mock.patch.object(db, 'volume_get', side_effect=stubs.stub_volume_get_db)
+ @mock.patch.object(db, 'volume_type_get',
+ side_effect=stubs.stub_volume_type_get)
+ @mock.patch.object(volume_api.API, 'get',
+ side_effect=stubs.stub_volume_api_get, autospec=True)
@mock.patch.object(volume_api.API, 'update',
- side_effect=stubs.stub_volume_update)
+ side_effect=stubs.stub_volume_update, autospec=True)
def test_volume_update_metadata(self, *args):
updates = {
"metadata": {"qos_max_iops": 2000}
'volume_type': 'vol_type_name',
'snapshot_id': None,
'source_volid': None,
- 'metadata': {"qos_max_iops": 2000,
+ 'metadata': {"qos_max_iops": '2000',
"readonly": "False",
"attached_mode": "rw"},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1
}}
self.assertEqual(expected, res_dict)
'metadata': {'key': 'value',
'readonly': 'True'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
stubs_volume_admin_metadata_get)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
self.stubs.Set(volume_api.API, 'get_all',
- stubs.stub_volume_get_all_by_project)
+ stubs.stub_volume_api_get_all_by_project)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes')
res_dict = self.controller.index(req)
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}]}
self.assertEqual(expected, res_dict)
# Finally test that we cached the returned volumes
'metadata': {'key': 'value',
'readonly': 'True'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}]}
self.assertEqual(expected, res_dict)
- def test_volume_list_detail(self):
- self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ @mock.patch.object(db, 'volume_admin_metadata_get',
+ return_value={'attached_mode': 'rw',
+ 'readonly': 'False'})
+ def test_volume_list_detail(self, *args):
self.stubs.Set(volume_api.API, 'get_all',
- stubs.stub_volume_get_all_by_project)
+ stubs.stub_volume_api_get_all_by_project)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail')
res_dict = self.controller.index(req)
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}]}
self.assertEqual(expected, res_dict)
# Finally test that we cached the returned volumes
'metadata': {'key': 'value',
'readonly': 'True'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}]}
self.assertEqual(expected, res_dict)
@mock.patch.object(db, 'volume_admin_metadata_get',
return_value={'attached_mode': 'rw',
'readonly': 'False'})
- @mock.patch.object(db, 'volume_get', side_effect=stubs.stub_volume_get_db)
+ @mock.patch.object(volume_api.API, 'get',
+ side_effect=stubs.stub_volume_api_get, autospec=True)
+ @mock.patch.object(db, 'volume_type_get',
+ side_effect=stubs.stub_volume_type_get, autospec=True)
def test_volume_show(self, *args):
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
# Finally test that we cached the returned volume
def test_volume_show_no_attachments(self):
def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, attach_status='detached')
+ vol = stubs.stub_volume(volume_id, attach_status='detached')
+ return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
'source_volid': None,
'metadata': {'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
def test_volume_show_bootable(self):
def stub_volume_get(self, context, volume_id, **kwargs):
- return (stubs.stub_volume(volume_id,
- volume_glance_metadata=dict(foo='bar')))
+ vol = (stubs.stub_volume(volume_id,
+ volume_glance_metadata=dict(foo='bar')))
+ return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
'metadata': {'attached_mode': 'rw',
'readonly': 'False'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
self.stubs.Set(db, 'volume_get_all_by_project',
stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2\
&offset=1',
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
self.assertEqual(1, len(volumes))
- self.assertEqual(2, volumes[0]['id'])
+ self.assertEqual('2', volumes[0]['id'])
# admin case
volume_detail_limit_offset(is_admin=True)
'metadata': {'key': 'value',
'readonly': 'True'},
'id': '1',
- 'created_at': datetime.datetime(1900, 1, 1,
- 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'size': 1}}
self.assertEqual(expected, res_dict)
def test_volume_show_with_encrypted_volume(self):
def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, encryption_key_id='fake_id')
+ vol = stubs.stub_volume(volume_id, encryption_key_id='fake_id')
+ return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, 1)
self.assertEqual(True, res_dict['volume']['encrypted'])
def test_volume_show_with_unencrypted_volume(self):
- def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, encryption_key_id=None)
-
- self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, 1)
def test_admin_list_volumes_limited_to_project(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes',
use_admin_context=True)
self.assertEqual(1, len(res['volumes']))
def test_admin_list_volumes_all_tenants(self):
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1')
res = self.controller.index(req)
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v1/fake/volumes')
res = self.controller.index(req)
# under the License.
import datetime
+import iso8601
from cinder import exception as exc
+from cinder import objects
+from cinder.tests.unit import fake_volume
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
'name': 'vol name',
'display_name': DEFAULT_VOL_NAME,
'display_description': DEFAULT_VOL_DESCRIPTION,
- 'updated_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
- 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'updated_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
+ 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'snapshot_id': None,
'source_volid': None,
'volume_type_id': '3e196c20-3c06-11e2-81c1-0800200c9a66',
'volume_admin_metadata': [{'key': 'attached_mode', 'value': 'rw'},
{'key': 'readonly', 'value': 'False'}],
'bootable': False,
- 'launched_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'launched_at': datetime.datetime(1900, 1, 1, 1, 1, 1,
+ tzinfo=iso8601.iso8601.Utc()),
'volume_type': {'name': DEFAULT_VOL_TYPE},
'replication_status': 'disabled',
'replication_extended_status': None,
source_volume = param.get('source_volume') or {}
vol['source_volid'] = source_volume.get('id')
vol['bootable'] = False
+ vol['volume_attachment'] = []
try:
vol['snapshot_id'] = snapshot['id']
except (KeyError, TypeError):
return vol
+def stub_volume_api_create(self, context, *args, **kwargs):
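+ # Object-returning variant of stub_volume_create for stubbing
+ # volume_api.API.create.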
+ vol = stub_volume_create(self, context, *args, **kwargs)
+ return fake_volume.fake_volume_obj(context, **vol)
+
+
def stub_image_service_detail(self, context, **kwargs):
filters = kwargs.get('filters', {'name': ''})
if filters['name'] == "Fedora-x86_64-20-20140618-sda":
return volume
+def stub_volume_api_get(self, context, volume_id, viewable_admin_meta=False):
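+ # Object-returning variant of stub_volume_get for stubbing volume_api.API.get.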
+ vol = stub_volume(volume_id)
+ return fake_volume.fake_volume_obj(context, **vol)
+
+
def stub_volume_get_all(context, search_opts=None, marker=None, limit=None,
sort_keys=None, sort_dirs=None, filters=None,
viewable_admin_meta=False, offset=None):
return [stub_volume_get(self, context, '1', viewable_admin_meta=True)]
+def stub_volume_api_get_all_by_project(self, context, marker, limit,
+ sort_keys=None, sort_dirs=None,
+ filters=None,
+ viewable_admin_meta=False,
+ offset=None):
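+ # Object-returning variant of stub_volume_get_all_by_project; wraps the
+ # single stub volume in a VolumeList.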
+ filters = filters or {}
+ vol = stub_volume_get(self, context, '1',
+ viewable_admin_meta=viewable_admin_meta)
+ vol_obj = fake_volume.fake_volume_obj(context, **vol)
+ return objects.VolumeList(objects=[vol_obj])
+
+
def stub_snapshot(id, **kwargs):
snapshot = {'id': id,
'volume_id': 12,
def stub_consistencygroup_get_notfound(self, context, cg_id):
raise exc.ConsistencyGroupNotFound(consistencygroup_id=cg_id)
+
+
+def stub_volume_type_get(context, id, *args, **kwargs):
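+ # Minimal volume type record for stubbing db.volume_type_get.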
+ return {'id': id,
+ 'name': 'vol_type_name',
+ 'description': 'A fake volume type',
+ 'is_public': True,
+ 'projects': [],
+ 'extra_specs': {},
+ 'created_at': None,
+ 'deleted_at': None,
+ 'updated_at': None,
+ 'deleted': False}
+
+
+def stub_volume_admin_metadata_get(context, volume_id, **kwargs):
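+ # Admin metadata stub; detached volumes carry no 'attached_mode' key.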
+ admin_meta = {'attached_mode': 'rw', 'readonly': 'False'}
+ if kwargs.get('attach_status') == 'detached':
+ del admin_meta['attached_mode']
+
+ return admin_meta
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
+from cinder import volume
CONF = cfg.CONF
'metadata': {}}
-def return_volume(context, volume_id):
- return {'id': 'fake-vol-id',
- 'size': 100,
- 'name': 'fake',
- 'host': 'fake-host',
- 'status': 'available',
- 'encryption_key_id': None,
- 'volume_type_id': None,
- 'migration_status': None,
- 'metadata': {},
- 'project_id': context.project_id}
+def stub_get(context, *args, **kwargs):
+ vol = {'id': 'fake-vol-id',
+ 'size': 100,
+ 'name': 'fake',
+ 'host': 'fake-host',
+ 'status': 'available',
+ 'encryption_key_id': None,
+ 'volume_type_id': None,
+ 'migration_status': None,
+ 'availability_zone': 'fake-zone',
+ 'attach_status': 'detached',
+ 'metadata': {}}
+ return fake_volume.fake_volume_obj(context, **vol)
def return_snapshot_nonexistent(context, snapshot_id):
def setUp(self):
super(SnapshotMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
- self.stubs.Set(cinder.db, 'volume_get', return_volume)
+ self.stubs.Set(volume.API, 'get', stub_get)
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import stubs
+from cinder.tests.unit import fake_volume
+from cinder import volume
from cinder.volume import api as volume_api
def return_volume_metadata(context, volume_id):
- if not isinstance(volume_id, str) or not len(volume_id) == 36:
- msg = 'id %s must be a uuid in return volume metadata' % volume_id
- raise Exception(msg)
return stub_volume_metadata()
return metadata
-def return_volume(context, volume_id):
- return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
- 'name': 'fake',
- 'metadata': {},
- 'project_id': context.project_id}
+def get_volume(*args, **kwargs):
+ vol = {'name': 'fake',
+ 'metadata': {}}
+ return fake_volume.fake_volume_obj(args[0], **vol)
def return_volume_nonexistent(*args, **kwargs):
def setUp(self):
super(volumeMetaDataTest, self).setUp()
self.volume_api = volume_api.API()
- self.stubs.Set(db, 'volume_get', return_volume)
+ self.stubs.Set(volume.API, 'get', get_volume)
self.stubs.Set(db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(db, 'service_get_all_by_topic',
req, self.req_id, body)
def test_create_nonexistent_volume(self):
- self.stubs.Set(db, 'volume_get',
- return_volume_nonexistent)
+ self.stubs.Set(volume.API, 'get', return_volume_nonexistent)
self.stubs.Set(db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(db, 'volume_metadata_update',
import datetime
-import functools
+import iso8601
from lxml import etree
import mock
from cinder import context
from cinder import db
from cinder import exception
+from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit.api.v2 import stubs
from cinder.tests.unit import fake_notifier
+from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import utils
from cinder.volume import api as volume_api
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create(self, mock_validate):
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
vol = self._vol_in_request_body()
body = {"volume": vol}
volume_id = res_dict['volume']['id']
self.assertEqual(1, len(res_dict))
+ vol_db = stubs.stub_volume(volume_id, volume_type={'name': vol_type})
+ vol_obj = fake_volume.fake_volume_obj(context.get_admin_context(),
+ **vol_db)
self.stubs.Set(volume_api.API, 'get_all',
lambda *args, **kwargs:
- [stubs.stub_volume(volume_id,
- volume_type={'name': vol_type})])
+ objects.VolumeList(objects=[vol_obj]))
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail')
res_dict = self.controller.detail(req)
self.assertTrue(mock_validate.called)
'availability_zone': availability_zone,
'bootable': 'false',
'consistencygroup_id': consistencygroup_id,
- 'created_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
- 'updated_at': datetime.datetime(1900, 1, 1, 1, 1, 1),
+ 'created_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1, tzinfo=iso8601.iso8601.Utc()),
+ 'updated_at': datetime.datetime(
+ 1900, 1, 1, 1, 1, 1, tzinfo=iso8601.iso8601.Utc()),
'description': description,
'id': stubs.DEFAULT_VOL_ID,
'links':
'multiattach': False,
}
+ @mock.patch.object(db, 'volume_type_get', autospec=True)
@mock.patch.object(volume_api.API, 'get_snapshot', autospec=True)
@mock.patch.object(volume_api.API, 'create', autospec=True)
- def test_volume_creation_from_snapshot(self, create, get_snapshot):
-
- create.side_effect = stubs.stub_volume_create
+ def test_volume_creation_from_snapshot(self, create, get_snapshot,
+ volume_type_get):
+ create.side_effect = stubs.stub_volume_api_create
get_snapshot.side_effect = stubs.stub_snapshot_get
+ volume_type_get.side_effect = stubs.stub_volume_type_get
snapshot_id = stubs.TEST_SNAPSHOT_UUID
vol = self._vol_in_request_body(snapshot_id=stubs.TEST_SNAPSHOT_UUID)
get_snapshot.assert_called_once_with(self.controller.volume_api,
context, snapshot_id)
+ @mock.patch.object(db, 'volume_type_get', autospec=True)
@mock.patch.object(volume_api.API, 'get_volume', autospec=True)
@mock.patch.object(volume_api.API, 'create', autospec=True)
- def test_volume_creation_from_source_volume(self, create, get_volume):
-
- get_volume.side_effect = functools.partial(stubs.stub_volume_get,
- viewable_admin_meta=True)
- create.side_effect = stubs.stub_volume_create
+ def test_volume_creation_from_source_volume(self, create, get_volume,
+ volume_type_get):
+ get_volume.side_effect = stubs.stub_volume_api_get
+ create.side_effect = stubs.stub_volume_api_create
+ volume_type_get.side_effect = stubs.stub_volume_type_get
source_volid = '2f49aa3a-6aae-488d-8b99-a43271605af6'
vol = self._vol_in_request_body(source_volid=source_volid)
get_volume.assert_called_once_with(self.controller.volume_api,
context, source_volid)
+ db_vol = stubs.stub_volume(source_volid)
+ vol_obj = fake_volume.fake_volume_obj(context, **db_vol)
kwargs = self._expected_volume_api_create_kwargs(
- source_volume=stubs.stub_volume(source_volid))
+ source_volume=vol_obj)
create.assert_called_once_with(self.controller.volume_api, context,
vol['size'], stubs.DEFAULT_VOL_NAME,
stubs.DEFAULT_VOL_DESCRIPTION, **kwargs)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_ref(self, mock_validate):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = self._vol_in_request_body(
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_id(self, mock_validate):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
vol = self._vol_in_request_body(
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_create_with_image_name(self, mock_validate):
- self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)
- self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
+ self.stubs.Set(volume_api.API, "create", stubs.stub_volume_api_create)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
self.stubs.Set(fake_image._FakeImageService,
"detail",
stubs.stub_image_service_detail)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_update(self, mock_validate):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
updates = {
"name": "Updated Test Name",
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_update_deprecation(self, mock_validate):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
updates = {
"display_name": "Updated Test Name",
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_update_deprecation_key_priority(self, mock_validate):
"""Test current update keys have priority over deprecated keys."""
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
updates = {
"name": "New Name",
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_volume_update_metadata(self, mock_validate):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
updates = {
"metadata": {"qos_max_iops": 2000}
expected = self._expected_vol_from_controller(
availability_zone=stubs.DEFAULT_AZ,
metadata={'attached_mode': 'rw', 'readonly': 'False',
- 'qos_max_iops': 2000})
+ 'qos_max_iops': '2000'})
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
self.assertTrue(mock_validate.called)
'server_id': stubs.FAKE_UUID,
'host_name': None,
'device': '/',
- 'attached_at': attach_tmp['attach_time'],
+ 'attached_at': attach_tmp['attach_time'].replace(
+ tzinfo=iso8601.iso8601.Utc()),
}],
metadata={'key': 'value', 'readonly': 'True'},
with_migration_status=True)
- expected['volume']['updated_at'] = volume_tmp['updated_at']
+ expected['volume']['updated_at'] = volume_tmp['updated_at'].replace(
+ tzinfo=iso8601.iso8601.Utc())
self.assertEqual(expected, res_dict)
self.assertEqual(2, len(self.notifier.notifications))
self.assertTrue(mock_validate.called)
def test_volume_list_summary(self):
self.stubs.Set(volume_api.API, 'get_all',
- stubs.stub_volume_get_all_by_project)
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ stubs.stub_volume_api_get_all_by_project)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes')
res_dict = self.controller.index(req)
def test_volume_list_detail(self):
self.stubs.Set(volume_api.API, 'get_all',
- stubs.stub_volume_get_all_by_project)
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ stubs.stub_volume_api_get_all_by_project)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail')
res_dict = self.controller.detail(req)
'host_name': None,
'id': '1',
'volume_id': stubs.DEFAULT_VOL_ID,
- 'attached_at': attach_tmp['attach_time'],
+ 'attached_at': attach_tmp['attach_time'].replace(
+ tzinfo=iso8601.iso8601.Utc()),
}],
metadata={'key': 'value', 'readonly': 'True'},
with_migration_status=True)
- exp_vol['volume']['updated_at'] = volume_tmp['updated_at']
+ exp_vol['volume']['updated_at'] = volume_tmp['updated_at'].replace(
+ tzinfo=iso8601.iso8601.Utc())
expected = {'volumes': [exp_vol['volume']]}
self.assertEqual(expected, res_dict)
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
self.assertEqual(2, len(volumes))
- self.assertEqual(1, volumes[0]['id'])
- self.assertEqual(2, volumes[1]['id'])
+ self.assertEqual('1', volumes[0]['id'])
+ self.assertEqual('2', volumes[1]['id'])
def test_volume_index_limit(self):
self.stubs.Set(db, 'volume_get_all_by_project',
]
self.stubs.Set(db, 'volume_get_all_by_project',
stub_volume_get_all_by_project)
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1')
res_dict = self.controller.detail(req)
volumes = res_dict['volumes']
self.assertEqual(2, len(volumes))
- self.assertEqual(1, volumes[0]['id'])
- self.assertEqual(2, volumes[1]['id'])
+ self.assertEqual('1', volumes[0]['id'])
+ self.assertEqual('2', volumes[1]['id'])
def test_volume_detail_limit(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=1')
res_dict = self.controller.detail(req)
def test_volume_detail_limit_marker(self):
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1&limit=1')
res_dict = self.controller.detail(req)
self.assertEqual('vol3', resp['volumes'][0]['name'])
def test_volume_show(self):
- self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, '1')
def test_volume_show_no_attachments(self):
def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, attach_status='detached')
+ vol = stubs.stub_volume(volume_id, attach_status='detached')
+ return fake_volume.fake_volume_obj(context, **vol)
+
+ def stub_volume_admin_metadata_get(context, volume_id, **kwargs):
+ return stubs.stub_volume_admin_metadata_get(
+ context, volume_id, attach_status='detached')
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(db, 'volume_admin_metadata_get',
+ stub_volume_admin_metadata_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, '1')
'server_id': stubs.FAKE_UUID,
'host_name': None,
'device': '/',
- 'attached_at': attach_tmp['attach_time'],
+ 'attached_at': attach_tmp['attach_time'].replace(
+ tzinfo=iso8601.iso8601.Utc()),
}],
metadata={'key': 'value', 'readonly': 'True'},
with_migration_status=True)
- expected['volume']['updated_at'] = volume_tmp['updated_at']
+ expected['volume']['updated_at'] = volume_tmp['updated_at'].replace(
+ tzinfo=iso8601.iso8601.Utc())
self.assertEqual(expected, res_dict)
def test_volume_show_with_encrypted_volume(self):
def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, encryption_key_id='fake_id')
+ vol = stubs.stub_volume(volume_id, encryption_key_id='fake_id')
+ return fake_volume.fake_volume_obj(context, **vol)
self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, 1)
self.assertEqual(True, res_dict['volume']['encrypted'])
def test_volume_show_with_unencrypted_volume(self):
- def stub_volume_get(self, context, volume_id, **kwargs):
- return stubs.stub_volume(volume_id, encryption_key_id=None)
-
- self.stubs.Set(volume_api.API, 'get', stub_volume_get)
+ self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_api_get)
+ self.stubs.Set(db, 'volume_type_get', stubs.stub_volume_type_get)
req = fakes.HTTPRequest.blank('/v2/volumes/1')
res_dict = self.controller.show(req, 1)
from cinder.cmd import volume_usage_audit
from cinder import context
from cinder import test
+from cinder.tests.unit import fake_volume
from cinder import version
CONF = cfg.CONF
def tearDown(self):
super(TestCinderManageCmd, self).tearDown()
- @mock.patch('oslo_utils.uuidutils.is_uuid_like')
- def test_param2id(self, is_uuid_like):
- mock_object_id = mock.MagicMock()
- is_uuid_like.return_value = True
-
- object_id = cinder_manage.param2id(mock_object_id)
- self.assertEqual(mock_object_id, object_id)
- is_uuid_like.assert_called_once_with(mock_object_id)
-
- @mock.patch('oslo_utils.uuidutils.is_uuid_like')
- def test_param2id_int_string(self, is_uuid_like):
- object_id_str = '10'
- is_uuid_like.return_value = False
-
- object_id = cinder_manage.param2id(object_id_str)
- self.assertEqual(10, object_id)
- is_uuid_like.assert_called_once_with(object_id_str)
-
@mock.patch('cinder.db.migration.db_sync')
def test_db_commands_sync(self, db_sync):
version = mock.MagicMock()
@mock.patch('cinder.rpc.init')
def test_volume_commands_delete(self, rpc_init, get_client,
get_admin_context, volume_get):
- ctxt = context.RequestContext('fake-user', 'fake-project')
+ ctxt = context.RequestContext('admin', 'fake', True)
get_admin_context.return_value = ctxt
mock_client = mock.MagicMock()
cctxt = mock.MagicMock()
mock_client.prepare.return_value = cctxt
get_client.return_value = mock_client
- volume_id = '123'
host = 'fake@host'
- volume = {'id': volume_id,
- 'host': host + '#pool1',
- 'status': 'available'}
+ db_volume = {'host': host + '#pool1'}
+ volume = fake_volume.fake_db_volume(**db_volume)
+ volume_obj = fake_volume.fake_volume_obj(ctxt, **volume)
+ volume_id = volume['id']
volume_get.return_value = volume
volume_cmds = cinder_manage.VolumeCommands()
volume_cmds._client = mock_client
volume_cmds.delete(volume_id)
- volume_get.assert_called_once_with(ctxt, 123)
- # NOTE prepare called w/o pool part in host
+ volume_get.assert_called_once_with(ctxt, volume_id)
mock_client.prepare.assert_called_once_with(server=host)
cctxt.cast.assert_called_once_with(ctxt, 'delete_volume',
- volume_id=volume['id'])
+ volume_id=volume['id'],
+ volume=volume_obj)
@mock.patch('cinder.db.volume_destroy')
@mock.patch('cinder.db.volume_get')
@mock.patch('cinder.rpc.init')
def test_volume_commands_delete_no_host(self, rpc_init, get_admin_context,
volume_get, volume_destroy):
- ctxt = context.RequestContext('fake-user', 'fake-project')
+ ctxt = context.RequestContext('fake-user', 'fake-project',
+ is_admin=True)
get_admin_context.return_value = ctxt
- volume_id = '123'
- volume = {'id': volume_id, 'host': None, 'status': 'available'}
+ volume = fake_volume.fake_db_volume()
+ volume_id = volume['id']
volume_get.return_value = volume
with mock.patch('sys.stdout', new=six.StringIO()) as fake_out:
volume_cmds.delete(volume_id)
get_admin_context.assert_called_once_with()
- volume_get.assert_called_once_with(ctxt, 123)
- volume_destroy.assert_called_once_with(ctxt, 123)
+ volume_get.assert_called_once_with(ctxt, volume_id)
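+ # The command may destroy the volume under its own admin context, so
+ # assert on the admin flag rather than on context identity.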
+ self.assertTrue(volume_destroy.called)
+ admin_context = volume_destroy.call_args[0][0]
+ self.assertTrue(admin_context.is_admin)
self.assertEqual(expected_out, fake_out.getvalue())
@mock.patch('cinder.db.volume_destroy')
volume_get, volume_destroy):
ctxt = context.RequestContext('fake-user', 'fake-project')
get_admin_context.return_value = ctxt
- volume_id = '123'
- volume = {'id': volume_id, 'host': 'fake-host', 'status': 'in-use'}
+ db_volume = {'status': 'in-use', 'host': 'fake-host'}
+ volume = fake_volume.fake_db_volume(**db_volume)
+ volume_id = volume['id']
volume_get.return_value = volume
with mock.patch('sys.stdout', new=six.StringIO()) as fake_out:
volume_cmds = cinder_manage.VolumeCommands()
volume_cmds.delete(volume_id)
- volume_get.assert_called_once_with(ctxt, 123)
+ volume_get.assert_called_once_with(ctxt, volume_id)
self.assertEqual(expected_out, fake_out.getvalue())
def test_config_commands_list(self):
vol['status'] = 'available'
vol['volume_type_id'] = self.volume_type['id']
vol['host'] = 'fake_host'
- return db.volume_create(self.context, vol)
+ vol['availability_zone'] = 'fake_zone'
+ vol['attach_status'] = 'detached'
+ volume = objects.Volume(context=self.context, **vol)
+ volume.create()
+ return volume
def _create_snapshot(self, volume):
snapshot = objects.Snapshot(self.context)
msg = ("Maximum number of volumes allowed (1) exceeded for"
" quota '%s'." % resource)
self.assertEqual(msg, six.text_type(ex))
- db.volume_destroy(self.context, vol_ref['id'])
+ vol_ref.destroy()
def test_too_many_snapshots_of_type(self):
resource = 'snapshots_%s' % self.volume_type_name
volume.API().create_snapshot,
self.context, vol_ref, '', '')
snap_ref.destroy()
- db.volume_destroy(self.context, vol_ref['id'])
+ vol_ref.destroy()
def test_too_many_backups(self):
resource = 'backups'
self.project_id)
self.assertEqual(20, usages['gigabytes']['in_use'])
snap_ref.destroy()
- db.volume_destroy(self.context, vol_ref['id'])
+ vol_ref.destroy()
def test_too_many_combined_backup_gigabytes(self):
vol_ref = self._create_volume(size=10000)
container='container',
incremental=False)
db.backup_destroy(self.context, backup_ref['id'])
- db.volume_destroy(self.context, vol_ref['id'])
+ vol_ref.destroy()
def test_no_snapshot_gb_quota_flag(self):
self.flags(quota_volumes=2,
snap_ref.destroy()
snap_ref2.destroy()
- db.volume_destroy(self.context, vol_ref['id'])
- db.volume_destroy(self.context, vol_ref2['id'])
+ vol_ref.destroy()
+ vol_ref2.destroy()
def test_backup_gb_quota_flag(self):
self.flags(quota_volumes=2,
db.backup_destroy(self.context, backup_ref['id'])
db.backup_destroy(self.context, backup_ref2['id'])
- db.volume_destroy(self.context, vol_ref['id'])
- db.volume_destroy(self.context, vol_ref2['id'])
+ vol_ref.destroy()
+ vol_ref2.destroy()
def test_too_many_gigabytes_of_type(self):
resource = 'gigabytes_%s' % self.volume_type_name
expected = exception.VolumeSizeExceedsAvailableQuota(
requested=1, quota=10, consumed=10, name=resource)
self.assertEqual(str(expected), str(raised_exc))
- db.volume_destroy(self.context, vol_ref['id'])
+ vol_ref.destroy()
class FakeContext(object):
self.assertEqual(4, len(self.notifier.notifications),
self.notifier.notifications)
msg = self.notifier.notifications[2]
- expected['metadata'] = []
self.assertEqual('volume.delete.start', msg['event_type'])
self.assertDictMatch(expected, msg['payload'])
msg = self.notifier.notifications[3]
'volume_get_all_by_project') as by_project:
with mock.patch.object(volume_api.db,
'volume_get_all') as get_all:
- fake_volume = {'volume_type_id': 'fake_type_id',
- 'name': 'fake_name',
- 'host': 'fake_host',
- 'id': 'fake_volume_id'}
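+ # Renamed to db_volume so the local dict does not shadow the imported
+ # fake_volume module used below.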
+ db_volume = {'volume_type_id': 'fake_type_id',
+ 'name': 'fake_name',
+ 'host': 'fake_host',
+ 'id': 'fake_volume_id'}
- fake_volume_list = []
- fake_volume_list.append([fake_volume])
- by_project.return_value = fake_volume_list
- get_all.return_value = fake_volume_list
+ volume = fake_volume.fake_db_volume(**db_volume)
+ by_project.return_value = [volume]
+ get_all.return_value = [volume]
volume_api.get_all(self.context, filters={'all_tenants': '0'})
self.assertTrue(by_project.called)
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'error_deleting'
- volume['host'] = 'fakehost'
volume_api = cinder.volume.api.API()
volume_api.delete(self.context, volume, force=True)
# status is deleting
- volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual('deleting', volume['status'])
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
+ self.assertEqual('deleting', volume.status)
# clean up
- self.volume.delete_volume(self.context, volume['id'])
+ self.volume.delete_volume(self.context, volume.id)
def test_cannot_force_delete_attached_volume(self):
"""Test volume can't be force delete in attached state."""
}
volume_api = cinder.volume.api.API()
volume = tests_utils.create_volume(self.context, **volume_params)
- volume = db.volume_update(self.context, volume['id'],
- {'status': 'available'})
+ volume.status = 'available'
+ volume.save()
image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
db.image_volume_cache_create(self.context,
volume['host'],
version='1.32')
can_send_version.assert_called_once_with('1.32')
- def test_delete_volume(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_delete_volume(self, can_send_version):
self._test_volume_api('delete_volume',
rpc_method='cast',
- volume=self.fake_volume,
+ volume=self.fake_volume_obj,
+ unmanage_only=False,
+ version='1.33')
+ can_send_version.assert_called_once_with('1.33')
+
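+ # Backlevel path: when the target manager cannot handle 1.33, the cast
+ # must fall back to the old 1.15 signature.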
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_delete_volume_old(self, can_send_version):
+ self._test_volume_api('delete_volume',
+ rpc_method='cast',
+ volume=self.fake_volume_obj,
unmanage_only=False,
version='1.15')
+ can_send_version.assert_called_once_with('1.33')
def test_create_snapshot(self):
self._test_volume_api('create_snapshot',
reservations = None
LOG.exception(_LE("Failed to update quota while "
"deleting volume."))
- self.db.volume_destroy(context.elevated(), volume_id)
+ volume.destroy()
if reservations:
QUOTAS.commit(context, reservations, project_id=project_id)
msg = _("Unable to delete encrypted volume: %s.") % e.msg
raise exception.InvalidVolume(reason=msg)
- now = timeutils.utcnow()
- vref = self.db.volume_update(context,
- volume_id,
- {'status': 'deleting',
- 'terminated_at': now})
+ volume.status = 'deleting'
+ volume.terminated_at = timeutils.utcnow()
+ volume.save()
self.volume_rpcapi.delete_volume(context, volume, unmanage_only)
LOG.info(_LI("Delete volume request issued successfully."),
- resource=vref)
+ resource=volume)
@wrap_check_policy
def update(self, context, volume, fields):
LOG.info(_LI("Volume updated successfully."), resource=vref)
def get(self, context, volume_id, viewable_admin_meta=False):
- rv = self.db.volume_get(context, volume_id)
-
- volume = dict(rv)
+ volume = objects.Volume.get_by_id(context, volume_id)
if viewable_admin_meta:
ctxt = context.elevated()
admin_metadata = self.db.volume_admin_metadata_get(ctxt,
volume_id)
- volume['volume_admin_metadata'] = admin_metadata
+ volume.admin_metadata = admin_metadata
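+ # Reset change tracking so the injected admin metadata is not written
+ # back to the database on a later save().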
+ volume.obj_reset_changes()
try:
check_policy(context, 'get', volume)
# raise VolumeNotFound instead to make sure Cinder behaves
# as it used to
raise exception.VolumeNotFound(volume_id=volume_id)
- LOG.info(_LI("Volume info retrieved successfully."), resource=rv)
+ LOG.info(_LI("Volume info retrieved successfully."), resource=volume)
return volume
def get_all(self, context, marker=None, limit=None, sort_keys=None,
if context.is_admin and allTenants:
# Need to remove all_tenants to pass the filtering below.
del filters['all_tenants']
- volumes = self.db.volume_get_all(context, marker, limit,
- sort_keys=sort_keys,
- sort_dirs=sort_dirs,
- filters=filters,
- offset=offset)
+ volumes = objects.VolumeList.get_all(context, marker, limit,
+ sort_keys=sort_keys,
+ sort_dirs=sort_dirs,
+ filters=filters,
+ offset=offset)
else:
if viewable_admin_meta:
context = context.elevated()
- volumes = self.db.volume_get_all_by_project(context,
- context.project_id,
- marker, limit,
- sort_keys=sort_keys,
- sort_dirs=sort_dirs,
- filters=filters,
- offset=offset)
+ volumes = objects.VolumeList.get_all_by_project(
+ context, context.project_id, marker, limit,
+ sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters,
+ offset=offset)
LOG.info(_LI("Get all volumes completed successfully."))
return volumes
def get_volume(self, context, volume_id):
check_policy(context, 'get_volume')
- vref = self.db.volume_get(context, volume_id)
- LOG.info(_LI("Volume retrieved successfully."), resource=vref)
- return dict(vref)
+ volume = objects.Volume.get_by_id(context, volume_id)
+ LOG.info(_LI("Volume retrieved successfully."), resource=volume)
+ return volume
def get_all_snapshots(self, context, search_opts=None, marker=None,
limit=None, sort_keys=None, sort_dirs=None,
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.32'
+ RPC_API_VERSION = '1.33'
target = messaging.Target(version=RPC_API_VERSION)
# Initialize backend capabilities list
self.driver.init_capabilities()
- volumes = self.db.volume_get_all_by_host(ctxt, self.host)
+ volumes = objects.VolumeList.get_all_by_host(ctxt, self.host)
snapshots = self.db.snapshot_get_by_host(ctxt, self.host)
self._sync_provider_info(ctxt, volumes, snapshots)
# FIXME volume count for exporting is wrong
LOG.exception(_LE("Failed to re-export volume, "
"setting to ERROR."),
resource=volume)
- self.db.volume_update(ctxt,
- volume['id'],
- {'status': 'error'})
+ volume.status = 'error'
+ volume.save()
elif volume['status'] in ('downloading', 'creating'):
LOG.warning(_LW("Detected volume stuck "
"in %(curr_status)s "
if volume['status'] == 'downloading':
self.driver.clear_download(ctxt, volume)
- self.db.volume_update(ctxt,
- volume['id'],
- {'status': 'error'})
+ volume.status = 'error'
+ volume.save()
else:
pass
snapshots = objects.SnapshotList.get_by_host(
return vol_ref.id
@locked_volume_operation
- def delete_volume(self, context, volume_id, unmanage_only=False):
+ def delete_volume(self, context, volume_id, unmanage_only=False,
+ volume=None):
"""Deletes and unexports volume.
1. Delete a volume (normal case)
context = context.elevated()
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
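+ # Callers older than 1.33 still send only volume_id, so accept either form.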
+ if volume is not None:
+ volume_id = volume.id
+
try:
- volume_ref = self.db.volume_get(context, volume_id)
+ # TODO(thangp): Replace with volume.refresh() when it is available
+ volume = objects.Volume.get_by_id(context, volume_id)
except exception.VolumeNotFound:
# NOTE(thingee): It could be possible for a volume to
# be deleted when resuming deletes from init_host().
volume_id)
return True
- if context.project_id != volume_ref['project_id']:
- project_id = volume_ref['project_id']
+ if context.project_id != volume.project_id:
+ project_id = volume.project_id
else:
project_id = context.project_id
- if volume_ref['attach_status'] == "attached":
+ if volume.attach_status == "attached":
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume_id)
- if vol_utils.extract_host(volume_ref['host']) != self.host:
+ if vol_utils.extract_host(volume.host) != self.host:
raise exception.InvalidVolume(
reason=_("volume is not local to this node"))
# The status 'deleting' is not included, because it only applies to
# the source volume to be deleted after a migration. No quota
# needs to be handled for it.
- is_migrating = volume_ref['migration_status'] not in (None, 'error',
- 'success')
+ is_migrating = volume.migration_status not in (None, 'error',
+ 'success')
is_migrating_dest = (is_migrating and
- volume_ref['migration_status'].startswith(
+ volume.migration_status.startswith(
'target:'))
- self._notify_about_volume_usage(context, volume_ref, "delete.start")
+ self._notify_about_volume_usage(context, volume, "delete.start")
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
# and the volume status updated.
utils.require_driver_initialized(self.driver)
- self.driver.remove_export(context, volume_ref)
+ self.driver.remove_export(context, volume)
if unmanage_only:
- self.driver.unmanage(volume_ref)
+ self.driver.unmanage(volume)
else:
- self.driver.delete_volume(volume_ref)
+ self.driver.delete_volume(volume)
except exception.VolumeIsBusy:
LOG.error(_LE("Unable to delete busy volume."),
- resource=volume_ref)
+ resource=volume)
# If this is a destination volume, we have to clear the database
# record to avoid user confusion.
- self._clear_db(context, is_migrating_dest, volume_ref,
+ self._clear_db(context, is_migrating_dest, volume,
'available')
return True
except Exception:
with excutils.save_and_reraise_exception():
# If this is a destination volume, we have to clear the
# database record to avoid user confusion.
- self._clear_db(context, is_migrating_dest, volume_ref,
+ self._clear_db(context, is_migrating_dest, volume,
'error_deleting')
# If deleting source/destination volume in a migration, we should
# Get reservations
try:
reserve_opts = {'volumes': -1,
- 'gigabytes': -volume_ref['size']}
+ 'gigabytes': -volume.size}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
- volume_ref.get('volume_type_id'))
+ volume.volume_type_id)
reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
except Exception:
reservations = None
LOG.exception(_LE("Failed to update usages deleting volume."),
- resource=volume_ref)
+ resource=volume)
# Delete glance metadata if it exists
self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
- self.db.volume_destroy(context, volume_id)
+ volume.destroy()
# If deleting source/destination volume in a migration, we should
# skip quotas.
if not is_migrating:
- self._notify_about_volume_usage(context, volume_ref, "delete.end")
+ self._notify_about_volume_usage(context, volume, "delete.end")
# Commit the reservations
if reservations:
QUOTAS.commit(context, reservations, project_id=project_id)
- pool = vol_utils.extract_host(volume_ref['host'], 'pool')
+ pool = vol_utils.extract_host(volume.host, 'pool')
if pool is None:
# Legacy volume, put them into default pool
pool = self.driver.configuration.safe_get(
'volume_backend_name') or vol_utils.extract_host(
- volume_ref['host'], 'pool', True)
- size = volume_ref['size']
+ volume.host, 'pool', True)
+ size = volume.size
try:
self.stats['pools'][pool]['allocated_capacity_gb'] -= size
self.publish_service_capabilities(context)
- LOG.info(_LI("Deleted volume successfully."), resource=volume_ref)
+ LOG.info(_LI("Deleted volume successfully."), resource=volume)
return True
def _clear_db(self, context, is_migrating_dest, volume_ref, status):
# driver.delete_volume() fails in delete_volume(), so it is already
# in the exception handling part.
if is_migrating_dest:
- self.db.volume_destroy(context, volume_ref['id'])
+ volume_ref.destroy()
LOG.error(_LE("Unable to delete the destination volume "
"during volume migration, (NOTE: database "
"record needs to be deleted)."), resource=volume_ref)
else:
- self.db.volume_update(context,
- volume_ref['id'],
- {'status': status})
+ volume_ref.status = status
+ volume_ref.save()
def create_snapshot(self, context, volume_id, snapshot):
"""Creates and exports the snapshot."""
and delete_cgsnapshot() to cast method only with necessary
args. Forwarding CGSnapshot object instead of CGSnapshot_id.
1.32 - Adds support for sending objects over RPC in create_volume().
+ 1.33 - Adds support for sending objects over RPC in delete_volume().
"""
BASE_RPC_API_VERSION = '1.0'
cctxt.cast(ctxt, 'create_volume', **msg_args)
def delete_volume(self, ctxt, volume, unmanage_only=False):
- new_host = utils.extract_host(volume['host'])
- cctxt = self.client.prepare(server=new_host, version='1.15')
- cctxt.cast(ctxt, 'delete_volume',
- volume_id=volume['id'],
- unmanage_only=unmanage_only)
+ msg_args = {'volume_id': volume.id, 'unmanage_only': unmanage_only}
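+ # Pass the full volume object only when the receiving manager supports
+ # 1.33; otherwise fall back to the 1.15 call that sends just the id.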
+ if self.client.can_send_version('1.33'):
+ version = '1.33'
+ msg_args['volume'] = volume
+ else:
+ version = '1.15'
+
+ new_host = utils.extract_host(volume.host)
+ cctxt = self.client.prepare(server=new_host, version=version)
+ cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
new_host = utils.extract_host(volume['host'])