import time
+import mock
+
from cinder import context
+from cinder import exception
from cinder import test
+from cinder.tests import fake_snapshot
+from cinder.tests import fake_volume
from cinder.volume.flows.api import create_volume
+from cinder.volume.flows.manager import create_volume as create_volume_manager
class fake_scheduler_rpc_api(object):
fake_db())
task._cast_create_volume(self.ctxt, spec, props)
+
+
+class CreateVolumeFlowManagerTestCase(test.TestCase):
+
+ def setUp(self):
+ super(CreateVolumeFlowManagerTestCase, self).setUp()
+ self.ctxt = context.get_admin_context()
+
+ @mock.patch('cinder.volume.flows.manager.create_volume.'
+ 'CreateVolumeFromSpecTask.'
+ '_handle_bootable_volume_glance_meta')
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create_from_snapshot(self, snapshot_get_by_id, handle_bootable):
+ fake_db = mock.MagicMock()
+ fake_driver = mock.MagicMock()
+ fake_manager = create_volume_manager.CreateVolumeFromSpecTask(
+ fake_db, fake_driver)
+ volume = fake_volume.fake_db_volume()
+ orig_volume_db = mock.MagicMock(id=10, bootable=True)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctxt)
+ snapshot_get_by_id.return_value = snapshot_obj
+ fake_db.volume_get.return_value = orig_volume_db
+
+ fake_manager._create_from_snapshot(self.ctxt, volume,
+ snapshot_obj.id)
+ fake_driver.create_volume_from_snapshot.assert_called_once_with(
+ volume, snapshot_obj)
+ fake_db.volume_get.assert_called_once_with(self.ctxt,
+ snapshot_obj.volume_id)
+ handle_bootable.assert_called_once_with(self.ctxt, volume['id'],
+ snapshot_id=snapshot_obj.id)
+
+ @mock.patch('cinder.objects.Snapshot.get_by_id')
+ def test_create_from_snapshot_update_failure(self, snapshot_get_by_id):
+ fake_db = mock.MagicMock()
+ fake_driver = mock.MagicMock()
+ fake_manager = create_volume_manager.CreateVolumeFromSpecTask(
+ fake_db, fake_driver)
+ volume = fake_volume.fake_db_volume()
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctxt)
+ snapshot_get_by_id.return_value = snapshot_obj
+ fake_db.volume_get.side_effect = exception.CinderException
+
+ self.assertRaises(exception.MetadataUpdateFailure,
+ fake_manager._create_from_snapshot, self.ctxt,
+ volume, snapshot_obj.id)
+ fake_driver.create_volume_from_snapshot.assert_called_once_with(
+ volume, snapshot_obj)
from cinder import flow_utils
from cinder.i18n import _, _LE, _LI
from cinder.image import glance
+from cinder import objects
from cinder.openstack.common import log as logging
from cinder import utils
from cinder.volume.flows import common
def _create_from_snapshot(self, context, volume_ref, snapshot_id,
**kwargs):
volume_id = volume_ref['id']
- snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
model_update = self.driver.create_volume_from_snapshot(volume_ref,
- snapshot_ref)
+ snapshot)
# NOTE(harlowja): Subtasks would be useful here since after this
# point the volume has already been created and further failures
# will not destroy the volume (although they could in the future).
make_bootable = False
try:
originating_vref = self.db.volume_get(context,
- snapshot_ref['volume_id'])
+ snapshot.volume_id)
make_bootable = originating_vref.bootable
except exception.CinderException as ex:
LOG.exception(_LE("Failed fetching snapshot %(snapshot_id)s "
" flag using the provided glance snapshot "
"%(snapshot_ref_id)s volume reference") %
{'snapshot_id': snapshot_id,
- 'snapshot_ref_id': snapshot_ref['volume_id']})
+ 'snapshot_ref_id': snapshot.volume_id})
raise exception.MetadataUpdateFailure(reason=ex)
if make_bootable:
self._handle_bootable_volume_glance_meta(context, volume_id,