from cinder.openstack.common import log as logging
from cinder.openstack.common import local
from cinder.openstack.common import timeutils
+from cinder import policy
from cinder import utils
self.roles = roles or []
self.is_admin = is_admin
if self.is_admin is None:
- self.is_admin = 'admin' in [x.lower() for x in self.roles]
+ self.is_admin = policy.check_is_admin(self.roles)
elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin')
self.read_deleted = read_deleted
policy.enforce(match_list, target, credentials,
exception.PolicyNotAuthorized, action=action)
+
+
+def check_is_admin(roles):
+ """Whether or not roles contains 'admin' role according to policy setting.
+
+ """
+ init()
+
+ action = 'context_is_admin'
+ match_list = ('rule:%s' % action,)
+ # include project_id on target to avoid KeyError if context_is_admin
+ # policy definition is missing, and default admin_or_owner rule
+ # attempts to apply. Since our credentials dict does not include a
+ # project_id, this target can never match as a generic rule.
+ target = {'project_id': ''}
+ credentials = {'roles': roles}
+
+ return policy.enforce(match_list, target, credentials)
class PolicyFileTestCase(test.TestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
- policy.reset()
+ # since is_admin is defined by policy, create context before reset
self.context = context.RequestContext('fake', 'fake')
+ policy.reset()
self.target = {}
def tearDown(self):
self._set_brain("default_noexist")
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {})
+
+
+class ContextIsAdminPolicyTestCase(test.TestCase):
+
+ def setUp(self):
+ super(ContextIsAdminPolicyTestCase, self).setUp()
+ policy.reset()
+ policy.init()
+
+ def test_default_admin_role_is_admin(self):
+ ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
+ self.assertFalse(ctx.is_admin)
+ ctx = context.RequestContext('fake', 'fake', roles=['admin'])
+ self.assert_(ctx.is_admin)
+
+ def test_custom_admin_role_is_admin(self):
+ # define explict rules for context_is_admin
+ rules = {
+ 'context_is_admin': [["role:administrator"], ["role:johnny-admin"]]
+ }
+ brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
+ common_policy.set_brain(brain)
+ ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
+ self.assert_(ctx.is_admin)
+ ctx = context.RequestContext('fake', 'fake', roles=['administrator'])
+ self.assert_(ctx.is_admin)
+ # default rule no longer applies
+ ctx = context.RequestContext('fake', 'fake', roles=['admin'])
+ self.assertFalse(ctx.is_admin)
+
+ def test_context_is_admin_undefined(self):
+ rules = {
+ "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
+ "default": [["rule:admin_or_owner"]],
+ }
+ brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
+ common_policy.set_brain(brain)
+ ctx = context.RequestContext('fake', 'fake')
+ self.assertFalse(ctx.is_admin)
+ ctx = context.RequestContext('fake', 'fake', roles=['admin'])
+ self.assert_(ctx.is_admin)
{
- "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
+ "context_is_admin": [["role:admin"]],
+ "admin_or_owner": [["is_admin:True"], ["project_id:%(project_id)s"]],
"default": [["rule:admin_or_owner"]],
- "admin_api": [["role:admin"]],
+ "admin_api": [["is_admin:True"]],
"volume:create": [],
"volume:get_all": [],