from quantum.db import api as db_api
from quantum.openstack.common import context as common_context
from quantum.openstack.common import log as logging
+from quantum import policy
LOG = logging.getLogger(__name__)
'context: %s'), kwargs)
super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
is_admin=is_admin)
- self.roles = roles or []
- if self.is_admin is None:
- self.is_admin = 'admin' in [x.lower() for x in self.roles]
- elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]:
- self.roles.append('admin')
self.read_deleted = read_deleted
if not timestamp:
timestamp = datetime.utcnow()
self.timestamp = timestamp
self._session = None
+ self.roles = roles or []
+ if self.is_admin is None:
+ self.is_admin = policy.check_is_admin(self)
+ elif self.is_admin:
+ # Ensure context is populated with admin roles
+ # TODO(salvatore-orlando): It should not be necessary
+ # to populate roles in artificially-generated contexts
+ # address in bp/make-authz-orthogonal
+ admin_roles = policy.get_admin_roles()
+ if admin_roles:
+ self.roles = list(set(self.roles) | set(admin_roles))
@property
def project_id(self):
LOG = logging.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
+ADMIN_CTX_POLICY = 'context_is_admin'
cfg.CONF.import_opt('policy_file', 'quantum.common.config')
credentials = context.to_dict()
return policy.check(match_rule, real_target, credentials,
exceptions.PolicyNotAuthorized, action=action)
+
+
+def check_is_admin(context):
+ """Verify context has admin rights according to policy settings."""
+ init()
+ # the target is user-self
+ credentials = context.to_dict()
+ target = credentials
+ # Backward compatibility: if ADMIN_CTX_POLICY is not
+ # found, default to validating role:admin
+ admin_policy = (ADMIN_CTX_POLICY in policy._rules
+ and ADMIN_CTX_POLICY or 'role:admin')
+ return policy.check(admin_policy, target, credentials)
+
+
+def _extract_roles(rule, roles):
+ if isinstance(rule, policy.RoleCheck):
+ roles.append(rule.match.lower())
+ elif isinstance(rule, policy.RuleCheck):
+ _extract_roles(policy._rules[rule.match], roles)
+ elif hasattr(rule, 'rules'):
+ for rule in rule.rules:
+ _extract_roles(rule, roles)
+
+
+def get_admin_roles():
+ """Return a list of roles which are granted admin rights according
+ to policy settings.
+ """
+ # NOTE(salvatore-orlando): This function provides a solution for
+ # populating implicit contexts with the appropriate roles so that
+ # they correctly pass policy checks, and will become superseded
+ # once all explicit policy checks are removed from db logic and
+ # plugin modules. For backward compatibility it returns the literal
+ # admin if ADMIN_CTX_POLICY is not defined
+ init()
+ if not policy._rules or ADMIN_CTX_POLICY not in policy._rules:
+ return ['admin']
+ try:
+ admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY]
+ except (KeyError, TypeError):
+ return
+ roles = []
+ _extract_roles(admin_ctx_rule, roles)
+ return roles
super(PolicyFileTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
- self.context = context.Context('fake', 'fake')
+ self.context = context.Context('fake', 'fake', is_admin=False)
self.target = {}
self.tempdir = self.useFixture(fixtures.TempDir())
policy.reset()
policy.init()
self.addCleanup(policy.reset)
+ self.admin_only_legacy = "role:admin"
+ self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- "admin_or_network_owner": "role:admin or "
+ "context_is_admin": "role:admin",
+ "admin_or_network_owner": "rule:context_is_admin or "
"tenant_id:%(network_tenant_id)s",
- "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
- "admin_only": "role:admin",
+ "admin_or_owner": ("rule:context_is_admin or "
+ "tenant_id:%(tenant_id)s"),
+ "admin_only": "rule:context_is_admin",
"regular_user": "role:user",
"shared": "field:networks:shared=True",
"external": "field:networks:router:external=True",
def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network')
+ def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
+ del self.rules[policy.ADMIN_CTX_POLICY]
+ self.rules['admin_only'] = common_policy.parse_rule(
+ self.admin_only_legacy)
+ self.rules['admin_or_owner'] = common_policy.parse_rule(
+ self.admin_or_owner_legacy)
+ self._test_enforce_adminonly_attribute('create_network')
+
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
+ def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self):
+ del self.rules[policy.ADMIN_CTX_POLICY]
+ self.rules['admin_only'] = common_policy.parse_rule(
+ self.admin_only_legacy)
+ self.rules['admin_or_owner'] = common_policy.parse_rule(
+ self.admin_or_owner_legacy)
+ action = "create_network"
+ target = {'shared': True, 'tenant_id': 'somebody_else'}
+ self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+ self.context, action, target, None)
+
def test_enforce_regularuser_on_read(self):
action = "get_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
target = {'network_id': 'whatever'}
result = policy.enforce(self.context, action, target, self.plugin)
self.assertTrue(result)
+
+ def test_get_roles_context_is_admin_rule_missing(self):
+ rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ "some_other_rule": "role:admin",
+ }.items())
+ common_policy.set_rules(common_policy.Rules(rules))
+ # 'admin' role is expected for bw compatibility
+ self.assertEqual(['admin'], policy.get_admin_roles())
+
+ def test_get_roles_with_role_check(self):
+ rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ policy.ADMIN_CTX_POLICY: "role:admin",
+ }.items())
+ common_policy.set_rules(common_policy.Rules(rules))
+ self.assertEqual(['admin'], policy.get_admin_roles())
+
+ def test_get_roles_with_rule_check(self):
+ rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
+ "some_other_rule": "role:admin",
+ }.items())
+ common_policy.set_rules(common_policy.Rules(rules))
+ self.assertEqual(['admin'], policy.get_admin_roles())
+
+ def test_get_roles_with_or_check(self):
+ self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2",
+ "rule1": "role:admin_1",
+ "rule2": "role:admin_2"
+ }.items())
+ self.assertEqual(['admin_1', 'admin_2'],
+ policy.get_admin_roles())
+
+ def test_get_roles_with_other_rules(self):
+ self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
+ policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
+ }.items())
+ self.assertEqual(['xxx'], policy.get_admin_roles())