]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Make the 'admin' role configurable
authorSalvatore Orlando <salv.orlando@gmail.com>
Mon, 22 Apr 2013 07:44:14 +0000 (09:44 +0200)
committerSalvatore Orlando <salv.orlando@gmail.com>
Mon, 22 Apr 2013 18:42:02 +0000 (20:42 +0200)
Bug 1158434

This patch adds a new policy named 'context_is_admin' which defines
an admin user as a collection of roles or else. The quantum context
has been updated to check for this policy when setting the is_admin
flag.
This patch also adds a method for gathering 'admin' roles from policy
rules as current logic requires the context to be always populate with
the correct roles for admin rules, even when the context is implicitly
generated with get_admin_context or context.elevated.
Backward compatibility is ensuring by preserving the old behavior if
the 'context_is_admin' policy is not found in policy.json

Change-Id: I9acea75cca0c47e083a9149e358328ea3ca12d68

etc/policy.json
quantum/context.py
quantum/policy.py
quantum/tests/unit/test_policy.py

index 4e433eec011e9ec205bf9d9492f4ba40e013daf0..f91494a31561bacb337db384f496fce1c429735a 100644 (file)
@@ -1,7 +1,8 @@
 {
-    "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
-    "admin_or_network_owner": "role:admin or tenant_id:%(network_tenant_id)s",
-    "admin_only": "role:admin",
+    "context_is_admin":  "role:admin",
+    "admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
+    "admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network_tenant_id)s",
+    "admin_only": "rule:context_is_admin",
     "regular_user": "",
     "shared": "field:networks:shared=True",
     "external": "field:networks:router:external=True",
index 236da4d7554867376e3ae631f3e3e70e7e96ccb6..01c1d377bf5c6be40d574fa621fce9180a13e8bc 100644 (file)
@@ -24,6 +24,7 @@ from datetime import datetime
 from quantum.db import api as db_api
 from quantum.openstack.common import context as common_context
 from quantum.openstack.common import log as logging
+from quantum import policy
 
 
 LOG = logging.getLogger(__name__)
@@ -48,16 +49,22 @@ class ContextBase(common_context.RequestContext):
                        'context: %s'), kwargs)
         super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
                                           is_admin=is_admin)
-        self.roles = roles or []
-        if self.is_admin is None:
-            self.is_admin = 'admin' in [x.lower() for x in self.roles]
-        elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]:
-            self.roles.append('admin')
         self.read_deleted = read_deleted
         if not timestamp:
             timestamp = datetime.utcnow()
         self.timestamp = timestamp
         self._session = None
+        self.roles = roles or []
+        if self.is_admin is None:
+            self.is_admin = policy.check_is_admin(self)
+        elif self.is_admin:
+            # Ensure context is populated with admin roles
+            # TODO(salvatore-orlando): It should not be necessary
+            # to populate roles in artificially-generated contexts
+            # address in bp/make-authz-orthogonal
+            admin_roles = policy.get_admin_roles()
+            if admin_roles:
+                self.roles = list(set(self.roles) | set(admin_roles))
 
     @property
     def project_id(self):
index 1683d5c7c093a87b799eef758d3d740fc95e654d..7d101a4556d2f39dc31b31b2121882346d631b30 100644 (file)
@@ -31,6 +31,7 @@ from quantum.openstack.common import policy
 LOG = logging.getLogger(__name__)
 _POLICY_PATH = None
 _POLICY_CACHE = {}
+ADMIN_CTX_POLICY = 'context_is_admin'
 cfg.CONF.import_opt('policy_file', 'quantum.common.config')
 
 
@@ -207,3 +208,48 @@ def enforce(context, action, target, plugin=None):
     credentials = context.to_dict()
     return policy.check(match_rule, real_target, credentials,
                         exceptions.PolicyNotAuthorized, action=action)
+
+
+def check_is_admin(context):
+    """Verify context has admin rights according to policy settings."""
+    init()
+    # the target is user-self
+    credentials = context.to_dict()
+    target = credentials
+    # Backward compatibility: if ADMIN_CTX_POLICY is not
+    # found, default to validating role:admin
+    admin_policy = (ADMIN_CTX_POLICY in policy._rules
+                    and ADMIN_CTX_POLICY or 'role:admin')
+    return policy.check(admin_policy, target, credentials)
+
+
+def _extract_roles(rule, roles):
+    if isinstance(rule, policy.RoleCheck):
+        roles.append(rule.match.lower())
+    elif isinstance(rule, policy.RuleCheck):
+        _extract_roles(policy._rules[rule.match], roles)
+    elif hasattr(rule, 'rules'):
+        for rule in rule.rules:
+            _extract_roles(rule, roles)
+
+
+def get_admin_roles():
+    """Return a list of roles which are granted admin rights according
+    to policy settings.
+    """
+    # NOTE(salvatore-orlando): This function provides a solution for
+    # populating implicit contexts with the appropriate roles so that
+    # they correctly pass policy checks, and will become superseded
+    # once all explicit policy checks are removed from db logic and
+    # plugin modules. For backward compatibility it returns the literal
+    # admin if ADMIN_CTX_POLICY is not defined
+    init()
+    if not policy._rules or ADMIN_CTX_POLICY not in policy._rules:
+        return ['admin']
+    try:
+        admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY]
+    except (KeyError, TypeError):
+        return
+    roles = []
+    _extract_roles(admin_ctx_rule, roles)
+    return roles
index 7e7938f9cbc65f6062aec1b8495bce258deab9eb..924b80841db68576374d31711b93209ce393387b 100644 (file)
@@ -35,7 +35,7 @@ class PolicyFileTestCase(base.BaseTestCase):
         super(PolicyFileTestCase, self).setUp()
         policy.reset()
         self.addCleanup(policy.reset)
