import abc
import ast
+import copy
+import os
import re
from oslo.config import cfg
+from oslo.serialization import jsonutils
import six
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
from cinder.openstack.common import fileutils
-from cinder.openstack.common.gettextutils import _, _LE
-from cinder.openstack.common import jsonutils
+from cinder.openstack.common._i18n import _, _LE, _LW
from cinder.openstack.common import log as logging
default='default',
help=_('Default rule. Enforced when a requested rule is not '
'found.')),
+ cfg.MultiStrOpt('policy_dirs',
+ default=['policy.d'],
+ help=_('Directories where policy configuration files are '
+ 'stored. They can be relative to any directory '
+ 'in the search path defined by the config_dir '
+ 'option, or absolute paths. The file defined by '
+ 'policy_file must exist for these directories to '
+ 'be searched.')),
]
CONF = cfg.CONF
_checks = {}
+def list_opts():
+ """Entry point for oslo.config-generator."""
+ return [(None, copy.deepcopy(policy_opts))]
+
+
class PolicyNotAuthorized(Exception):
def __init__(self, rule):
def __init__(self, policy_file=None, rules=None,
default_rule=None, use_conf=True):
- self.rules = Rules(rules, default_rule)
self.default_rule = default_rule or CONF.policy_default_rule
+ self.rules = Rules(rules, self.default_rule)
self.policy_path = None
self.policy_file = policy_file or CONF.policy_file
if self.use_conf:
if not self.policy_path:
- self.policy_path = self._get_policy_path()
-
+ self.policy_path = self._get_policy_path(self.policy_file)
+
+ self._load_policy_file(self.policy_path, force_reload)
+ for path in CONF.policy_dirs:
+ try:
+ path = self._get_policy_path(path)
+ except cfg.ConfigFilesNotFoundError:
+ LOG.warn(_LW("Can not find policy directory: %s"), path)
+ continue
+ self._walk_through_policy_directory(path,
+ self._load_policy_file,
+ force_reload, False)
+
+ @staticmethod
+ def _walk_through_policy_directory(path, func, *args):
+ # We do not iterate over sub-directories.
+ policy_files = next(os.walk(path))[2]
+ policy_files.sort()
+ for policy_file in [p for p in policy_files if not p.startswith('.')]:
+ func(os.path.join(path, policy_file), *args)
+
+ def _load_policy_file(self, path, force_reload, overwrite=True):
reloaded, data = fileutils.read_cached_file(
- self.policy_path, force_reload=force_reload)
+ path, force_reload=force_reload)
if reloaded or not self.rules:
rules = Rules.load_json(data, self.default_rule)
- self.set_rules(rules)
+ self.set_rules(rules, overwrite)
LOG.debug("Rules successfully reloaded")
- def _get_policy_path(self):
- """Locate the policy json data file.
+ def _get_policy_path(self, path):
+ """Locate the policy json data file/path.
- :param policy_file: Custom policy file to locate.
+ :param path: It's value can be a full path or related path. When
+ full path specified, this function just returns the full
+ path. When related path specified, this function will
+ search configuration directories to find one that exists.
:returns: The policy path
- :raises: ConfigFilesNotFoundError if the file couldn't
+ :raises: ConfigFilesNotFoundError if the file/path couldn't
be located.
"""
- policy_file = CONF.find_file(self.policy_file)
+ policy_path = CONF.find_file(path)
- if policy_file:
- return policy_file
+ if policy_path:
+ return policy_path
- raise cfg.ConfigFilesNotFoundError((self.policy_file,))
+ raise cfg.ConfigFilesNotFoundError((path,))
def enforce(self, rule, target, creds, do_raise=False,
exc=None, *args, **kwargs):
:param do_raise: Whether to raise an exception or not if check
fails.
:param exc: Class of the exception to raise if the check fails.
- Any remaining arguments passed to check() (both
+ Any remaining arguments passed to enforce() (both
positional and keyword arguments) will be passed to
the exception class. If not specified, PolicyNotAuthorized
will be used.
return state.result
except ValueError:
# Couldn't parse the rule
- LOG.exception(_LE("Failed to understand rule %r") % rule)
+ LOG.exception(_LE("Failed to understand rule %s") % rule)
# Fail closed
return FalseCheck()
'Member':%(role.name)s
"""
- # TODO(termie): do dict inspection via dot syntax
try:
match = self.match % target
except KeyError:
leftval = ast.literal_eval(self.kind)
except ValueError:
try:
- leftval = creds[self.kind]
+ kind_parts = self.kind.split('.')
+ leftval = creds
+ for kind_part in kind_parts:
+ leftval = leftval[kind_part]
except KeyError:
return False
return match == six.text_type(leftval)