get_admin_roles was introduced so that contexts generated from
within plugins could be used for policy checks. This was the case
up to the Havana release, as several plugins invoked the policy
engine directly to authorize requests.
This was incorrect behaviour and has since been fixed, meaning
that get_admin_roles is no longer needed and can be safely removed.
This will result in a leaner and more reliable codebase. Indeed,
the function removed here was the cause of several bugs where the
policy engine was initialized too early in the server bootstrap
process.
While this patch removes the feature, it does not remove the
load_admin_roles parameter from context.get_admin_context, as doing
so would break other projects such as neutron-lbaas. The parameter
is instead deprecated by this patch and an appropriate warning is
emitted.
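The deprecation relies on debtcollector, which this patch adds to
the requirements. A minimal sketch of the mechanism:

    from debtcollector import removals

    @removals.removed_kwarg('load_admin_roles')
    def get_admin_context(read_deleted="no", load_admin_roles=True):
        ...

    # Existing callers keep working, but passing the deprecated
    # kwarg now emits a DeprecationWarning:
    ctx = get_admin_context(load_admin_roles=False)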
As a consequence, Neutron will no longer perform policy checks
when context.is_admin=True. This flag is instead set either when
a context is explicitly created for granting admin privileges, or
when Neutron is operating in noauth mode. In the latter case every
request is treated by Neutron as an admin request, and get_admin_roles
merely ensures the appropriate roles are pushed into the context so
that the policy engine will grant admin rights to the request. This
behaviour is just a waste of resources and adds nothing from a
security perspective.
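Concretely, the short-circuit this patch adds to both policy.check
and policy.enforce amounts to the following (simplified sketch;
signatures abridged):

    def check(context, action, target, might_not_exist=False):
        # A context already known to carry admin rights bypasses
        # the policy engine entirely
        if context.is_admin:
            return True
        # ... otherwise fall through to the regular rule matching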
On the other hand, not performing checks when context.is_admin is
True should not pose a security threat either in noauth mode or
with the keystone middleware. In the former case the software keeps
operating with admin rights assumed for every request, whereas in
the latter case the keystone middleware will always supply a context
with the appropriate roles, and there is no way for an attacker
to trick keystonemiddleware into generating a context for which
is_admin=True.
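For reference, when is_admin is not set explicitly, Context.__init__
keeps deriving it from the roles supplied with the request, so a
keystone-generated context can only become admin through the policy
engine:

    if self.is_admin is None:
        # admin status is derived from the supplied roles via the
        # context_is_admin policy rule (ADMIN_CTX_POLICY)
        self.is_admin = policy.check_is_admin(self)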
Finally, this patch also makes some non-trivial changes in test_l3.py,
as some tests were mocking context.to_dict while ignoring the is_admin
flag.
Closes-Bug: #1446021
Change-Id: I8a5c02712a0b43f3e36a4f14620ebbd73fbfb03f
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
if not tenant_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
- return context.Context(user_id, tenant_id,
- load_admin_roles=False, **rpc_ctxt_dict)
+ return context.Context(user_id, tenant_id, **rpc_ctxt_dict)
class Service(service.Service):
import copy
import datetime
+from debtcollector import removals
from oslo_context import context as oslo_context
from oslo_log import log as logging
"""
def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
- roles=None, timestamp=None, load_admin_roles=True,
- request_id=None, tenant_name=None, user_name=None,
- overwrite=True, auth_token=None, **kwargs):
+ roles=None, timestamp=None, request_id=None, tenant_name=None,
+ user_name=None, overwrite=True, auth_token=None, **kwargs):
"""Object initialization.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
- elif self.is_admin and load_admin_roles:
- # Ensure context is populated with admin roles
- admin_roles = policy.get_admin_roles()
- if admin_roles:
- self.roles = list(set(self.roles) | set(admin_roles))
@property
def project_id(self):
return self._session
+@removals.removed_kwarg('load_admin_roles')
def get_admin_context(read_deleted="no", load_admin_roles=True):
return Context(user_id=None,
tenant_id=None,
is_admin=True,
read_deleted=read_deleted,
- load_admin_roles=load_admin_roles,
overwrite=False)
:return: Returns True if access is permitted else False.
"""
+ # If we already know the context has admin rights, authorize the
+ # operation without performing an additional policy check
+ if context.is_admin:
+ return True
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
match_rule, target, credentials = _prepare_check(context,
:raises neutron.openstack.common.policy.PolicyNotAuthorized:
if verification fails.
"""
+ # If we already know the context has admin rights, authorize the
+ # operation without performing an additional policy check
+ if context.is_admin:
+ return True
rule, target, credentials = _prepare_check(context,
action,
target,
elif hasattr(rule, 'rules'):
for rule in rule.rules:
_extract_roles(rule, roles)
-
-
-def get_admin_roles():
- """Return a list of roles which are granted admin rights according
- to policy settings.
- """
- # NOTE(salvatore-orlando): This function provides a solution for
- # populating implicit contexts with the appropriate roles so that
- # they correctly pass policy checks, and will become superseded
- # once all explicit policy checks are removed from db logic and
- # plugin modules. For backward compatibility it returns the literal
- # admin if ADMIN_CTX_POLICY is not defined
- init()
- if not _ENFORCER.rules or ADMIN_CTX_POLICY not in _ENFORCER.rules:
- return ['admin']
- try:
- admin_ctx_rule = _ENFORCER.rules[ADMIN_CTX_POLICY]
- except (KeyError, TypeError):
- return
- roles = []
- _extract_roles(admin_ctx_rule, roles)
- return roles
tenant_id=None,
is_admin=True,
read_deleted="no",
- load_admin_roles=True,
overwrite=False)
facade = session.EngineFacade(db_url, mysql_sql_mode='STRICT_ALL_TABLES')
ctx._session = facade.get_session(autocommit=False, expire_on_commit=True)
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True,
dns_nameservers=None, host_routes=None, shared=None,
- ipv6_ra_mode=None, ipv6_address_mode=None):
+ ipv6_ra_mode=None, ipv6_address_mode=None,
+ tenant_id=None, set_context=False):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
gateway_ip=gateway,
- tenant_id=network['network']['tenant_id'],
+ tenant_id=(tenant_id or
+ network['network']['tenant_id']),
allocation_pools=allocation_pools,
ip_version=ip_version,
enable_dhcp=enable_dhcp,
host_routes=host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
- ipv6_address_mode=ipv6_address_mode)
+ ipv6_address_mode=ipv6_address_mode,
+ set_context=set_context)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
host_routes=None,
shared=None,
ipv6_ra_mode=None,
- ipv6_address_mode=None):
+ ipv6_address_mode=None,
+ tenant_id=None,
+ set_context=False):
with optional_ctx(network, self.network) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
- ipv6_address_mode=ipv6_address_mode)
+ ipv6_address_mode=ipv6_address_mode,
+ tenant_id=tenant_id,
+ set_context=set_context)
yield subnet
@contextlib.contextmanager
def _test_validate_subnet_ipv6_modes(self, cur_subnet=None,
expect_success=True, **modes):
plugin = manager.NeutronManager.get_plugin()
- ctx = context.get_admin_context(load_admin_roles=False)
+ ctx = context.get_admin_context()
new_subnet = {'ip_version': 6,
'cidr': 'fe80::/64',
'enable_dhcp': True,
plugin = manager.NeutronManager.get_plugin()
e = self.assertRaises(exception,
plugin._validate_subnet,
- context.get_admin_context(
- load_admin_roles=False),
+ context.get_admin_context(),
subnet)
self.assertThat(
str(e),
expected_code=error_code)
def test_router_add_interface_subnet_with_bad_tenant_returns_404(self):
- with mock.patch('neutron.context.Context.to_dict') as tdict:
- tenant_id = _uuid()
- admin_context = {'roles': ['admin']}
- tenant_context = {'tenant_id': 'bad_tenant',
- 'roles': []}
- tdict.return_value = admin_context
- with self.router(tenant_id=tenant_id) as r:
- with self.network(tenant_id=tenant_id) as n:
- with self.subnet(network=n) as s:
- tdict.return_value = tenant_context
- err_code = exc.HTTPNotFound.code
- self._router_interface_action('add',
- r['router']['id'],
- s['subnet']['id'],
- None,
- err_code)
- tdict.return_value = admin_context
- body = self._router_interface_action('add',
- r['router']['id'],
- s['subnet']['id'],
- None)
- self.assertIn('port_id', body)
- tdict.return_value = tenant_context
- self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None,
- err_code)
+ tenant_id = _uuid()
+ with self.router(tenant_id=tenant_id, set_context=True) as r:
+ with self.network(tenant_id=tenant_id, set_context=True) as n:
+ with self.subnet(network=n, set_context=True) as s:
+ err_code = exc.HTTPNotFound.code
+ self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ expected_code=err_code,
+ tenant_id='bad_tenant')
+ body = self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
+ self.assertIn('port_id', body)
+ self._router_interface_action('remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ expected_code=err_code,
+ tenant_id='bad_tenant')
def test_router_add_interface_subnet_with_port_from_other_tenant(self):
tenant_id = _uuid()
HTTPBadRequest.code)
def test_router_add_interface_port_bad_tenant_returns_404(self):
- with mock.patch('neutron.context.Context.to_dict') as tdict:
- admin_context = {'roles': ['admin']}
- tenant_context = {'tenant_id': 'bad_tenant',
- 'roles': []}
- tdict.return_value = admin_context
- with self.router() as r:
- with self.port() as p:
- tdict.return_value = tenant_context
- err_code = exc.HTTPNotFound.code
- self._router_interface_action('add',
- r['router']['id'],
- None,
- p['port']['id'],
- err_code)
- tdict.return_value = admin_context
- self._router_interface_action('add',
- r['router']['id'],
- None,
- p['port']['id'])
+ tenant_id = _uuid()
+ with self.router(tenant_id=tenant_id, set_context=True) as r:
+ with self.network(tenant_id=tenant_id, set_context=True) as n:
+ with self.subnet(tenant_id=tenant_id, network=n,
+ set_context=True) as s:
+ with self.port(tenant_id=tenant_id, subnet=s,
+ set_context=True) as p:
+ err_code = exc.HTTPNotFound.code
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'],
+ expected_code=err_code,
+ tenant_id='bad_tenant')
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'],
+ tenant_id=tenant_id)
- tdict.return_value = tenant_context
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'],
- err_code)
+ # clean-up should fail as well
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'],
+ expected_code=err_code,
+ tenant_id='bad_tenant')
def test_router_add_interface_dup_subnet1_returns_400(self):
with self.router() as r:
tenant_id = 'tenant_id1'
self.assertRaises(exceptions.QuotaResourceUnknown,
quota.QUOTAS.limit_check,
- context.get_admin_context(load_admin_roles=False),
+ context.get_admin_context(),
tenant_id,
foobar=1)
'nexthop': '1.2.3.4'}]}
error = self.assertRaises(exception,
FAKE_SERVER._validate_subnet,
- neutron_context.get_admin_context(
- load_admin_roles=False),
+ neutron_context.get_admin_context(),
subnet)
self.assertThat(
str(error),
self.assertIsNone(ctx_dict['auth_token'])
self.assertFalse(hasattr(ctx, 'session'))
- def test_neutron_context_with_load_roles_true(self):
- ctx = context.get_admin_context()
- self.assertIn('admin', ctx.roles)
-
- def test_neutron_context_with_load_roles_false(self):
- ctx = context.get_admin_context(load_admin_roles=False)
- self.assertFalse(ctx.roles)
-
def test_neutron_context_elevated_retains_request_id(self):
ctx = context.Context('user_id', 'tenant_id')
self.assertFalse(ctx.is_admin)
def test_check_is_admin_with_admin_context_succeeds(self):
admin_context = context.get_admin_context()
+ # explicitly set roles as this test verifies user credentials
+ # with the policy engine
+ admin_context.roles = ['admin']
self.assertTrue(policy.check_is_admin(admin_context))
def test_check_is_admin_with_user_context_fails(self):
def test_enforce_tenant_id_check_invalid_parent_resource_raises(self):
self._test_enforce_tenant_id_raises('tenant_id:%(foobaz_tenant_id)s')
- def test_get_roles_context_is_admin_rule_missing(self):
- rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- "some_other_rule": "role:admin",
- }.items())
- policy.set_rules(common_policy.Rules(rules))
- # 'admin' role is expected for bw compatibility
- self.assertEqual(['admin'], policy.get_admin_roles())
-
- def test_get_roles_with_role_check(self):
- rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- policy.ADMIN_CTX_POLICY: "role:admin",
- }.items())
- policy.set_rules(common_policy.Rules(rules))
- self.assertEqual(['admin'], policy.get_admin_roles())
-
- def test_get_roles_with_rule_check(self):
- rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
- "some_other_rule": "role:admin",
- }.items())
- policy.set_rules(common_policy.Rules(rules))
- self.assertEqual(['admin'], policy.get_admin_roles())
-
- def test_get_roles_with_or_check(self):
- self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2",
- "rule1": "role:admin_1",
- "rule2": "role:admin_2"
- }.items())
- self.assertEqual(['admin_1', 'admin_2'],
- policy.get_admin_roles())
-
- def test_get_roles_with_other_rules(self):
- self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
- policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
- }.items())
- self.assertEqual(['xxx'], policy.get_admin_roles())
-
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
policy.set_rules(input_rules.copy())
Paste
PasteDeploy>=1.5.0
Routes>=1.12.3,!=2.0
+debtcollector>=0.3.0 # Apache-2.0
eventlet>=0.17.3
greenlet>=0.3.2
httplib2>=0.7.5