# Use request_id if already set
req_id = req.environ.get(request_id.ENV_REQUEST_ID)
+ # Get the auth token
+ auth_token = req.headers.get('X_AUTH_TOKEN',
+ req.headers.get('X_STORAGE_TOKEN'))
+
# Create a context with the authentication data
ctx = context.Context(user_id, tenant_id, roles=roles,
user_name=user_name, tenant_name=tenant_name,
- request_id=req_id)
+ request_id=req_id, auth_token=auth_token)
# Inject the context...
req.environ['neutron.context'] = ctx
class RPCDispatcher(rpc_dispatcher.RPCDispatcher):
def __call__(self, incoming):
- LOG.debug('Incoming RPC: ctxt:%s message:%s', incoming.ctxt,
+ # NOTE(yamahata): '***' is chosen for consistency with
+ # openstack.common.strutils.mask_password
+ sanitize_key_list = ('auth_token', )
+ sanitized_ctxt = dict((k, '***' if k in sanitize_key_list else v)
+ for (k, v) in incoming.ctxt.items())
+ LOG.debug('Incoming RPC: ctxt:%s message:%s', sanitized_ctxt,
incoming.message)
return super(RPCDispatcher, self).__call__(incoming)
def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
roles=None, timestamp=None, load_admin_roles=True,
request_id=None, tenant_name=None, user_name=None,
- overwrite=True, **kwargs):
+ overwrite=True, auth_token=None, **kwargs):
"""Object initialization.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
:param kwargs: Extra arguments that might be present, but we ignore
because they possibly came in from older rpc messages.
"""
- super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
+ super(ContextBase, self).__init__(auth_token=auth_token,
+ user=user_id, tenant=tenant_id,
is_admin=is_admin,
request_id=request_id)
self.user_name = user_name
'tenant_name': self.tenant_name,
'project_name': self.tenant_name,
'user_name': self.user_name,
+ 'auth_token': self.auth_token,
}
@classmethod
self.request.environ[request_id.ENV_REQUEST_ID] = req_id
self.request.get_response(self.middleware)
self.assertEqual(req_id, self.context.request_id)
+
+ def test_with_auth_token(self):
+ self.request.headers['X_PROJECT_ID'] = 'testtenantid'
+ self.request.headers['X_USER_ID'] = 'testuserid'
+ response = self.request.get_response(self.middleware)
+ self.assertEqual(response.status, '200 OK')
+ self.assertEqual(self.context.auth_token, 'testauthtoken')
+
+ def test_without_auth_token(self):
+ self.request.headers['X_PROJECT_ID'] = 'testtenantid'
+ self.request.headers['X_USER_ID'] = 'testuserid'
+ del self.request.headers['X_AUTH_TOKEN']
+ self.request.get_response(self.middleware)
+ self.assertIsNone(self.context.auth_token)
self.assertEqual('tenant_id', ctx.tenant)
self.assertIsNone(ctx.user_name)
self.assertIsNone(ctx.tenant_name)
+ self.assertIsNone(ctx.auth_token)
def test_neutron_context_create_logs_unknown_kwarg(self):
with mock.patch.object(context.LOG, 'debug') as mock_log:
ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
self.assertEqual('req_id_xxx', ctx.request_id)
+ def test_neutron_context_create_with_auth_token(self):
+ ctx = context.Context('user_id', 'tenant_id',
+ auth_token='auth_token_xxx')
+ self.assertEqual('auth_token_xxx', ctx.auth_token)
+
def test_neutron_context_to_dict(self):
ctx = context.Context('user_id', 'tenant_id')
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_name'])
self.assertIsNone(ctx_dict['tenant_name'])
self.assertIsNone(ctx_dict['project_name'])
+ self.assertIsNone(ctx_dict['auth_token'])
def test_neutron_context_to_dict_with_name(self):
ctx = context.Context('user_id', 'tenant_id',
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
self.assertEqual('tenant_name', ctx_dict['project_name'])
+ def test_neutron_context_to_dict_with_auth_token(self):
+ ctx = context.Context('user_id', 'tenant_id',
+ auth_token='auth_token_xxx')
+ ctx_dict = ctx.to_dict()
+ self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
+
def test_neutron_context_admin_to_dict(self):
self.db_api_session.return_value = 'fakesession'
ctx = context.get_admin_context()
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_id'])
self.assertIsNone(ctx_dict['tenant_id'])
+ self.assertIsNone(ctx_dict['auth_token'])
self.assertIsNotNone(ctx.session)
self.assertNotIn('session', ctx_dict)
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_id'])
self.assertIsNone(ctx_dict['tenant_id'])
+ self.assertIsNone(ctx_dict['auth_token'])
self.assertFalse(hasattr(ctx, 'session'))
def test_neutron_context_with_load_roles_true(self):