kwargs['cgsnapshot'] = cgsnapshot
kwargs['consistencygroup'] = group
kwargs['snapshot'] = snapshot
- volume_type_id = snapshot.get('volume_type_id')
+ volume_type_id = snapshot.volume_type_id
if volume_type_id:
kwargs['volume_type'] = volume_types.get_volume_type(
context, volume_type_id)
# and removal of volume entry in the db.
try:
self.volume_api.create(context,
- snapshot['volume_size'],
+ snapshot.volume_size,
None,
None,
**kwargs)
def _reset_metadata_tracking(self, fields=None):
if fields is None or 'metadata' in fields:
self._orig_metadata = (dict(self.metadata)
- if 'metadata' in self else {})
+ if self.obj_attr_is_set('metadata') else {})
def obj_what_changed(self):
changes = super(Snapshot, self).obj_what_changed()
- if 'metadata' in self and self.metadata != self._orig_metadata:
+ if hasattr(self, 'metadata') and self.metadata != self._orig_metadata:
changes.add('metadata')
return changes
value = db_snapshot.get(name)
if isinstance(field, fields.IntegerField):
value = value if value is not None else 0
- snapshot[name] = value
+ setattr(snapshot, name, value)
if 'volume' in expected_attrs:
volume = objects.Volume(context)
snapshots = db.snapshot_get_all(context, search_opts, marker, limit,
sort_keys, sort_dirs, offset)
return base.obj_make_list(context, cls(), objects.Snapshot,
- snapshots,
- expected_attrs=['metadata'])
+ snapshots, expected_attrs=['metadata'])
@base.remotable_classmethod
def get_by_host(cls, context, host, filters=None):
def _issue_snapshot_reset(self, ctx, snapshot, updated_status):
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
- snapshot['id'])
+ snapshot.id)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = \
}
snapshot = objects.Snapshot(context=ctx, **kwargs)
snapshot.create()
+ self.addCleanup(snapshot.destroy)
- resp = self._issue_snapshot_reset(ctx,
- snapshot,
- {'status': 'error'})
+ resp = self._issue_snapshot_reset(ctx, snapshot, {'status': 'error'})
self.assertEqual(202, resp.status_int)
snapshot = objects.Snapshot.get_by_id(ctx, snapshot['id'])
ctx = context.RequestContext('admin', 'fake', True)
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
- snapshot = db.snapshot_create(ctx, {'status': 'available',
- 'volume_id': volume['id']})
+ snapshot = objects.Snapshot(ctx, status='available',
+ volume_id=volume['id'])
+ snapshot.create()
+ self.addCleanup(snapshot.destroy)
- resp = self._issue_snapshot_reset(ctx,
- snapshot,
+ resp = self._issue_snapshot_reset(ctx, snapshot,
{'status': 'attaching'})
self.assertEqual(400, resp.status_int)
- snapshot = db.snapshot_get(ctx, snapshot['id'])
- self.assertEqual('available', snapshot['status'])
+ self.assertEqual('available', snapshot.status)
def test_force_delete(self):
# admin context
host = 'test2'
ctx = context.RequestContext('admin', 'fake', True)
volume = self._migrate_volume_prep()
- db.snapshot_create(ctx, {'volume_id': volume['id']})
+ snap = objects.Snapshot(ctx, volume_id=volume['id'])
+ snap.create()
+ self.addCleanup(snap.destroy)
self._migrate_volume_exec(ctx, volume, host, expected_status)
def test_migrate_volume_bad_force_host_copy(self):
expected_status = 403
expected_id = None
ctx = context.RequestContext('fake', 'fake')
- volume = self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
- expected_status, expected_id)
+ self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
+ expected_status, expected_id)
def test_migrate_volume_comp_no_mig_status(self):
admin_ctx = context.get_admin_context()
expected_status = 200
expected_id = 'fake2'
ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
- expected_status, expected_id)
+ self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
+ expected_status, expected_id)
def test_backup_reset_valid_updates(self):
vac = admin_actions.BackupAdminController()
def setUp(self):
super(ConsistencyGroupsAPITestCase, self).setUp()
self.cg_api = cinder.consistencygroup.API()
- self.ctxt = context.RequestContext('fake', 'fake', auth_token=True)
+ self.ctxt = context.RequestContext('fake', 'fake', auth_token=True,
+ is_admin=True)
def _create_consistencygroup(
self,
volume_type_id = '123456'
consistencygroup = self._create_consistencygroup(status='available',
host='test_host')
+
remove_volume_id = utils.create_volume(
self.ctxt,
volume_type_id=volume_type_id,
cgsnapshot_id = utils.create_cgsnapshot(
self.ctxt,
consistencygroup_id=consistencygroup.id)['id']
- snapshot_id = utils.create_snapshot(
+ snapshot = utils.create_snapshot(
self.ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
- status='available')['id']
+ status='available')
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
self.ctxt.elevated(), res_dict['consistencygroup']['id'])
cg_ref.destroy()
- db.snapshot_destroy(self.ctxt.elevated(), snapshot_id)
+ snapshot.destroy()
db.cgsnapshot_destroy(self.ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(self.ctxt.elevated(), volume_id)
consistencygroup.destroy()
cgsnapshot_id = utils.create_cgsnapshot(
self.ctxt,
consistencygroup_id=consistencygroup.id)['id']
- snapshot_id = utils.create_snapshot(
+ snapshot = utils.create_snapshot(
self.ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
- status='available')['id']
+ status='available')
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
self.assertEqual(400, res_dict['badRequest']['code'])
self.assertIsNotNone(res_dict['badRequest']['message'])
- db.snapshot_destroy(self.ctxt.elevated(), snapshot_id)
+ snapshot.destroy()
db.cgsnapshot_destroy(self.ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(self.ctxt.elevated(), volume_id)
consistencygroup.destroy()
cgsnapshot_id = utils.create_cgsnapshot(
self.ctxt,
consistencygroup_id=consistencygroup.id)['id']
- snapshot_id = utils.create_snapshot(
+ snapshot = utils.create_snapshot(
self.ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
- status='available')['id']
+ status='available')
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
'group')
self.assertIn(msg, res_dict['badRequest']['message'])
- db.snapshot_destroy(self.ctxt.elevated(), snapshot_id)
+ snapshot.destroy()
db.cgsnapshot_destroy(self.ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(self.ctxt.elevated(), volume_id)
consistencygroup.destroy()
cgsnapshot_id = utils.create_cgsnapshot(
self.ctxt,
consistencygroup_id=consistencygroup.id)['id']
- snapshot_id = utils.create_snapshot(
+ snapshot = utils.create_snapshot(
self.ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
- status='available')['id']
+ status='available')
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
msg = _("Create volume failed.")
self.assertEqual(msg, res_dict['badRequest']['message'])
- db.snapshot_destroy(self.ctxt.elevated(), snapshot_id)
+ snapshot.destroy()
db.cgsnapshot_destroy(self.ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(self.ctxt.elevated(), volume_id)
consistencygroup.destroy()
from cinder.db.sqlalchemy import api as sqa_api
from cinder.db.sqlalchemy import models as sqa_models
from cinder import exception
+from cinder import objects
from cinder import quota
from cinder import test
import cinder.tests.unit.image.fake
class QuotaIntegrationTestCase(test.TestCase):
def setUp(self):
+ objects.register_all()
super(QuotaIntegrationTestCase, self).setUp()
self.volume_type_name = CONF.default_volume_type
self.volume_type = db.volume_type_create(
return db.volume_create(self.context, vol)
def _create_snapshot(self, volume):
- snapshot = {}
- snapshot['user_id'] = self.user_id
- snapshot['project_id'] = self.project_id
- snapshot['volume_id'] = volume['id']
- snapshot['volume_size'] = volume['size']
- snapshot['host'] = volume['host']
- snapshot['status'] = 'available'
- return db.snapshot_create(self.context, snapshot)
+ snapshot = objects.Snapshot(self.context)
+ snapshot.user_id = self.user_id or 'fake_user_id'
+ snapshot.project_id = self.project_id or 'fake_project_id'
+ snapshot.volume_id = volume['id']
+ snapshot.volume_size = volume['size']
+ snapshot.host = volume['host']
+ snapshot.status = 'available'
+ snapshot.create()
+ return snapshot
def _create_backup(self, volume):
backup = {}
self.assertRaises(exception.SnapshotLimitExceeded,
volume.API().create_snapshot,
self.context, vol_ref, '', '')
- db.snapshot_destroy(self.context, snap_ref['id'])
+ snap_ref.destroy()
db.volume_destroy(self.context, vol_ref['id'])
def test_too_many_backups(self):
usages = db.quota_usage_get_all_by_project(self.context,
self.project_id)
self.assertEqual(20, usages['gigabytes']['in_use'])
- db.snapshot_destroy(self.context, snap_ref['id'])
+ snap_ref.destroy()
db.volume_destroy(self.context, vol_ref['id'])
def test_too_many_combined_backup_gigabytes(self):
self.assertEqual(20, usages['gigabytes']['in_use'])
self.assertEqual(0, usages['gigabytes']['reserved'])
- db.snapshot_destroy(self.context, snap_ref['id'])
- db.snapshot_destroy(self.context, snap_ref2['id'])
+ snap_ref.destroy()
+ snap_ref2.destroy()
db.volume_destroy(self.context, vol_ref['id'])
db.volume_destroy(self.context, vol_ref2['id'])
from cinder.tests.unit.brick import fake_lvm
from cinder.tests.unit import conf_fixture
from cinder.tests.unit import fake_driver
+from cinder.tests.unit import fake_snapshot
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import utils as tests_utils
'status': 'available',
'volume_size': 10,
'volume_type_id': biz_type['id']}
-
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
+ **snapshot)
# Make sure the case of specifying a type that
# doesn't match the snapshots type fails
with mock.patch.object(cinder.volume.volume_types,
name='fake_name',
description='fake_desc',
volume_type=foo_type,
- snapshot=snapshot)
+ snapshot=snapshot_obj)
# Make sure that trying to specify a type
# when the snapshots type is None fails
- snapshot['volume_type_id'] = None
+ snapshot_obj.volume_type_id = None
self.assertRaises(exception.InvalidInput,
volume_api.create,
self.context,
name='fake_name',
description='fake_desc',
volume_type=foo_type,
- snapshot=snapshot)
+ snapshot=snapshot_obj)
- snapshot['volume_type_id'] = foo_type['id']
+ snapshot_obj.volume_type_id = foo_type['id']
volume_api.create(self.context, size=1, name='fake_name',
description='fake_desc', volume_type=foo_type,
- snapshot=snapshot)
+ snapshot=snapshot_obj)
db.volume_type_destroy(context.get_admin_context(),
foo_type['id'])
'status': 'available',
'volume_size': 10,
'volume_type_id': biz_type['id']}
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
+ **snapshot)
with mock.patch.object(db,
'service_get_all_by_topic') as mock_get_service, \
name='fake_name',
description='fake_desc',
volume_type=foo_type,
- snapshot=snapshot)
+ snapshot=snapshot_obj)
def test_create_snapshot_driver_not_initialized(self):
volume_src = tests_utils.create_volume(self.context,
self.context, volume_src['id'], snapshot_obj)
# NOTE(flaper87): The volume status should be error.
- snapshot = db.snapshot_get(context.get_admin_context(), snapshot_id)
- self.assertEqual("error", snapshot.status)
+ self.assertEqual("error", snapshot_obj.status)
# lets cleanup the mess
self.volume.driver._initialized = True
self.volume.create_snapshot(self.context, src_vol['id'], snapshot_obj)
# ensure that status of snapshot is 'available'
- snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
- self.assertEqual('available', snapshot_ref)
+ self.assertEqual('available', snapshot_obj.status)
dst_vol = tests_utils.create_volume(self.context,
snapshot_id=snapshot_id,
dst_vol['id'])
# cleanup resource
- db.snapshot_destroy(self.context, snapshot_id)
+ snapshot_obj.destroy()
db.volume_destroy(self.context, src_vol_id)
@mock.patch(
self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
# ensure that status of snapshot is 'available'
- snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
- self.assertEqual('available', snapshot_ref)
+ self.assertEqual('available', snapshot_obj.status)
# create volume from snapshot
dst_vol = tests_utils.create_volume(self.context,
self.assertEqual('available', vol['status'])
# cleanup resource
- db.snapshot_destroy(self.context, snapshot_id)
+ snapshot_obj.destroy()
db.volume_destroy(self.context, src_vol_id)
db.volume_destroy(self.context, dst_vol['id'])
snapshot = {'id': 1234,
'status': 'available',
'volume_size': 10}
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
+ **snapshot)
self.assertRaises(exception.InvalidInput,
volume_api.create,
self.context,
size=1,
name='fake_name',
description='fake_desc',
- snapshot=snapshot)
+ snapshot=snapshot_obj)
def test_create_volume_from_snapshot_fail_wrong_az(self):
"""Test volume can't be created from snapshot in a different az."""
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
snapshot = self._create_snapshot(volume_src['id'])
- snapshot_obj = objects.Snapshot.get_by_id(self.context,
- snapshot['id'])
+
self.volume.create_snapshot(self.context, volume_src['id'],
- snapshot_obj)
- snapshot = db.snapshot_get(self.context, snapshot['id'])
+ snapshot)
volume_dst = volume_api.create(self.context,
size=1,
pass
@staticmethod
- def _create_snapshot(volume_id, size='0', metadata=None):
+ def _create_snapshot(volume_id, size=1, metadata=None):
"""Create a snapshot object."""
- snap = {}
- snap['volume_size'] = size
- snap['user_id'] = 'fake'
- snap['project_id'] = 'fake'
- snap['volume_id'] = volume_id
- snap['status'] = "creating"
+ metadata = metadata or {}
+ snap = objects.Snapshot(context.get_admin_context())
+ snap.volume_size = size
+ snap.user_id = 'fake'
+ snap.project_id = 'fake'
+ snap.volume_id = volume_id
+ snap.status = "creating"
if metadata is not None:
- snap['metadata'] = metadata
- return db.snapshot_create(context.get_admin_context(), snap)
+ snap.metadata = metadata
+ snap.create()
+ return snap
def test_create_delete_snapshot(self):
"""Test snapshot can be created and deleted."""
self.assertEqual(2, len(self.notifier.notifications),
self.notifier.notifications)
- snapshot_id = self._create_snapshot(volume['id'],
- size=volume['size'])['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
- self.assertEqual(snapshot_id,
- db.snapshot_get(context.get_admin_context(),
- snapshot_id).id)
+ snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ snapshot_id = snapshot.id
+ self.volume.create_snapshot(self.context, volume['id'], snapshot)
+ self.assertEqual(
+ snapshot_id, objects.Snapshot.get_by_id(self.context,
+ snapshot_id).id)
msg = self.notifier.notifications[2]
self.assertEqual('snapshot.create.start', msg['event_type'])
expected = {
self.assertEqual(4, len(self.notifier.notifications),
self.notifier.notifications)
- self.volume.delete_snapshot(self.context, snapshot_obj)
+ self.volume.delete_snapshot(self.context, snapshot)
msg = self.notifier.notifications[4]
self.assertEqual('snapshot.delete.start', msg['event_type'])
expected['status'] = 'available'
self.assertEqual(6, len(self.notifier.notifications),
self.notifier.notifications)
- snap = db.snapshot_get(context.get_admin_context(read_deleted='yes'),
- snapshot_id)
- self.assertEqual('deleted', snap['status'])
+ snap = objects.Snapshot.get_by_id(context.get_admin_context(
+ read_deleted='yes'), snapshot_id)
+ self.assertEqual('deleted', snap.status)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'], size=volume['size'],
metadata=test_meta)
- snapshot_id = snapshot['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
+ snapshot_id = snapshot.id
- snap = db.snapshot_get(context.get_admin_context(), snapshot_id)
- result_dict = dict(snap)
- result_meta = {
- result_dict['snapshot_metadata'][0].key:
- result_dict['snapshot_metadata'][0].value}
- self.assertEqual(test_meta, result_meta)
- self.volume.delete_snapshot(self.context, snapshot_obj)
+ result_dict = snapshot.metadata
+
+ self.assertEqual(test_meta, result_dict)
+ self.volume.delete_snapshot(self.context, snapshot)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
"""Test volume can't be deleted with dependent snapshots."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot_id = self._create_snapshot(volume['id'],
- size=volume['size'])['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
- self.assertEqual(snapshot_id,
- db.snapshot_get(context.get_admin_context(),
- snapshot_id).id)
+ snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ self.volume.create_snapshot(self.context, volume['id'], snapshot)
+ self.assertEqual(
+ snapshot.id, objects.Snapshot.get_by_id(self.context,
+ snapshot.id).id)
volume['status'] = 'available'
volume['host'] = 'fakehost'
volume_api.delete,
self.context,
volume)
- self.volume.delete_snapshot(self.context, snapshot_obj)
+ self.volume.delete_snapshot(self.context, snapshot)
self.volume.delete_volume(self.context, volume['id'])
def test_delete_volume_in_consistency_group(self):
"""Test snapshot can be created and deleted."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot_id = self._create_snapshot(volume['id'],
- size=volume['size'])['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
- snapshot = db.snapshot_get(context.get_admin_context(),
- snapshot_id)
+ snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ self.volume.create_snapshot(self.context, volume['id'], snapshot)
volume_api = cinder.volume.api.API()
- snapshot['status'] = 'badstatus'
+ snapshot.status = 'badstatus'
self.assertRaises(exception.InvalidSnapshot,
volume_api.delete_snapshot,
self.context,
snapshot)
- snapshot['status'] = 'error'
- self.volume.delete_snapshot(self.context, snapshot_obj)
+ snapshot.status = 'error'
+ self.volume.delete_snapshot(self.context, snapshot)
self.volume.delete_volume(self.context, volume['id'])
def test_create_snapshot_force(self):
volume,
'fake_name',
'fake_description')
- db.snapshot_destroy(self.context, snapshot_ref['id'])
+ snapshot_ref.destroy()
db.volume_destroy(self.context, volume['id'])
# create volume and attach to the host
volume,
'fake_name',
'fake_description')
- db.snapshot_destroy(self.context, snapshot_ref['id'])
+ snapshot_ref.destroy()
db.volume_destroy(self.context, volume['id'])
def test_create_snapshot_from_bootable_volume(self):
self.assertTrue(vol_glance_meta)
# create snapshot from bootable volume
- snap_id = self._create_snapshot(volume_id)['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
- self.volume.create_snapshot(ctxt, volume_id, snapshot_obj)
+ snap = self._create_snapshot(volume_id)
+ self.volume.create_snapshot(ctxt, volume_id, snap)
# get snapshot's volume_glance_metadata
snap_glance_meta = db.volume_snapshot_glance_metadata_get(
- ctxt, snap_id)
+ ctxt, snap.id)
self.assertTrue(snap_glance_meta)
# ensure that volume's glance metadata is copied
self.assertDictMatch(vol_glance_dict, snap_glance_dict)
# ensure that snapshot's status is changed to 'available'
- snapshot_ref = db.snapshot_get(ctxt, snap_id)['status']
- self.assertEqual('available', snapshot_ref)
+ self.assertEqual('available', snap.status)
# cleanup resource
- db.snapshot_destroy(ctxt, snap_id)
+ snap.destroy()
db.volume_destroy(ctxt, volume_id)
def test_create_snapshot_from_bootable_volume_fail(self):
vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
self.assertTrue(vol_glance_meta)
snap = self._create_snapshot(volume_id)
- snap_id = snap['id']
- snap_stat = snap['status']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
- self.assertTrue(snap_id)
+ snap_stat = snap.status
+ self.assertTrue(snap.id)
self.assertTrue(snap_stat)
# set to return DB exception
self.volume.create_snapshot,
ctxt,
volume_id,
- snapshot_obj)
+ snap)
# get snapshot's volume_glance_metadata
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_snapshot_glance_metadata_get,
- ctxt, snap_id)
+ ctxt, snap.id)
# ensure that status of snapshot is 'error'
- snapshot_ref = db.snapshot_get(ctxt, snap_id)['status']
- self.assertEqual('error', snapshot_ref)
+ self.assertEqual('error', snap.status)
# cleanup resource
- db.snapshot_destroy(ctxt, snap_id)
+ snap.destroy()
db.volume_destroy(ctxt, volume_id)
def test_create_snapshot_from_bootable_volume_with_volume_metadata_none(
# set bootable flag of volume to True
db.volume_update(self.context, volume_id, {'bootable': True})
- snapshot_id = self._create_snapshot(volume['id'])['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
+ snapshot = self._create_snapshot(volume['id'])
+ self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_snapshot_glance_metadata_get,
- self.context, snapshot_id)
+ self.context, snapshot.id)
# ensure that status of snapshot is 'available'
- snapshot_ref = db.snapshot_get(self.context, snapshot_id)['status']
- self.assertEqual('available', snapshot_ref)
+ self.assertEqual('available', snapshot.status)
# cleanup resource
- db.snapshot_destroy(self.context, snapshot_id)
+ snapshot.destroy()
db.volume_destroy(self.context, volume_id)
def test_delete_busy_snapshot(self):
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
- snapshot_id = self._create_snapshot(volume_id,
- size=volume['size'])['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume_id, snapshot_obj)
+ snapshot = self._create_snapshot(volume_id, size=volume['size'])
+ self.volume.create_snapshot(self.context, volume_id, snapshot)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
- self.volume.delete_snapshot(self.context, snapshot_obj)
- snapshot_ref = db.snapshot_get(self.context, snapshot_id)
+ snapshot_id = snapshot.id
+ self.volume.delete_snapshot(self.context, snapshot)
+ snapshot_ref = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual("available", snapshot_ref.status)
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
- snapshot_id = self._create_snapshot(volume_id)['id']
- snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
- self.volume.create_snapshot(self.context, volume_id, snapshot_obj)
+ snapshot = self._create_snapshot(volume_id)
+ snapshot_id = snapshot.id
+ self.volume.create_snapshot(self.context, volume_id, snapshot)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
- self.volume.delete_snapshot(self.context, snapshot_obj)
- snapshot_ref = db.snapshot_get(self.context, snapshot_id)
+ self.volume.delete_snapshot(self.context, snapshot)
+ snapshot_ref = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual("available", snapshot_ref.status)
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.delete_snapshot,
self.context,
- snapshot_obj)
+ snapshot)
self.assertRaises(exception.VolumeBackendAPIException,
self.volume.delete_volume,
self.context,
# create raw snapshot
volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'])
- snapshot_obj = objects.Snapshot.get_by_id(self.context,
- snapshot['id'])
- self.assertIsNone(snapshot['display_name'])
+ snapshot_id = snapshot.id
+ self.assertIsNone(snapshot.display_name)
# use volume.api to update name
volume_api = cinder.volume.api.API()
update_dict = {'display_name': 'test update name'}
- volume_api.update_snapshot(self.context, snapshot_obj, update_dict)
+ volume_api.update_snapshot(self.context, snapshot, update_dict)
# read changes from db
- snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
- self.assertEqual('test update name', snap['display_name'])
+ snap = objects.Snapshot.get_by_id(context.get_admin_context(),
+ snapshot_id)
+ self.assertEqual('test update name', snap.display_name)
@mock.patch.object(QUOTAS, 'reserve')
def test_extend_volume(self, reserve):
'cgsnapshot_id': '1'}
snp3 = {'id': '3', 'name': 'snap 3',
'cgsnapshot_id': '1'}
+ snp1_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp1)
+ snp2_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp2)
+ snp3_obj = fake_snapshot.fake_snapshot_obj(self.context, **snp3)
volumes = []
snapshots = []
volumes.append(vol1)
volumes.append(vol2)
volumes.append(vol3)
- snapshots.append(snp2)
- snapshots.append(snp3)
- snapshots.append(snp1)
+ snapshots.append(snp2_obj)
+ snapshots.append(snp3_obj)
+ snapshots.append(snp1_obj)
i = 0
for vol in volumes:
snap = snapshots[i]
i += 1
- self.assertNotEqual(vol['snapshot_id'], snap['id'])
+ self.assertNotEqual(vol['snapshot_id'], snap.id)
sorted_snaps = self.volume._sort_snapshots(volumes, snapshots)
i = 0
for vol in volumes:
snap = sorted_snaps[i]
i += 1
- self.assertEqual(vol['snapshot_id'], snap['id'])
+ self.assertEqual(vol['snapshot_id'], snap.id)
snapshots[2]['id'] = '9999'
self.assertRaises(exception.SnapshotNotFound,
cgsnapshot = db.cgsnapshot_create(context.get_admin_context(), cgsnap)
# Create a snapshot object
- snap = {}
- snap['volume_size'] = size
- snap['user_id'] = 'fake'
- snap['project_id'] = 'fake'
- snap['volume_id'] = volume_id
- snap['status'] = "available"
- snap['cgsnapshot_id'] = cgsnapshot['id']
- snapshot = db.snapshot_create(context.get_admin_context(), snap)
+ snap = objects.Snapshot(context.get_admin_context())
+ snap.volume_size = size
+ snap.user_id = 'fake'
+ snap.project_id = 'fake'
+ snap.volume_id = volume_id
+ snap.status = "available"
+ snap.cgsnapshot_id = cgsnapshot['id']
+ snap.create()
- return cgsnapshot, snapshot
+ return cgsnapshot, snap
@mock.patch('cinder.volume.driver.VolumeDriver.create_consistencygroup',
autospec=True,
self.db_attrs[i]['volume_id'] = 1
# Not in window
- db.snapshot_create(self.ctx, self.db_attrs[0])
+ del self.db_attrs[0]['id']
+ snap1 = objects.Snapshot(self.ctx, **self.db_attrs[0])
+ snap1.create()
# In - deleted in window
- db.snapshot_create(self.ctx, self.db_attrs[1])
+ del self.db_attrs[1]['id']
+ snap2 = objects.Snapshot(self.ctx, **self.db_attrs[1])
+ snap2.create()
# In - deleted after window
- db.snapshot_create(self.ctx, self.db_attrs[2])
+ del self.db_attrs[2]['id']
+ snap3 = objects.Snapshot(self.ctx, **self.db_attrs[2])
+ snap3.create()
# In - created in window
- db.snapshot_create(self.context, self.db_attrs[3])
+ del self.db_attrs[3]['id']
+ snap4 = objects.Snapshot(self.ctx, **self.db_attrs[3])
+ snap4.create()
+
# Not of window.
- db.snapshot_create(self.context, self.db_attrs[4])
+ del self.db_attrs[4]['id']
+ snap5 = objects.Snapshot(self.ctx, **self.db_attrs[4])
+ snap5.create()
- snapshots = db.snapshot_get_active_by_window(
+ snapshots = objects.SnapshotList.get_active_by_window(
self.context,
datetime.datetime(1, 3, 1, 1, 1, 1),
- datetime.datetime(1, 4, 1, 1, 1, 1),
- project_id='p1')
+ datetime.datetime(1, 4, 1, 1, 1, 1)).objects
self.assertEqual(3, len(snapshots))
- self.assertEqual(u'2', snapshots[0].id)
- self.assertEqual(u'1', snapshots[0].volume.id)
- self.assertEqual(u'3', snapshots[1].id)
- self.assertEqual(u'1', snapshots[1].volume.id)
- self.assertEqual(u'4', snapshots[2].id)
- self.assertEqual(u'1', snapshots[2].volume.id)
+ self.assertEqual(snap2.id, snapshots[0].id)
+ self.assertEqual(u'1', snapshots[0].volume_id)
+ self.assertEqual(snap3.id, snapshots[1].id)
+ self.assertEqual(u'1', snapshots[1].volume_id)
+ self.assertEqual(snap4.id, snapshots[2].id)
+ self.assertEqual(u'1', snapshots[2].volume_id)
class DriverTestCase(test.TestCase):
from cinder import context
from cinder import db
from cinder import exception
+from cinder import objects
from cinder import test
def setUp(self):
super(VolumeGlanceMetadataTestCase, self).setUp()
self.ctxt = context.get_admin_context()
+ objects.register_all()
def test_vol_glance_metadata_bad_vol_id(self):
ctxt = context.get_admin_context()
def test_vol_glance_metadata_copy_to_snapshot(self):
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
- db.snapshot_create(ctxt, {'id': 100, 'volume_id': 1})
+ snap = objects.Snapshot(ctxt, volume_id=1)
+ snap.create()
db.volume_glance_metadata_create(ctxt, 1, 'key1', 'value1')
- db.volume_glance_metadata_copy_to_snapshot(ctxt, 100, 1)
+ db.volume_glance_metadata_copy_to_snapshot(ctxt, snap.id, 1)
- expected_meta = {'snapshot_id': '100',
+ expected_meta = {'snapshot_id': snap.id,
'key': 'key1',
'value': 'value1'}
- for meta in db.volume_snapshot_glance_metadata_get(ctxt, 100):
+ for meta in db.volume_snapshot_glance_metadata_get(ctxt, snap.id):
for (key, value) in expected_meta.items():
self.assertEqual(value, meta[key])
+ snap.destroy()
def test_vol_glance_metadata_copy_from_volume_to_volume(self):
ctxt = context.get_admin_context()
vol1 = db.volume_create(self.ctxt, {})
vol2 = db.volume_create(self.ctxt, {})
db.volume_glance_metadata_create(self.ctxt, vol1['id'], 'm1', 'v1')
- snapshot = db.snapshot_create(self.ctxt, {'volume_id': vol1['id']})
- db.volume_glance_metadata_copy_to_snapshot(self.ctxt, snapshot['id'],
+ snapshot = objects.Snapshot(self.ctxt, volume_id=vol1['id'])
+ snapshot.create()
+ db.volume_glance_metadata_copy_to_snapshot(self.ctxt, snapshot.id,
vol1['id'])
db.volume_glance_metadata_copy_to_volume(self.ctxt, vol2['id'],
- snapshot['id'])
+ snapshot.id)
metadata = db.volume_glance_metadata_get(self.ctxt, vol2['id'])
metadata = {m['key']: m['value'] for m in metadata}
self.assertEqual({'m1': 'v1'}, metadata)
def test_volume_snapshot_glance_metadata_get_nonexistent(self):
vol = db.volume_create(self.ctxt, {})
- snapshot = db.snapshot_create(self.ctxt, {'volume_id': vol['id']})
+ snapshot = objects.Snapshot(self.ctxt, volume_id=vol['id'])
+ snapshot.create()
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_snapshot_glance_metadata_get,
- self.ctxt, snapshot['id'])
+ self.ctxt, snapshot.id)
+ snapshot.destroy()
vol['status'] = "available"
vol['attach_status'] = "detached"
vol['metadata'] = {"test_key": "test_val"}
+ vol['size'] = 1
volume = db.volume_create(self.context, vol)
- snpshot = {
- 'id': 1,
- 'volume_id': 'fake_id',
+ kwargs = {
'status': "creating",
'progress': '0%',
- 'volume_size': 0,
'display_name': 'fake_name',
'display_description': 'fake_description'}
- snapshot = db.snapshot_create(self.context, snpshot)
+ snapshot = tests_utils.create_snapshot(self.context, vol['id'],
+ **kwargs)
source_group = tests_utils.create_consistencygroup(
self.context,
group2 = objects.ConsistencyGroup.get_by_id(self.context, group2.id)
self.fake_volume = jsonutils.to_primitive(volume)
self.fake_volume_metadata = volume["volume_metadata"]
- self.fake_snapshot = jsonutils.to_primitive(snapshot)
- self.fake_snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
- **snpshot)
+ self.fake_snapshot = snapshot
self.fake_reservations = ["RESERVATION"]
self.fake_cg = group
self.fake_cg2 = group2
if 'snapshot' in expected_msg:
snapshot = expected_msg['snapshot']
del expected_msg['snapshot']
- expected_msg['snapshot_id'] = snapshot['id']
+ expected_msg['snapshot_id'] = snapshot.id
expected_msg['snapshot'] = snapshot
if 'host' in expected_msg:
del expected_msg['host']
self._test_volume_api('create_snapshot',
rpc_method='cast',
volume=self.fake_volume,
- snapshot=self.fake_snapshot_obj)
+ snapshot=self.fake_snapshot)
def test_delete_snapshot(self):
self._test_volume_api('delete_snapshot',
rpc_method='cast',
- snapshot=self.fake_snapshot_obj,
+ snapshot=self.fake_snapshot,
host='fake_host',
unmanage_only=False)
def test_delete_snapshot_with_unmanage_only(self):
self._test_volume_api('delete_snapshot',
rpc_method='cast',
- snapshot=self.fake_snapshot_obj,
+ snapshot=self.fake_snapshot,
host='fake_host',
unmanage_only=True)
import datetime
import mock
+import six
from oslo_concurrency import processutils
from oslo_config import cfg
from cinder import context
from cinder import exception
from cinder import test
+from cinder.tests.unit import fake_snapshot
+from cinder.tests.unit import fake_volume
from cinder import utils
from cinder.volume import throttling
from cinder.volume import utils as volume_utils
-
CONF = cfg.CONF
'snapshot.test_suffix',
mock_usage.return_value)
- def test_usage_from_snapshot(self):
+ @mock.patch('cinder.objects.Volume.get_by_id')
+ def test_usage_from_snapshot(self, volume_get_by_id):
+ raw_volume = {
+ 'id': '55614621',
+ 'availability_zone': 'nova'
+ }
+ ctxt = context.get_admin_context()
+ volume_obj = fake_volume.fake_volume_obj(ctxt, **raw_volume)
+ volume_get_by_id.return_value = volume_obj
raw_snapshot = {
'project_id': '12b0330ec2584a',
'user_id': '158cba1b8c2bb6008e',
- 'volume': {'availability_zone': 'nova'},
+ 'volume': volume_obj,
'volume_id': '55614621',
'volume_size': 1,
'id': '343434a2',
'created_at': '2014-12-11T10:10:00',
'status': 'pause',
'deleted': '',
- 'metadata': {'fake_snap_meta_key': 'fake_snap_meta_value'},
+ 'snapshot_metadata': [{'key': 'fake_snap_meta_key',
+ 'value': 'fake_snap_meta_value'}],
+ 'expected_attrs': ['metadata'],
}
- usage_info = volume_utils._usage_from_snapshot(raw_snapshot)
+
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(ctxt, **raw_snapshot)
+ usage_info = volume_utils._usage_from_snapshot(snapshot_obj)
expected_snapshot = {
'tenant_id': '12b0330ec2584a',
'user_id': '158cba1b8c2bb6008e',
'volume_size': 1,
'snapshot_id': '343434a2',
'display_name': '11',
- 'created_at': '2014-12-11T10:10:00',
+ 'created_at': 'DONTCARE',
'status': 'pause',
'deleted': '',
- 'metadata': "{'fake_snap_meta_key': 'fake_snap_meta_value'}",
+ 'metadata': six.text_type({'fake_snap_meta_key':
+ u'fake_snap_meta_value'}),
}
- self.assertEqual(expected_snapshot, usage_info)
+ self.assertDictMatch(expected_snapshot, usage_info)
@mock.patch('cinder.db.volume_glance_metadata_get')
@mock.patch('cinder.db.volume_attachment_get_used_by_volume_id')
display_name='test_snapshot',
display_description='this is a test snapshot',
cgsnapshot_id = None,
- status='creating'):
+ status='creating',
+ **kwargs):
vol = db.volume_get(ctxt, volume_id)
- snap = {}
- snap['volume_id'] = volume_id
- snap['user_id'] = ctxt.user_id
- snap['project_id'] = ctxt.project_id
- snap['status'] = status
- snap['volume_size'] = vol['size']
- snap['display_name'] = display_name
- snap['display_description'] = display_description
- snap['cgsnapshot_id'] = cgsnapshot_id
- return db.snapshot_create(ctxt, snap)
+ snap = objects.Snapshot(ctxt)
+ snap.volume_id = volume_id
+ snap.user_id = ctxt.user_id or 'fake_user_id'
+ snap.project_id = ctxt.project_id or 'fake_project_id'
+ snap.status = status
+ snap.volume_size = vol['size']
+ snap.display_name = display_name
+ snap.display_description = display_description
+ snap.cgsnapshot_id = cgsnapshot_id
+ snap.create()
+ return snap
def create_consistencygroup(ctxt,
raise exception.InvalidInput(reason=msg)
if snapshot and volume_type:
- if volume_type['id'] != snapshot['volume_type_id']:
+ if volume_type['id'] != snapshot.volume_type_id:
if not self._retype_is_possible(context,
volume_type['id'],
- snapshot['volume_type_id'],
+ snapshot.volume_type_id,
volume_type):
msg = _("Invalid volume_type provided: %s (requested "
"type is not compatible; recommend omitting "
# so build the resource tag manually for now.
LOG.info(_LI("Snapshot retrieved successfully."),
resource={'type': 'snapshot',
- 'id': snapshot['id']})
+ 'id': snapshot.id})
return snapshot
def get_volume(self, context, volume_id):
@wrap_check_policy
def delete_snapshot(self, context, snapshot, force=False,
unmanage_only=False):
- if not force and snapshot['status'] not in ["available", "error"]:
+ if not force and snapshot.status not in ["available", "error"]:
LOG.error(_LE('Unable to delete snapshot: %(snap_id)s, '
'due to invalid status. '
'Status must be available or '
'error, not %(snap_status)s.'),
- {'snap_id': snapshot['id'],
- 'snap_status': snapshot['status']})
+ {'snap_id': snapshot.id,
+ 'snap_status': snapshot.status})
msg = _("Volume Snapshot status must be available or error.")
raise exception.InvalidSnapshot(reason=msg)
- cgsnapshot_id = snapshot.get('cgsnapshot_id', None)
+ cgsnapshot_id = snapshot.cgsnapshot_id
if cgsnapshot_id:
msg = _('Unable to delete snapshot %s because it is part of a '
- 'consistency group.') % snapshot['id']
+ 'consistency group.') % snapshot.id
LOG.error(msg)
raise exception.InvalidSnapshot(reason=msg)
- snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ snapshot_obj = self.get_snapshot(context, snapshot.id)
snapshot_obj.status = 'deleting'
snapshot_obj.save()
def get_snapshot_metadata(self, context, snapshot):
"""Get all metadata associated with a snapshot."""
- snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ snapshot_obj = self.get_snapshot(context, snapshot.id)
LOG.info(_LI("Get snapshot metadata completed successfully."),
resource=snapshot)
return snapshot_obj.metadata
def delete_snapshot_metadata(self, context, snapshot, key):
"""Delete the given metadata item from a snapshot."""
- snapshot_obj = self.get_snapshot(context, snapshot['id'])
+ snapshot_obj = self.get_snapshot(context, snapshot.id)
snapshot_obj.delete_metadata_key(context, key)
LOG.info(_LI("Delete snapshot metadata completed successfully."),
resource=snapshot)
"""
def validate_snap_size(size):
- if snapshot and size < snapshot['volume_size']:
+ if snapshot and size < snapshot.volume_size:
msg = _("Volume size '%(size)s'GB cannot be smaller than"
" the snapshot size %(snap_size)sGB. "
"They must be >= original snapshot size.")
msg = msg % {'size': size,
- 'snap_size': snapshot['volume_size']}
+ 'snap_size': snapshot.volume_size}
raise exception.InvalidInput(reason=msg)
def validate_source_size(size):
if not size and source_volume:
size = source_volume['size']
elif not size and snapshot:
- size = snapshot['volume_size']
+ size = snapshot.volume_size
size = utils.as_int(size)
LOG.debug("Validating volume '%(size)s' using %(functors)s" %
self._notify_about_snapshot_usage(context, snapshot, "create.end")
LOG.info(_LI("Create snapshot completed successfully"),
resource=snapshot)
- return snapshot.id
@locked_snapshot_operation
def delete_snapshot(self, context, snapshot, unmanage_only=False):
snapshot._context = context
project_id = snapshot.project_id
- self._notify_about_snapshot_usage(
- context, snapshot, "delete.start")
+ self._notify_about_snapshot_usage(context, snapshot, "delete.start")
try:
# NOTE(flaper87): Verify the driver is enabled
'gigabytes': -snapshot.volume_size,
}
volume_ref = self.db.volume_get(context, snapshot.volume_id)
- QUOTAS.add_volume_type_opts(context,
- reserve_opts,
+ QUOTAS.add_volume_type_opts(context, reserve_opts,
volume_ref.get('volume_type_id'))
- reservations = QUOTAS.reserve(context,
- project_id=project_id,
+ reservations = QUOTAS.reserve(context, project_id=project_id,
**reserve_opts)
except Exception:
reservations = None
QUOTAS.commit(context, reservations, project_id=project_id)
LOG.info(_LI("Delete snapshot completed successfully"),
resource=snapshot)
- return True
def attach_volume(self, context, volume_id, instance_uuid, host_name,
mountpoint, mode):
"not in a valid state. Valid states are: "
"%(valid)s.") %
{'group': group.id,
- 'snap': snap['id'],
+ 'snap': snap.id,
'valid': VALID_CREATE_CG_SRC_SNAP_STATUS})
raise exception.InvalidConsistencyGroup(reason=msg)
sorted_snapshots = []
for vol in volumes:
found_snaps = filter(
- lambda snap: snap['id'] == vol['snapshot_id'], snapshots)
+ lambda snap: snap.id == vol['snapshot_id'], snapshots)
if not found_snaps:
LOG.error(_LE("Source snapshot cannot be found for target "
"volume %(volume_id)s."),
usage_info)
-def _usage_from_snapshot(snapshot_ref, **extra_usage_info):
+def _usage_from_snapshot(snapshot, **extra_usage_info):
usage_info = {
- 'tenant_id': snapshot_ref['project_id'],
- 'user_id': snapshot_ref['user_id'],
- 'availability_zone': snapshot_ref['volume']['availability_zone'],
- 'volume_id': snapshot_ref['volume_id'],
- 'volume_size': snapshot_ref['volume_size'],
- 'snapshot_id': snapshot_ref['id'],
- 'display_name': snapshot_ref['display_name'],
- 'created_at': str(snapshot_ref['created_at']),
- 'status': snapshot_ref['status'],
- 'deleted': null_safe_str(snapshot_ref['deleted']),
- 'metadata': null_safe_str(snapshot_ref.get('metadata')),
+ 'tenant_id': snapshot.project_id,
+ 'user_id': snapshot.user_id,
+ 'availability_zone': snapshot.volume['availability_zone'],
+ 'volume_id': snapshot.volume_id,
+ 'volume_size': snapshot.volume_size,
+ 'snapshot_id': snapshot.id,
+ 'display_name': snapshot.display_name,
+ 'created_at': str(snapshot.created_at),
+ 'status': snapshot.status,
+ 'deleted': null_safe_str(snapshot.deleted),
+ 'metadata': null_safe_str(snapshot.metadata),
}
usage_info.update(extra_usage_info)