"""
import collections
-import itertools
import logging as std_logging
import re
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions
-from neutron.i18n import _LE, _LI, _LW
+from neutron.i18n import _LE, _LW
from neutron.openstack.common import policy
_ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
-# Maps deprecated 'extension' policies to new-style policies
-DEPRECATED_POLICY_MAP = {
- 'extension:provider_network':
- ['network:provider:network_type',
- 'network:provider:physical_network',
- 'network:provider:segmentation_id'],
- 'extension:router':
- ['network:router:external'],
- 'extension:port_binding':
- ['port:binding:vif_type', 'port:binding:vif_details',
- 'port:binding:profile', 'port:binding:host_id']
-}
-DEPRECATED_ACTION_MAP = {
- 'view': ['get'],
- 'set': ['create', 'update']
-}
def reset():
"""
LOG.debug("Loading policies from file: %s", _ENFORCER.policy_path)
- # Ensure backward compatibility with folsom/grizzly convention
- # for extension rules
- for pol in policies.keys():
- if any([pol.startswith(depr_pol) for depr_pol in
- DEPRECATED_POLICY_MAP.keys()]):
- LOG.warn(_LW("Found deprecated policy rule:%s. Please consider "
- "upgrading your policy configuration file"), pol)
- pol_name, action = pol.rsplit(':', 1)
- try:
- new_actions = DEPRECATED_ACTION_MAP[action]
- new_policies = DEPRECATED_POLICY_MAP[pol_name]
- # bind new actions and policies together
- for actual_policy in ['_'.join(item) for item in
- itertools.product(new_actions,
- new_policies)]:
- if actual_policy not in policies:
- # New policy, same rule
- LOG.info(_LI("Inserting policy:%(new_policy)s in "
- "place of deprecated "
- "policy:%(old_policy)s"),
- {'new_policy': actual_policy,
- 'old_policy': pol})
- policies[actual_policy] = policies[pol]
- # Remove old-style policy
- del policies[pol]
- except KeyError:
- LOG.error(_LE("Backward compatibility unavailable for "
- "deprecated policy %s. The policy will "
- "not be enforced"), pol)
init()
_ENFORCER.set_rules(policies, overwrite)
def test_enforce_tenant_id_check_invalid_parent_resource_raises(self):
self._test_enforce_tenant_id_raises('tenant_id:%(foobaz_tenant_id)s')
- def _test_set_rules_with_deprecated_policy(self, input_rules,
- expected_rules):
- policy.set_rules(input_rules.copy())
- # verify deprecated policy has been removed
- for pol in input_rules.keys():
- self.assertNotIn(pol, policy._ENFORCER.rules)
- # verify deprecated policy was correctly translated. Iterate
- # over items for compatibility with unittest2 in python 2.6
- for rule in expected_rules:
- self.assertIn(rule, policy._ENFORCER.rules)
- self.assertEqual(str(policy._ENFORCER.rules[rule]),
- expected_rules[rule])
-
- def test_set_rules_with_deprecated_view_policy(self):
- self._test_set_rules_with_deprecated_policy(
- {'extension:router:view': 'rule:admin_or_owner'},
- {'get_network:router:external': 'rule:admin_or_owner'})
-
- def test_set_rules_with_deprecated_set_policy(self):
- expected_policies = ['create_network:provider:network_type',
- 'create_network:provider:physical_network',
- 'create_network:provider:segmentation_id',
- 'update_network:provider:network_type',
- 'update_network:provider:physical_network',
- 'update_network:provider:segmentation_id']
- self._test_set_rules_with_deprecated_policy(
- {'extension:provider_network:set': 'rule:admin_only'},
- dict((policy, 'rule:admin_only') for policy in
- expected_policies))
-
def test_process_rules(self):
action = "create_" + FAKE_RESOURCE_NAME
# Construct RuleChecks for an action, attribute and subattribute