self.assertEqual(expected, res_dict)
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+ def test_create_volume_exists(self):
+ # Create the volume type and a volume with the volume type.
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+ db.volume_create(context.get_admin_context(),
+ {'id': 'fake_id',
+ 'display_description': 'Test Desc',
+ 'size': 20,
+ 'status': 'creating',
+ 'instance_uuid': None,
+ 'host': 'dummy',
+ 'volume_type_id': volume_type['id']})
+
+ body = {"encryption": {'cipher': 'cipher',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ 'provider': 'fake_provider',
+ 'volume_type_id': volume_type['id']}}
+
+ # Try to create encryption specs for a volume type
+ # with a volume.
+ res = self._get_response(volume_type, req_method='POST',
+ req_body=json.dumps(body),
+ req_headers='application/json')
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'badRequest': {
+ 'code': 400,
+ 'message': ('Cannot create encryption specs. '
+ 'Volume type in use.')
+ }
+ }
+ self.assertEqual(expected, res_dict)
+ db.volume_destroy(context.get_admin_context(), 'fake_id')
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
def _encryption_create_bad_body(self, body,
msg='Create body is not valid.'):
volume_type = {
db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
def test_delete_with_volume_in_use(self):
- # Create the volume type and volumes with the volume type.
+ # Create the volume type
volume_type = {
'id': 'fake_type_id',
'name': 'fake_type',
}
db.volume_type_create(context.get_admin_context(), volume_type)
- db.volume_create(context.get_admin_context(),
- {'id': 'fake_id',
- 'display_description': 'Test Desc',
- 'size': 20,
- 'status': 'creating',
- 'instance_uuid': None,
- 'host': 'dummy',
- 'volume_type_id': volume_type['id']})
- db.volume_create(context.get_admin_context(),
- {'id': 'fake_id2',
- 'display_description': 'Test Desc2',
- 'size': 2,
- 'status': 'creating',
- 'instance_uuid': None,
- 'host': 'dummy',
- 'volume_type_id': volume_type['id']})
body = {"encryption": {'cipher': 'cipher',
'key_size': 128,
'control_location': 'front-end',
res_dict = json.loads(res.body)
self.assertEqual(volume_type['id'], res_dict['volume_type_id'])
+ # Create volumes with the volume type
+ db.volume_create(context.get_admin_context(),
+ {'id': 'fake_id',
+ 'display_description': 'Test Desc',
+ 'size': 20,
+ 'status': 'creating',
+ 'instance_uuid': None,
+ 'host': 'dummy',
+ 'volume_type_id': volume_type['id']})
+
+ db.volume_create(context.get_admin_context(),
+ {'id': 'fake_id2',
+ 'display_description': 'Test Desc2',
+ 'size': 2,
+ 'status': 'creating',
+ 'instance_uuid': None,
+ 'host': 'dummy',
+ 'volume_type_id': volume_type['id']})
+
# Delete, and test that there is an error since volumes exist
res = self._get_response(volume_type, req_method='DELETE',
req_headers='application/json',