"""
Policy engine for quantum. Largely copied from nova.
"""
+import itertools
from oslo.config import cfg
_POLICY_PATH = None
_POLICY_CACHE = {}
ADMIN_CTX_POLICY = 'context_is_admin'
+# Maps deprecated 'extension' policies to new-style policies
+DEPRECATED_POLICY_MAP = {
+ 'extension:provider_network':
+ ['network:provider:network_type',
+ 'network:provider:physical_network',
+ 'network:provider:segmentation_id'],
+ 'extension:router':
+ ['network:router:external'],
+ 'extension:port_binding':
+ ['port:binding:vif_type', 'port:binding:capabilities',
+ 'port:binding:profile', 'port:binding:host_id']
+}
+DEPRECATED_ACTION_MAP = {
+ 'view': ['get'],
+ 'set': ['create', 'update']
+}
+
cfg.CONF.import_opt('policy_file', 'quantum.common.config')
def _set_rules(data):
default_rule = 'default'
LOG.debug(_("loading policies from file: %s"), _POLICY_PATH)
- # TODO(salvatore-orlando): Ensure backward compatibility with
- # folsom/grizzly style for extension rules (bp/make-authz-orthogonal)
- policy.set_rules(policy.Rules.load_json(data, default_rule))
+ # Ensure backward compatibility with folsom/grizzly convention
+ # for extension rules
+ policies = policy.Rules.load_json(data, default_rule)
+ for pol in policies.keys():
+ if any([pol.startswith(depr_pol) for depr_pol in
+ DEPRECATED_POLICY_MAP.keys()]):
+ LOG.warn(_("Found deprecated policy rule:%s. Please consider "
+ "upgrading your policy configuration file"), pol)
+ pol_name, action = pol.rsplit(':', 1)
+ try:
+ new_actions = DEPRECATED_ACTION_MAP[action]
+ new_policies = DEPRECATED_POLICY_MAP[pol_name]
+ # bind new actions and policies together
+ for actual_policy in ['_'.join(item) for item in
+ itertools.product(new_actions,
+ new_policies)]:
+ if not actual_policy in policies:
+ # New policy, same rule
+ LOG.info(_("Inserting policy:%(new_policy)s in place "
+ "of deprecated policy:%(old_policy)s"),
+ {'new_policy': actual_policy,
+ 'old_policy': pol})
+ policies[actual_policy] = policies[pol]
+ # Remove old-style policy
+ del policies[pol]
+ except KeyError:
+ LOG.error(_("Backward compatibility unavailable for "
+ "deprecated policy %s. The policy will "
+ "not be enforced"), pol)
+ policy.set_rules(policies)
def _is_attribute_explicitly_set(attribute_name, resource, target):
"""Test of Policy Engine For Quantum"""
+import json
import StringIO
import urllib2
policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
}.items())
self.assertEqual(['xxx'], policy.get_admin_roles())
+
+ def _test_set_rules_with_deprecated_policy(self, input_rules,
+ expected_rules):
+ policy._set_rules(json.dumps(input_rules))
+ # verify deprecated policy has been removed
+ for pol in input_rules.keys():
+ self.assertNotIn(pol, common_policy._rules)
+ # verify deprecated policy was correctly translated. Iterate
+ # over items for compatibility with unittest2 in python 2.6
+ for rule in expected_rules:
+ self.assertIn(rule, common_policy._rules)
+ self.assertEqual(str(common_policy._rules[rule]),
+ expected_rules[rule])
+
+ def test_set_rules_with_deprecated_view_policy(self):
+ self._test_set_rules_with_deprecated_policy(
+ {'extension:router:view': 'rule:admin_or_owner'},
+ {'get_network:router:external': 'rule:admin_or_owner'})
+
+ def test_set_rules_with_deprecated_set_policy(self):
+ expected_policies = ['create_network:provider:network_type',
+ 'create_network:provider:physical_network',
+ 'create_network:provider:segmentation_id',
+ 'update_network:provider:network_type',
+ 'update_network:provider:physical_network',
+ 'update_network:provider:segmentation_id']
+ self._test_set_rules_with_deprecated_policy(
+ {'extension:provider_network:set': 'rule:admin_only'},
+ dict((policy, 'rule:admin_only') for policy in
+ expected_policies))