-        self.context = context.Context('fake', 'fake')
+        self.context = context.Context('fake', 'fake', is_admin=False)
         self.target = {}
         self.tempdir = self.useFixture(fixtures.TempDir())
 
@@ -200,11 +200,15 @@ class QuantumPolicyTestCase(base.BaseTestCase):
         policy.reset()
         policy.init()
         self.addCleanup(policy.reset)
+        self.admin_only_legacy = "role:admin"
+        self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
         self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
-            "admin_or_network_owner": "role:admin or "
+            "context_is_admin": "role:admin",
+            "admin_or_network_owner": "rule:context_is_admin or "
                                       "tenant_id:%(network_tenant_id)s",
-            "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
-            "admin_only": "role:admin",
+            "admin_or_owner": ("rule:context_is_admin or "
+                               "tenant_id:%(tenant_id)s"),
+            "admin_only": "rule:context_is_admin",
             "regular_user": "role:user",
             "shared": "field:networks:shared=True",
             "external": "field:networks:router:external=True",
@@ -278,12 +282,31 @@ class QuantumPolicyTestCase(base.BaseTestCase):
     def test_enforce_adminonly_attribute_update(self):
         self._test_enforce_adminonly_attribute('update_network')
 
+    def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
+        del self.rules[policy.ADMIN_CTX_POLICY]
+        self.rules['admin_only'] = common_policy.parse_rule(
+            self.admin_only_legacy)
+        self.rules['admin_or_owner'] = common_policy.parse_rule(
+            self.admin_or_owner_legacy)
+        self._test_enforce_adminonly_attribute('create_network')
+
     def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
         action = "create_network"
         target = {'shared': True, 'tenant_id': 'somebody_else'}
         self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
                           self.context, action, target, None)
 
+    def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self):
+        del self.rules[policy.ADMIN_CTX_POLICY]
+        self.rules['admin_only'] = common_policy.parse_rule(
+            self.admin_only_legacy)
+        self.rules['admin_or_owner'] = common_policy.parse_rule(
+            self.admin_or_owner_legacy)
+        action = "create_network"
+        target = {'shared': True, 'tenant_id': 'somebody_else'}
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, target, None)
+
     def test_enforce_regularuser_on_read(self):
         action = "get_network"
         target = {'shared': True, 'tenant_id': 'somebody_else'}
@@ -300,3 +323,41 @@ class QuantumPolicyTestCase(base.BaseTestCase):
             target = {'network_id': 'whatever'}
             result = policy.enforce(self.context, action, target, self.plugin)
             self.assertTrue(result)
+
+    def test_get_roles_context_is_admin_rule_missing(self):
+        rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+            "some_other_rule": "role:admin",
+        }.items())
+        common_policy.set_rules(common_policy.Rules(rules))
+        # 'admin' role is expected for bw compatibility
+        self.assertEqual(['admin'], policy.get_admin_roles())
+
+    def test_get_roles_with_role_check(self):
+        rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+            policy.ADMIN_CTX_POLICY: "role:admin",
+        }.items())
+        common_policy.set_rules(common_policy.Rules(rules))
+        self.assertEqual(['admin'], policy.get_admin_roles())
+
+    def test_get_roles_with_rule_check(self):
+        rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+            policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
+            "some_other_rule": "role:admin",
+        }.items())
+        common_policy.set_rules(common_policy.Rules(rules))
+        self.assertEqual(['admin'], policy.get_admin_roles())
+
+    def test_get_roles_with_or_check(self):
+        self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+            policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2",
+            "rule1": "role:admin_1",
+            "rule2": "role:admin_2"
+        }.items())
+        self.assertEqual(['admin_1', 'admin_2'],
+                         policy.get_admin_roles())
+
+    def test_get_roles_with_other_rules(self):
+        self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+            policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
+        }.items())
+        self.assertEqual(['xxx'], policy.get_admin_roles())