from neutron.db import api as db_api
from neutron.openstack.common import context as common_context
+from neutron.openstack.common import local
from neutron.openstack.common import log as logging
from neutron import policy
"""
def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
- roles=None, timestamp=None, load_admin_roles=True, **kwargs):
+ roles=None, timestamp=None, load_admin_roles=True,
+ request_id=None, tenant_name=None, user_name=None,
+ overwrite=True, **kwargs):
"""Object initialization.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
indicates deleted records are visible, 'only' indicates that
*only* deleted records are visible.
+
+ :param overwrite: Set to False to ensure that the greenthread local
+ copy of the index is not overwritten.
+
+ :param kwargs: Extra arguments that might be present, but we ignore
+ because they possibly came in from older rpc messages.
"""
if kwargs:
LOG.warn(_('Arguments dropped when creating '
'context: %s'), kwargs)
+
super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
- is_admin=is_admin)
+ is_admin=is_admin,
+ request_id=request_id)
+ self.user_name = user_name
+ self.tenant_name = tenant_name
+
self.read_deleted = read_deleted
if not timestamp:
timestamp = datetime.utcnow()
admin_roles = policy.get_admin_roles()
if admin_roles:
self.roles = list(set(self.roles) | set(admin_roles))
+ # Allow openstack.common.log to access the context
+ if overwrite or not hasattr(local.store, 'context'):
+ local.store.context = self
@property
def project_id(self):
'is_admin': self.is_admin,
'read_deleted': self.read_deleted,
'roles': self.roles,
- 'timestamp': str(self.timestamp)}
+ 'timestamp': str(self.timestamp),
+ 'request_id': self.request_id,
+ 'tenant': self.tenant,
+ 'user': self.user,
+ 'tenant_name': self.tenant_name,
+ 'user_name': self.user_name,
+ }
@classmethod
def from_dict(cls, values):
tenant_id=None,
is_admin=True,
read_deleted=read_deleted,
- load_admin_roles=load_admin_roles)
+ load_admin_roles=load_admin_roles,
+ overwrite=False)
def get_admin_context_without_session(read_deleted="no"):
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
- def test_no_user_no_user_id(self):
+ def test_no_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '401 Unauthorized')
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuserid')
+ self.assertEqual(self.context.user, 'testuserid')
def test_with_tenant_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.tenant_id, 'testtenantid')
+ self.assertEqual(self.context.tenant, 'testtenantid')
def test_roles_no_admin(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
'role4', 'role5', 'AdMiN'])
self.assertEqual(self.context.is_admin, True)
+
+ def test_with_user_tenant_name(self):
+ self.request.headers['X_PROJECT_ID'] = 'testtenantid'
+ self.request.headers['X_USER_ID'] = 'testuserid'
+ self.request.headers['X_PROJECT_NAME'] = 'testtenantname'
+ self.request.headers['X_USER_NAME'] = 'testusername'
+ response = self.request.get_response(self.middleware)
+ self.assertEqual(response.status, '200 OK')
+ self.assertEqual(self.context.user_id, 'testuserid')
+ self.assertEqual(self.context.user_name, 'testusername')
+ self.assertEqual(self.context.tenant_id, 'testtenantid')
+ self.assertEqual(self.context.tenant_name, 'testtenantname')
# under the License.
import mock
+from testtools import matchers
from neutron import context
+from neutron.openstack.common import local
from neutron.tests import base
self.addCleanup(self._db_api_session_patcher.stop)
def test_neutron_context_create(self):
- cxt = context.Context('user_id', 'tenant_id')
- self.assertEqual('user_id', cxt.user_id)
- self.assertEqual('tenant_id', cxt.project_id)
+ ctx = context.Context('user_id', 'tenant_id')
+ self.assertEqual('user_id', ctx.user_id)
+ self.assertEqual('tenant_id', ctx.project_id)
+ self.assertEqual('tenant_id', ctx.tenant_id)
+ self.assertThat(ctx.request_id, matchers.StartsWith('req-'))
+ self.assertEqual('user_id', ctx.user)
+ self.assertEqual('tenant_id', ctx.tenant)
+ self.assertIsNone(ctx.user_name)
+ self.assertIsNone(ctx.tenant_name)
+
+ def test_neutron_context_create_with_name(self):
+ ctx = context.Context('user_id', 'tenant_id',
+ tenant_name='tenant_name', user_name='user_name')
+ # Check name is set
+ self.assertEqual('user_name', ctx.user_name)
+ self.assertEqual('tenant_name', ctx.tenant_name)
+ # Check user/tenant contains its ID even if user/tenant_name is passed
+ self.assertEqual('user_id', ctx.user)
+ self.assertEqual('tenant_id', ctx.tenant)
+
+ def test_neutron_context_create_with_request_id(self):
+ ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
+ self.assertEqual('req_id_xxx', ctx.request_id)
def test_neutron_context_to_dict(self):
- cxt = context.Context('user_id', 'tenant_id')
- cxt_dict = cxt.to_dict()
- self.assertEqual('user_id', cxt_dict['user_id'])
- self.assertEqual('tenant_id', cxt_dict['project_id'])
+ ctx = context.Context('user_id', 'tenant_id')
+ ctx_dict = ctx.to_dict()
+ self.assertEqual('user_id', ctx_dict['user_id'])
+ self.assertEqual('tenant_id', ctx_dict['project_id'])
+ self.assertEqual(ctx.request_id, ctx_dict['request_id'])
+ self.assertEqual('user_id', ctx_dict['user'])
+ self.assertEqual('tenant_id', ctx_dict['tenant'])
+ self.assertIsNone(ctx_dict['user_name'])
+ self.assertIsNone(ctx_dict['tenant_name'])
+
+ def test_neutron_context_to_dict_with_name(self):
+ ctx = context.Context('user_id', 'tenant_id',
+ tenant_name='tenant_name', user_name='user_name')
+ ctx_dict = ctx.to_dict()
+ self.assertEqual('user_name', ctx_dict['user_name'])
+ self.assertEqual('tenant_name', ctx_dict['tenant_name'])
def test_neutron_context_admin_to_dict(self):
self.db_api_session.return_value = 'fakesession'
- cxt = context.get_admin_context()
- cxt_dict = cxt.to_dict()
- self.assertIsNone(cxt_dict['user_id'])
- self.assertIsNone(cxt_dict['tenant_id'])
- self.assertIsNotNone(cxt.session)
- self.assertNotIn('session', cxt_dict)
+ ctx = context.get_admin_context()
+ ctx_dict = ctx.to_dict()
+ self.assertIsNone(ctx_dict['user_id'])
+ self.assertIsNone(ctx_dict['tenant_id'])
+ self.assertIsNotNone(ctx.session)
+ self.assertNotIn('session', ctx_dict)
def test_neutron_context_admin_without_session_to_dict(self):
- cxt = context.get_admin_context_without_session()
- cxt_dict = cxt.to_dict()
- self.assertIsNone(cxt_dict['user_id'])
- self.assertIsNone(cxt_dict['tenant_id'])
- try:
- cxt.session
- except Exception:
- pass
- else:
- self.assertFalse(True, 'without_session admin context'
- 'should has no session property!')
+ ctx = context.get_admin_context_without_session()
+ ctx_dict = ctx.to_dict()
+ self.assertIsNone(ctx_dict['user_id'])
+ self.assertIsNone(ctx_dict['tenant_id'])
+ self.assertFalse(hasattr(ctx, 'session'))
def test_neutron_context_with_load_roles_true(self):
ctx = context.get_admin_context()
def test_neutron_context_with_load_roles_false(self):
ctx = context.get_admin_context(load_admin_roles=False)
self.assertFalse(ctx.roles)
+
+ def test_neutron_context_elevated_retains_request_id(self):
+ ctx = context.Context('user_id', 'tenant_id')
+ self.assertFalse(ctx.is_admin)
+ req_id_before = ctx.request_id
+
+ elevated_ctx = ctx.elevated()
+ self.assertTrue(elevated_ctx.is_admin)
+ self.assertEqual(req_id_before, elevated_ctx.request_id)
+
+ def test_neutron_context_overwrite(self):
+ ctx1 = context.Context('user_id', 'tenant_id')
+ self.assertEqual(ctx1.request_id, local.store.context.request_id)
+
+ # If overwrite is not specified, request_id should be updated.
+ ctx2 = context.Context('user_id', 'tenant_id')
+ self.assertNotEqual(ctx2.request_id, ctx1.request_id)
+ self.assertEqual(ctx2.request_id, local.store.context.request_id)
+
+ # If overwrite is specified, request_id should be kept.
+ ctx3 = context.Context('user_id', 'tenant_id', overwrite=False)
+ self.assertNotEqual(ctx3.request_id, ctx2.request_id)
+ self.assertEqual(ctx2.request_id, local.store.context.request_id)
+
+ def test_neutron_context_get_admin_context_not_update_local_store(self):
+ ctx = context.Context('user_id', 'tenant_id')
+ req_id_before = local.store.context.request_id
+ self.assertEqual(ctx.request_id, req_id_before)
+
+ ctx_admin = context.get_admin_context()
+ self.assertEqual(req_id_before, local.store.context.request_id)
+ self.assertNotEqual(req_id_before, ctx_admin.request_id)