From 35988f13931115658cf20d323da2f549073bcd52 Mon Sep 17 00:00:00 2001 From: Salvatore Orlando Date: Mon, 22 Apr 2013 09:44:14 +0200 Subject: [PATCH] Make the 'admin' role configurable Bug 1158434 This patch adds a new policy named 'context_is_admin' which defines an admin user as a collection of roles or else. The quantum context has been updated to check for this policy when setting the is_admin flag. This patch also adds a method for gathering 'admin' roles from policy rules as current logic requires the context to be always populate with the correct roles for admin rules, even when the context is implicitly generated with get_admin_context or context.elevated. Backward compatibility is ensuring by preserving the old behavior if the 'context_is_admin' policy is not found in policy.json Change-Id: I9acea75cca0c47e083a9149e358328ea3ca12d68 --- etc/policy.json | 7 ++-- quantum/context.py | 17 +++++--- quantum/policy.py | 46 +++++++++++++++++++++ quantum/tests/unit/test_policy.py | 69 +++++++++++++++++++++++++++++-- 4 files changed, 127 insertions(+), 12 deletions(-) diff --git a/etc/policy.json b/etc/policy.json index 4e433eec0..f91494a31 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -1,7 +1,8 @@ { - "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s", - "admin_or_network_owner": "role:admin or tenant_id:%(network_tenant_id)s", - "admin_only": "role:admin", + "context_is_admin": "role:admin", + "admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s", + "admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network_tenant_id)s", + "admin_only": "rule:context_is_admin", "regular_user": "", "shared": "field:networks:shared=True", "external": "field:networks:router:external=True", diff --git a/quantum/context.py b/quantum/context.py index 236da4d75..01c1d377b 100644 --- a/quantum/context.py +++ b/quantum/context.py @@ -24,6 +24,7 @@ from datetime import datetime from quantum.db import api as db_api from quantum.openstack.common import context as common_context from quantum.openstack.common import log as logging +from quantum import policy LOG = logging.getLogger(__name__) @@ -48,16 +49,22 @@ class ContextBase(common_context.RequestContext): 'context: %s'), kwargs) super(ContextBase, self).__init__(user=user_id, tenant=tenant_id, is_admin=is_admin) - self.roles = roles or [] - if self.is_admin is None: - self.is_admin = 'admin' in [x.lower() for x in self.roles] - elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]: - self.roles.append('admin') self.read_deleted = read_deleted if not timestamp: timestamp = datetime.utcnow() self.timestamp = timestamp self._session = None + self.roles = roles or [] + if self.is_admin is None: + self.is_admin = policy.check_is_admin(self) + elif self.is_admin: + # Ensure context is populated with admin roles + # TODO(salvatore-orlando): It should not be necessary + # to populate roles in artificially-generated contexts + # address in bp/make-authz-orthogonal + admin_roles = policy.get_admin_roles() + if admin_roles: + self.roles = list(set(self.roles) | set(admin_roles)) @property def project_id(self): diff --git a/quantum/policy.py b/quantum/policy.py index 1683d5c7c..7d101a455 100644 --- a/quantum/policy.py +++ b/quantum/policy.py @@ -31,6 +31,7 @@ from quantum.openstack.common import policy LOG = logging.getLogger(__name__) _POLICY_PATH = None _POLICY_CACHE = {} +ADMIN_CTX_POLICY = 'context_is_admin' cfg.CONF.import_opt('policy_file', 'quantum.common.config') @@ -207,3 +208,48 @@ def enforce(context, action, target, plugin=None): credentials = context.to_dict() return policy.check(match_rule, real_target, credentials, exceptions.PolicyNotAuthorized, action=action) + + +def check_is_admin(context): + """Verify context has admin rights according to policy settings.""" + init() + # the target is user-self + credentials = context.to_dict() + target = credentials + # Backward compatibility: if ADMIN_CTX_POLICY is not + # found, default to validating role:admin + admin_policy = (ADMIN_CTX_POLICY in policy._rules + and ADMIN_CTX_POLICY or 'role:admin') + return policy.check(admin_policy, target, credentials) + + +def _extract_roles(rule, roles): + if isinstance(rule, policy.RoleCheck): + roles.append(rule.match.lower()) + elif isinstance(rule, policy.RuleCheck): + _extract_roles(policy._rules[rule.match], roles) + elif hasattr(rule, 'rules'): + for rule in rule.rules: + _extract_roles(rule, roles) + + +def get_admin_roles(): + """Return a list of roles which are granted admin rights according + to policy settings. + """ + # NOTE(salvatore-orlando): This function provides a solution for + # populating implicit contexts with the appropriate roles so that + # they correctly pass policy checks, and will become superseded + # once all explicit policy checks are removed from db logic and + # plugin modules. For backward compatibility it returns the literal + # admin if ADMIN_CTX_POLICY is not defined + init() + if not policy._rules or ADMIN_CTX_POLICY not in policy._rules: + return ['admin'] + try: + admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY] + except (KeyError, TypeError): + return + roles = [] + _extract_roles(admin_ctx_rule, roles) + return roles diff --git a/quantum/tests/unit/test_policy.py b/quantum/tests/unit/test_policy.py index 7e7938f9c..924b80841 100644 --- a/quantum/tests/unit/test_policy.py +++ b/quantum/tests/unit/test_policy.py @@ -35,7 +35,7 @@ class PolicyFileTestCase(base.BaseTestCase): super(PolicyFileTestCase, self).setUp() policy.reset() self.addCleanup(policy.reset) - self.context = context.Context('fake', 'fake') + self.context = context.Context('fake', 'fake', is_admin=False) self.target = {} self.tempdir = self.useFixture(fixtures.TempDir()) @@ -200,11 +200,15 @@ class QuantumPolicyTestCase(base.BaseTestCase): policy.reset() policy.init() self.addCleanup(policy.reset) + self.admin_only_legacy = "role:admin" + self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s" self.rules = dict((k, common_policy.parse_rule(v)) for k, v in { - "admin_or_network_owner": "role:admin or " + "context_is_admin": "role:admin", + "admin_or_network_owner": "rule:context_is_admin or " "tenant_id:%(network_tenant_id)s", - "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s", - "admin_only": "role:admin", + "admin_or_owner": ("rule:context_is_admin or " + "tenant_id:%(tenant_id)s"), + "admin_only": "rule:context_is_admin", "regular_user": "role:user", "shared": "field:networks:shared=True", "external": "field:networks:router:external=True", @@ -278,12 +282,31 @@ class QuantumPolicyTestCase(base.BaseTestCase): def test_enforce_adminonly_attribute_update(self): self._test_enforce_adminonly_attribute('update_network') + def test_enforce_adminonly_attribute_no_context_is_admin_policy(self): + del self.rules[policy.ADMIN_CTX_POLICY] + self.rules['admin_only'] = common_policy.parse_rule( + self.admin_only_legacy) + self.rules['admin_or_owner'] = common_policy.parse_rule( + self.admin_or_owner_legacy) + self._test_enforce_adminonly_attribute('create_network') + def test_enforce_adminonly_attribute_nonadminctx_returns_403(self): action = "create_network" target = {'shared': True, 'tenant_id': 'somebody_else'} self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, self.context, action, target, None) + def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self): + del self.rules[policy.ADMIN_CTX_POLICY] + self.rules['admin_only'] = common_policy.parse_rule( + self.admin_only_legacy) + self.rules['admin_or_owner'] = common_policy.parse_rule( + self.admin_or_owner_legacy) + action = "create_network" + target = {'shared': True, 'tenant_id': 'somebody_else'} + self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, + self.context, action, target, None) + def test_enforce_regularuser_on_read(self): action = "get_network" target = {'shared': True, 'tenant_id': 'somebody_else'} @@ -300,3 +323,41 @@ class QuantumPolicyTestCase(base.BaseTestCase): target = {'network_id': 'whatever'} result = policy.enforce(self.context, action, target, self.plugin) self.assertTrue(result) + + def test_get_roles_context_is_admin_rule_missing(self): + rules = dict((k, common_policy.parse_rule(v)) for k, v in { + "some_other_rule": "role:admin", + }.items()) + common_policy.set_rules(common_policy.Rules(rules)) + # 'admin' role is expected for bw compatibility + self.assertEqual(['admin'], policy.get_admin_roles()) + + def test_get_roles_with_role_check(self): + rules = dict((k, common_policy.parse_rule(v)) for k, v in { + policy.ADMIN_CTX_POLICY: "role:admin", + }.items()) + common_policy.set_rules(common_policy.Rules(rules)) + self.assertEqual(['admin'], policy.get_admin_roles()) + + def test_get_roles_with_rule_check(self): + rules = dict((k, common_policy.parse_rule(v)) for k, v in { + policy.ADMIN_CTX_POLICY: "rule:some_other_rule", + "some_other_rule": "role:admin", + }.items()) + common_policy.set_rules(common_policy.Rules(rules)) + self.assertEqual(['admin'], policy.get_admin_roles()) + + def test_get_roles_with_or_check(self): + self.rules = dict((k, common_policy.parse_rule(v)) for k, v in { + policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2", + "rule1": "role:admin_1", + "rule2": "role:admin_2" + }.items()) + self.assertEqual(['admin_1', 'admin_2'], + policy.get_admin_roles()) + + def test_get_roles_with_other_rules(self): + self.rules = dict((k, common_policy.parse_rule(v)) for k, v in { + policy.ADMIN_CTX_POLICY: "role:xxx or other:value", + }.items()) + self.assertEqual(['xxx'], policy.get_admin_roles()) -- 2.45.2