self.controller._delete(req, 1)
self.assertEqual(1, len(self.notifier.notifications))
+ @mock.patch('cinder.volume.volume_types.destroy')
+ @mock.patch('cinder.volume.volume_types.get_volume_type')
+ @mock.patch('cinder.policy.enforce')
+ def test_volume_types_delete_with_non_admin(self, mock_policy_enforce,
+ mock_get, mock_destroy):
+
+ # allow policy authorized user to delete type
+ mock_policy_enforce.return_value = None
+ mock_get.return_value = \
+ {'extra_specs': {"key1": "value1"},
+ 'id': 1,
+ 'name': u'vol_type_1',
+ 'description': u'vol_type_desc_1'}
+ mock_destroy.side_effect = return_volume_types_destroy
+
+ req = fakes.HTTPRequest.blank('/v2/fake/types/1',
+ use_admin_context=False)
+ self.assertEqual(0, len(self.notifier.notifications))
+ self.controller._delete(req, 1)
+ self.assertEqual(1, len(self.notifier.notifications))
+ # non policy authorized user fails to delete type
+ mock_policy_enforce.side_effect = (
+ exception.PolicyNotAuthorized(action='type_delete'))
+ self.assertRaises(exception.PolicyNotAuthorized,
+ self.controller._delete,
+ req, 1)
+
def test_create(self):
self.stubs.Set(volume_types, 'create',
return_volume_types_create)
body = {'volume_type': 'string'}
self._create_volume_type_bad_body(body=body)
+ @mock.patch('cinder.volume.volume_types.create')
+ @mock.patch('cinder.volume.volume_types.get_volume_type_by_name')
+ @mock.patch('cinder.policy.enforce')
+ def test_create_with_none_admin(self, mock_policy_enforce,
+ mock_get_volume_type_by_name,
+ mock_create_type):
+
+ # allow policy authorized user to create type
+ mock_policy_enforce.return_value = None
+ mock_get_volume_type_by_name.return_value = \
+ {'extra_specs': {"key1": "value1"},
+ 'id': 1,
+ 'name': u'vol_type_1',
+ 'description': u'vol_type_desc_1'}
+
+ body = {"volume_type": {"name": "vol_type_1",
+ "os-volume-type-access:is_public": True,
+ "extra_specs": {"key1": "value1"}}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types',
+ use_admin_context=False)
+
+ self.assertEqual(0, len(self.notifier.notifications))
+ res_dict = self.controller._create(req, body)
+
+ self.assertEqual(1, len(self.notifier.notifications))
+ self._check_test_results(res_dict, {
+ 'expected_name': 'vol_type_1', 'expected_desc': 'vol_type_desc_1'})
+
+ # non policy authorized user fails to create type
+ mock_policy_enforce.side_effect = (
+ exception.PolicyNotAuthorized(action='type_create'))
+ self.assertRaises(exception.PolicyNotAuthorized,
+ self.controller._create,
+ req, body)
+
@mock.patch('cinder.volume.volume_types.update')
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_update(self, mock_get, mock_update):
'expected_desc': 'vol_type_desc_666'})
self.assertEqual(1, len(self.notifier.notifications))
+ @mock.patch('cinder.volume.volume_types.update')
+ @mock.patch('cinder.volume.volume_types.get_volume_type')
+ @mock.patch('cinder.policy.enforce')
+ def test_update_with_non_admin(self, mock_policy_enforce, mock_get,
+ mock_update):
+
+ # allow policy authorized user to update type
+ mock_policy_enforce.return_value = None
+ mock_get.return_value = return_volume_types_get_volume_type_updated(
+ '1', is_public=False)
+
+ body = {"volume_type": {"name": "vol_type_1_1",
+ "description": "vol_type_desc_1_1",
+ "is_public": False}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types/1',
+ use_admin_context=False)
+ req.method = 'PUT'
+
+ self.assertEqual(0, len(self.notifier.notifications))
+ res_dict = self.controller._update(req, '1', body)
+ self.assertEqual(1, len(self.notifier.notifications))
+ self._check_test_results(res_dict,
+ {'expected_desc': 'vol_type_desc_1_1',
+ 'expected_name': 'vol_type_1_1',
+ 'is_public': False})
+
+ # non policy authorized user fails to update type
+ mock_policy_enforce.side_effect = (
+ exception.PolicyNotAuthorized(action='type_update'))
+ self.assertRaises(exception.PolicyNotAuthorized,
+ self.controller._update,
+ req, '1', body)
+
def _check_test_results(self, results, expected_results):
self.assertEqual(1, len(results))
self.assertEqual(expected_results['expected_desc'],
new_type_name)
volume_types.destroy(self.ctxt, type_ref.id)
+ def test_volume_type_create_then_destroy_with_non_admin(self):
+ """Ensure volume types can be created and deleted by non-admin user.
+
+ If a non-admn user is authorized at API, volume type operations
+ should be permitted.
+ """
+ prev_all_vtypes = volume_types.get_all_types(self.ctxt)
+ self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
+
+ # create
+ type_ref = volume_types.create(self.ctxt,
+ self.vol_type1_name,
+ self.vol_type1_specs,
+ description=self.vol_type1_description)
+ new = volume_types.get_volume_type_by_name(self.ctxt,
+ self.vol_type1_name)
+ self.assertEqual(self.vol_type1_description, new['description'])
+ new_all_vtypes = volume_types.get_all_types(self.ctxt)
+ self.assertEqual(len(prev_all_vtypes) + 1,
+ len(new_all_vtypes),
+ 'drive type was not created')
+
+ # update
+ new_type_name = self.vol_type1_name + '_updated'
+ new_type_desc = self.vol_type1_description + '_updated'
+ type_ref_updated = volume_types.update(self.ctxt,
+ type_ref.id,
+ new_type_name,
+ new_type_desc)
+ self.assertEqual(new_type_name, type_ref_updated['name'])
+ self.assertEqual(new_type_desc, type_ref_updated['description'])
+
+ # destroy
+ volume_types.destroy(self.ctxt, type_ref['id'])
+ new_all_vtypes = volume_types.get_all_types(self.ctxt)
+ self.assertEqual(prev_all_vtypes,
+ new_all_vtypes,
+ 'drive type was not deleted')
+
def test_create_volume_type_with_invalid_params(self):
"""Ensure exception will be returned."""
vol_type_invalid_specs = "invalid_extra_specs"
vtype_access = db.volume_type_access_get_all(self.ctxt, vtype_id)
self.assertNotIn(project_id, vtype_access)
+ def test_add_access_with_non_admin(self):
+ self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
+ project_id = '456'
+ vtype = volume_types.create(self.ctxt, 'type1', is_public=False)
+ vtype_id = vtype.get('id')
+
+ volume_types.add_volume_type_access(self.ctxt, vtype_id, project_id)
+ vtype_access = db.volume_type_access_get_all(self.ctxt.elevated(),
+ vtype_id)
+ self.assertIn(project_id, [a.project_id for a in vtype_access])
+
+ def test_remove_access_with_non_admin(self):
+ self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
+ project_id = '456'
+ vtype = volume_types.create(self.ctxt, 'type1', projects=['456'],
+ is_public=False)
+ vtype_id = vtype.get('id')
+
+ volume_types.remove_volume_type_access(self.ctxt, vtype_id, project_id)
+ vtype_access = db.volume_type_access_get_all(self.ctxt.elevated(),
+ vtype_id)
+ self.assertNotIn(project_id, vtype_access)
+
def test_get_volume_type_qos_specs(self):
qos_ref = qos_specs.create(self.ctxt, 'qos-specs-1', {'k1': 'v1',
'k2': 'v2',
"""Creates volume types."""
extra_specs = extra_specs or {}
projects = projects or []
+ elevated = context if context.is_admin else context.elevated()
try:
- type_ref = db.volume_type_create(context,
+ type_ref = db.volume_type_create(elevated,
dict(name=name,
extra_specs=extra_specs,
is_public=is_public,
if id is None:
msg = _("id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
- old_volume_type = get_volume_type(context, id)
+ elevated = context if context.is_admin else context.elevated()
+ old_volume_type = get_volume_type(elevated, id)
try:
- type_updated = db.volume_type_update(context,
+ type_updated = db.volume_type_update(elevated,
id,
dict(name=name,
description=description,
if name:
old_type_name = old_volume_type.get('name')
if old_type_name != name:
- QUOTAS.update_quota_resource(context,
+ QUOTAS.update_quota_resource(elevated,
old_type_name,
name)
except db_exc.DBError:
msg = _("id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
else:
- db.volume_type_destroy(context, id)
+ elevated = context if context.is_admin else context.elevated()
+ db.volume_type_destroy(elevated, id)
def get_all_types(context, inactive=0, filters=None, marker=None,
if volume_type_id is None:
msg = _("volume_type_id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
- if is_public_volume_type(context, volume_type_id):
+ elevated = context if context.is_admin else context.elevated()
+ if is_public_volume_type(elevated, volume_type_id):
msg = _("Type access modification is not applicable to public volume "
"type.")
raise exception.InvalidVolumeType(reason=msg)
- return db.volume_type_access_add(context, volume_type_id, project_id)
+ return db.volume_type_access_add(elevated, volume_type_id, project_id)
def remove_volume_type_access(context, volume_type_id, project_id):
if volume_type_id is None:
msg = _("volume_type_id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
- if is_public_volume_type(context, volume_type_id):
+ elevated = context if context.is_admin else context.elevated()
+ if is_public_volume_type(elevated, volume_type_id):
msg = _("Type access modification is not applicable to public volume "
"type.")
raise exception.InvalidVolumeType(reason=msg)
- return db.volume_type_access_remove(context, volume_type_id, project_id)
+ return db.volume_type_access_remove(elevated, volume_type_id, project_id)
def is_encrypted(context, volume_type_id):
--- /dev/null
+---
+fixes:
+ - |
+ Enabled a cloud operator to correctly manage policy for
+ volume type operations. To permit volume type operations
+ for specific user, you can for example do as follows.
+
+ * Add ``storage_type_admin`` role.
+ * Add ``admin_or_storage_type_admin`` rule to ``policy.json``, e.g.
+ ``"admin_or_storage_type_admin": "is_admin:True or role:storage_type_admin",``
+ * Modify rule for types_manage and volume_type_access, e.g.
+ ``"volume_extension:types_manage": "rule:admin_or_storage_type_admin",
+ "volume_extension:volume_type_access:addProjectAccess": "rule:admin_or_storage_type_admin",
+ "volume_extension:volume_type_access:removeProjectAccess": "rule:admin_or_storage_type_admin",``