+# -*- coding: utf-8 -*-
+#
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
In the list-of-lists representation, each check inside the innermost
list is combined as with an "and" conjunction--for that check to pass,
all the specified checks must pass. These innermost lists are then
-combined as with an "or" conjunction. This is the original way of
-expressing policies, but there now exists a new way: the policy
-language.
-
-In the policy language, each check is specified the same way as in the
-list-of-lists representation: a simple "a:b" pair that is matched to
-the correct code to perform that check. However, conjunction
-operators are available, allowing for more expressiveness in crafting
-policies.
-
-As an example, take the following rule, expressed in the list-of-lists
-representation::
+combined as with an "or" conjunction. As an example, take the following
+rule, expressed in the list-of-lists representation::
[["role:admin"], ["project_id:%(project_id)s", "role:projectadmin"]]
-In the policy language, this becomes::
+This is the original way of expressing policies, but there now exists a
+new way: the policy language.
+
+In the policy language, each check is specified the same way as in the
+list-of-lists representation: a simple "a:b" pair that is matched to
+the correct class to perform that check::
+
+ +===========================================================================+
+ | TYPE | SYNTAX |
+ +===========================================================================+
+ |User's Role | role:admin |
+ +---------------------------------------------------------------------------+
+ |Rules already defined on policy | rule:admin_required |
+ +---------------------------------------------------------------------------+
+ |Against URL's¹ | http://my-url.org/check |
+ +---------------------------------------------------------------------------+
+ |User attributes² | project_id:%(target.project.id)s |
+ +---------------------------------------------------------------------------+
+ |Strings | <variable>:'xpto2035abc' |
+ | | 'myproject':<variable> |
+ +---------------------------------------------------------------------------+
+ | | project_id:xpto2035abc |
+ |Literals | domain_id:20 |
+ | | True:%(user.enabled)s |
+ +===========================================================================+
+
+¹URL checking must return 'True' to be valid
+²User attributes (obtained through the token): user_id, domain_id or project_id
+
+Conjunction operators are available, allowing for more expressiveness
+in crafting policies. So, in the policy language, the previous check in
+list-of-lists becomes::
role:admin or (project_id:%(project_id)s and role:projectadmin)
project_id:%(project_id)s and not role:dunce
-It is possible to perform policy checks on the following user
-attributes (obtained through the token): user_id, domain_id or
-project_id::
-
- domain_id:<some_value>
-
Attributes sent along with API calls can be used by the policy engine
(on the right side of the expression), by using the following syntax::
- <some_value>:user.id
+ <some_value>:%(user.id)s
Contextual attributes of objects identified by their IDs are loaded
from the database. They are also available to the policy engine and
can be checked through the `target` keyword::
- <some_value>:target.role.name
-
-All these attributes (related to users, API calls, and context) can be
-checked against each other or against constants, be it literals (True,
-<a_number>) or strings.
+ <some_value>:%(target.role.name)s
Finally, two special policy checks should be mentioned; the policy
check "@" will always accept an access, and the policy check "!" will
import abc
import ast
import copy
+import logging
import os
import re
import six.moves.urllib.request as urlrequest
from cinder.openstack.common import fileutils
-from cinder.openstack.common._i18n import _, _LE, _LW
-from cinder.openstack.common import log as logging
+from cinder.openstack.common._i18n import _, _LE, _LI
policy_opts = [
:param default_rule: Default rule to use, CONF.default_rule will
be used if none is specified.
:param use_conf: Whether to load rules from cache or config file.
+ :param overwrite: Whether to overwrite existing rules when reload rules
+ from config file.
"""
def __init__(self, policy_file=None, rules=None,
- default_rule=None, use_conf=True):
+ default_rule=None, use_conf=True, overwrite=True):
self.default_rule = default_rule or CONF.policy_default_rule
self.rules = Rules(rules, self.default_rule)
self.policy_path = None
self.policy_file = policy_file or CONF.policy_file
self.use_conf = use_conf
+ self.overwrite = overwrite
def set_rules(self, rules, overwrite=True, use_conf=False):
"""Create a new Rules object based on the provided dict of rules.
Policy file is cached and will be reloaded if modified.
- :param force_reload: Whether to overwrite current rules.
+ :param force_reload: Whether to reload rules from config file.
"""
if force_reload:
if not self.policy_path:
self.policy_path = self._get_policy_path(self.policy_file)
- self._load_policy_file(self.policy_path, force_reload)
+ self._load_policy_file(self.policy_path, force_reload,
+ overwrite=self.overwrite)
for path in CONF.policy_dirs:
try:
path = self._get_policy_path(path)
except cfg.ConfigFilesNotFoundError:
- LOG.warn(_LW("Can not find policy directory: %s"), path)
+ LOG.info(_LI("Can not find policy directory: %s"), path)
continue
self._walk_through_policy_directory(path,
self._load_policy_file,
def _load_policy_file(self, path, force_reload, overwrite=True):
reloaded, data = fileutils.read_cached_file(
path, force_reload=force_reload)
- if reloaded or not self.rules:
+ if reloaded or not self.rules or not overwrite:
rules = Rules.load_json(data, self.default_rule)
- self.set_rules(rules, overwrite)
+ self.set_rules(rules, overwrite=overwrite, use_conf=True)
LOG.debug("Rules successfully reloaded")
def _get_policy_path(self, path):
"""
url = ('http:' + self.match) % target
- data = {'target': jsonutils.dumps(target),
+
+ # Convert instances of object() in target temporarily to
+ # empty dict to avoid circular reference detection
+ # errors in jsonutils.dumps().
+ temp_target = copy.deepcopy(target)
+ for key in target.keys():
+ element = target.get(key)
+ if type(element) is object:
+ temp_target[key] = {}
+
+ data = {'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
post_data = urlparse.urlencode(data)
f = urlrequest.urlopen(url, post_data)