]
+def create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
+ **kwargs):
+ """Create a snapshot object."""
+ metadata = metadata or {}
+ snap = objects.Snapshot(ctxt or context.get_admin_context())
+ snap.volume_size = size
+ snap.user_id = 'fake'
+ snap.project_id = 'fake'
+ snap.volume_id = volume_id
+ snap.status = "creating"
+ if metadata is not None:
+ snap.metadata = metadata
+ snap.update(kwargs)
+
+ snap.create()
+ return snap
+
+
class FakeImageService(object):
def __init__(self, db_driver=None, image_service=None):
pass
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- snapshot_id = self._create_snapshot(volume_src['id'],
- size=volume_src['size'])['id']
+ snapshot_id = create_snapshot(volume_src['id'],
+ size=volume_src['size'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot_obj)
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- snapshot_id = self._create_snapshot(volume_src['id'],
- size=volume_src['size'])['id']
+ snapshot_id = create_snapshot(volume_src['id'],
+ size=volume_src['size'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.volume.driver._initialized = False
# no lock
self.volume.create_volume(self.context, src_vol_id)
- snap_id = self._create_snapshot(src_vol_id,
- size=src_vol['size'])['id']
+ snap_id = create_snapshot(src_vol_id,
+ size=src_vol['size'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snapshot_obj)
db.volume_update(self.context, src_vol['id'], {'bootable': True})
# create volume from snapshot
- snapshot_id = self._create_snapshot(src_vol['id'])['id']
+ snapshot_id = create_snapshot(src_vol['id'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.volume.create_snapshot(self.context, src_vol['id'], snapshot_obj)
volume = db.volume_get(self.context, src_vol_id)
# create snapshot of volume
- snapshot_id = self._create_snapshot(volume['id'])['id']
+ snapshot_id = create_snapshot(volume['id'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.volume.create_snapshot(self.context, volume['id'], snapshot_obj)
self.volume.create_volume(self.context, src_vol_id)
# create snapshot
- snap_id = self._create_snapshot(src_vol_id,
- size=src_vol['size'])['id']
+ snap_id = create_snapshot(src_vol_id,
+ size=src_vol['size'])['id']
snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snapshot_obj)
availability_zone='az2',
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- snapshot = self._create_snapshot(volume_src['id'])
+ snapshot = create_snapshot(volume_src['id'])
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot)
# This will allow us to test cross-node interactions
pass
- @staticmethod
- def _create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
- **kwargs):
- """Create a snapshot object."""
- metadata = metadata or {}
- snap = objects.Snapshot(ctxt or context.get_admin_context())
- snap.volume_size = size
- snap.user_id = 'fake'
- snap.project_id = 'fake'
- snap.volume_id = volume_id
- snap.status = "creating"
- if metadata is not None:
- snap.metadata = metadata
- snap.update(kwargs)
-
- snap.create()
- return snap
-
def test_create_delete_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = tests_utils.create_volume(
self.assertEqual(2, len(self.notifier.notifications),
self.notifier.notifications)
- snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ snapshot = create_snapshot(volume['id'], size=volume['size'])
snapshot_id = snapshot.id
self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertEqual(
"""Test snapshot can be created with metadata and deleted."""
test_meta = {'fake_key': 'fake_value'}
volume = tests_utils.create_volume(self.context, **self.volume_params)
- snapshot = self._create_snapshot(volume['id'], size=volume['size'],
- metadata=test_meta)
+ snapshot = create_snapshot(volume['id'], size=volume['size'],
+ metadata=test_meta)
snapshot_id = snapshot.id
result_dict = snapshot.metadata
"""Test volume can't be deleted with dependent snapshots."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ snapshot = create_snapshot(volume['id'], size=volume['size'])
self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertEqual(
snapshot.id, objects.Snapshot.get_by_id(self.context,
def test_can_delete_errored_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = tests_utils.create_volume(self.context, CONF.host)
- snapshot = self._create_snapshot(volume.id, size=volume['size'],
- ctxt=self.context, status='bad')
+ snapshot = create_snapshot(volume.id, size=volume['size'],
+ ctxt=self.context, status='bad')
self.assertRaises(exception.InvalidSnapshot,
self.volume_api.delete_snapshot,
self.context,
self.assertTrue(vol_glance_meta)
# create snapshot from bootable volume
- snap = self._create_snapshot(volume_id)
+ snap = create_snapshot(volume_id)
self.volume.create_snapshot(ctxt, volume_id, snap)
# get snapshot's volume_glance_metadata
ctxt = context.get_admin_context()
vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
self.assertTrue(vol_glance_meta)
- snap = self._create_snapshot(volume_id)
+ snap = create_snapshot(volume_id)
snap_stat = snap.status
self.assertTrue(snap.id)
self.assertTrue(snap_stat)
# set bootable flag of volume to True
db.volume_update(self.context, volume_id, {'bootable': True})
- snapshot = self._create_snapshot(volume['id'])
+ snapshot = create_snapshot(volume['id'])
self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_snapshot_glance_metadata_get,
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
- snapshot = self._create_snapshot(volume_id, size=volume['size'])
+ snapshot = create_snapshot(volume_id, size=volume['size'])
self.volume.create_snapshot(self.context, volume_id, snapshot)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
- snapshot = self._create_snapshot(volume_id)
+ snapshot = create_snapshot(volume_id)
snapshot_id = snapshot.id
self.volume.create_snapshot(self.context, volume_id, snapshot)
def test_volume_api_update_snapshot(self):
# create raw snapshot
volume = tests_utils.create_volume(self.context, **self.volume_params)
- snapshot = self._create_snapshot(volume['id'])
+ snapshot = create_snapshot(volume['id'])
snapshot_id = snapshot.id
self.assertIsNone(snapshot.display_name)
# use volume.api to update name
"""Test volume deletion with dependent snapshots."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ snapshot = create_snapshot(volume['id'], size=volume['size'])
self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertEqual(
snapshot.id, objects.Snapshot.get_by_id(self.context,
"""Test volume deletion with dependent snapshots."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
- snapshot = self._create_snapshot(volume['id'], size=volume['size'])
+ snapshot = create_snapshot(volume['id'], size=volume['size'])
self.volume.create_snapshot(self.context, volume['id'], snapshot)
self.assertEqual(
snapshot.id, objects.Snapshot.get_by_id(self.context,
@ddt.ddt
-class VolumeMigrationTestCase(VolumeTestCase):
+class VolumeMigrationTestCase(BaseVolumeTestCase):
+
+ def setUp(self):
+ super(VolumeMigrationTestCase, self).setUp()
+ self._clear_patch = mock.patch('cinder.volume.utils.clear_volume',
+ autospec=True)
+ self._clear_patch.start()
+ self.expected_status = 'available'
+
+ def tearDown(self):
+ super(VolumeMigrationTestCase, self).tearDown()
+ self._clear_patch.stop()
+
def test_migrate_volume_driver(self):
"""Test volume migration done by driver."""
# stub out driver and rpc functions
volume.previous_status = 'available'
volume.save()
if snap:
- self._create_snapshot(volume.id, size=volume.size)
+ create_snapshot(volume.id, size=volume.size)
if driver or diff_equal:
host_obj = {'host': CONF.host, 'capabilities': {}}
else: