from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
+from cinder.tests.unit.image import fake as fake_image
+from cinder.tests.unit.keymgr import mock_key_mgr
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import create_volume
from cinder.volume.flows.manager import create_volume as create_volume_manager
task._cast_create_volume(self.ctxt, spec, props)
+ @mock.patch('cinder.volume.volume_types.is_encrypted')
+ @mock.patch('cinder.volume.flows.api.create_volume.'
+ 'ExtractVolumeRequestTask.'
+ '_get_volume_type_id')
+ def test_extract_volume_request_from_image_encrypted(
+ self,
+ fake_get_volume_type_id,
+ fake_is_encrypted):
+
+ fake_image_service = fake_image.FakeImageService()
+ image_id = 1
+ image_meta = {}
+ image_meta['id'] = image_id
+ image_meta['status'] = 'active'
+ image_meta['size'] = 1
+ fake_image_service.create(self.ctxt, image_meta)
+ fake_key_manager = mock_key_mgr.MockKeyManager()
+
+ task = create_volume.ExtractVolumeRequestTask(
+ fake_image_service,
+ {'nova'})
+
+ fake_is_encrypted.return_value = True
+ self.assertRaises(exception.InvalidInput,
+ task.execute,
+ self.ctxt,
+ size=1,
+ snapshot=None,
+ image_id=image_id,
+ source_volume=None,
+ availability_zone='nova',
+ volume_type=None,
+ metadata=None,
+ key_manager=fake_key_manager,
+ source_replica=None,
+ consistencygroup=None,
+ cgsnapshot=None)
+
+ @mock.patch('cinder.volume.volume_types.is_encrypted')
+ @mock.patch('cinder.volume.volume_types.get_volume_type_qos_specs')
+ @mock.patch('cinder.volume.flows.api.create_volume.'
+ 'ExtractVolumeRequestTask.'
+ '_get_volume_type_id')
+ def test_extract_volume_request_from_image(
+ self,
+ fake_get_type_id,
+ fake_get_qos,
+ fake_is_encrypted):
+
+ fake_image_service = fake_image.FakeImageService()
+ image_id = 2
+ image_meta = {}
+ image_meta['id'] = image_id
+ image_meta['status'] = 'active'
+ image_meta['size'] = 1
+ fake_image_service.create(self.ctxt, image_meta)
+ fake_key_manager = mock_key_mgr.MockKeyManager()
+ volume_type = 'type1'
+
+ task = create_volume.ExtractVolumeRequestTask(
+ fake_image_service,
+ {'nova'})
+
+ fake_is_encrypted.return_value = False
+ fake_get_type_id.return_value = 1
+ fake_get_qos.return_value = {'qos_specs': None}
+ result = task.execute(self.ctxt,
+ size=1,
+ snapshot=None,
+ image_id=image_id,
+ source_volume=None,
+ availability_zone='nova',
+ volume_type=volume_type,
+ metadata=None,
+ key_manager=fake_key_manager,
+ source_replica=None,
+ consistencygroup=None,
+ cgsnapshot=None)
+ expected_result = {'size': 1,
+ 'snapshot_id': None,
+ 'source_volid': None,
+ 'availability_zone': 'nova',
+ 'volume_type': volume_type,
+ 'volume_type_id': 1,
+ 'encryption_key_id': None,
+ 'qos_specs': None,
+ 'source_replicaid': None,
+ 'consistencygroup_id': None,
+ 'cgsnapshot_id': None, }
+ self.assertEqual(expected_result, result)
+
class CreateVolumeFlowManagerTestCase(test.TestCase):