cipher = AES.new(cfg.CONF.auth_encryption_key[:32], AES.MODE_CFB, iv)
res = cipher.decrypt(auth[AES.block_size:])
return res
-
-
-def authenticate(con, service_type='cloudformation', service_name='heat-cfn'):
- """ Authenticate a user context. This authenticates either an
- EC2 style key context or a keystone user/pass context.
-
- In the case of EC2 style authentication this will also set the
- username in the context so we can use it to key in the database.
- """
-
- args = {
- 'project_id': con.tenant,
- 'auth_url': con.auth_url,
- 'service_type': service_type,
- 'service_name': service_name,
- }
-
- if con.password is not None:
- credentials = {
- 'username': con.username,
- 'api_key': con.password,
- }
- elif con.auth_token is not None:
- credentials = {
- 'username': con.service_user,
- 'api_key': con.service_password,
- 'proxy_token': con.auth_token,
- 'proxy_tenant_id': con.tenant_id,
- }
- else:
- logger.error("Authentication failed, no password or auth_token!")
- return None
-
- args.update(credentials)
- try:
- # Workaround for issues with python-keyring, need no_cache=True
- # ref https://bugs.launchpad.net/python-novaclient/+bug/1020238
- # TODO(shardy): May be able to remove when the bug above is fixed
- nova = client.Client(no_cache=True, **args)
- except TypeError:
- # for compatibility with essex, which doesn't have no_cache=True
- # TODO(shardy): remove when we no longer support essex
- nova = client.Client(**args)
-
- nova.authenticate()
- return nova
if service_type in self._nova:
return self._nova[service_type]
- self._nova[service_type] = auth.authenticate(self.context,
- service_type=service_type,
- service_name=None)
- return self._nova[service_type]
+ con = self.context
+ args = {
+ 'project_id': con.tenant,
+ 'auth_url': con.auth_url,
+ 'service_type': service_type,
+ }
+
+ if con.password is not None:
+ args['username'] = con.username
+ args['api_key'] = con.password
+ elif con.auth_token is not None:
+ args['username'] = con.service_user
+ args['api_key'] = con.service_password
+ args['proxy_token'] = con.auth_token
+ args['proxy_tenant_id'] = con.tenant_id
+ else:
+ logger.error("Nova connection failed, no password or auth_token!")
+ return None
+
+ client = None
+ try:
+ # Workaround for issues with python-keyring, need no_cache=True
+ # ref https://bugs.launchpad.net/python-novaclient/+bug/1020238
+ # TODO(shardy): May be able to remove when the bug above is fixed
+ client = nc.Client(no_cache=True, **args)
+ client.authenticate()
+ self._nova[service_type] = client
+ except TypeError:
+ # for compatibility with essex, which doesn't have no_cache=True
+ # TODO(shardy): remove when we no longer support essex
+ client = nc.Client(**args)
+ client.authenticate()
+ self._nova[service_type] = client
+
+ return client
def swift(self):
if swiftclient_present == False:
from heat.common import config
from heat.common import context
-from heat.engine import auth
from heat.engine import identifier
from heat.openstack.common import cfg
from heat.openstack.common import rpc
ctx.username = user
self.m.StubOutWithMock(ctx, 'tenant_id')
ctx.tenant_id = 't'
- self.m.StubOutWithMock(auth, 'authenticate')
return ctx
def _dummy_GET_request(self, params={}):
from heat.common import config
from heat.common import context
-from heat.engine import auth
from heat.openstack.common import cfg
from heat.openstack.common import rpc
import heat.openstack.common.rpc.common as rpc_common
ctx = context.get_admin_context()
self.m.StubOutWithMock(ctx, 'username')
ctx.username = user
- self.m.StubOutWithMock(auth, 'authenticate')
return ctx
def _dummy_GET_request(self, params={}):
from heat.common import config
from heat.common import context
-from heat.engine import auth
from heat.engine import identifier
from heat.openstack.common import cfg
from heat.openstack.common import rpc
ctx.username = user
self.m.StubOutWithMock(ctx, 'tenant_id')
ctx.tenant_id = self.tenant
- self.m.StubOutWithMock(auth, 'authenticate')
return ctx
def _environ(self, path):
def test_validate_volumeattach_valid(self):
t = json.loads(test_template_volumeattach % 'vdq')
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
stack = parser.Stack(None, 'test_stack', parser.Template(t))
self.m.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
def test_validate_volumeattach_invalid(self):
t = json.loads(test_template_volumeattach % 'sda')
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
stack = parser.Stack(None, 'test_stack', parser.Template(t))
self.m.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
t = json.loads(test_template_ref % 'WikiDatabase')
t['Parameters']['KeyName']['Value'] = 'test'
params = {}
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
self.m.StubOutWithMock(instances.Instance, 'nova')
instances.Instance.nova().AndReturn(self.fc)
t = json.loads(test_template_ref % 'WikiDatabasez')
t['Parameters']['KeyName']['Value'] = 'test'
params = {}
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
self.m.StubOutWithMock(instances.Instance, 'nova')
instances.Instance.nova().AndReturn(self.fc)
t = json.loads(test_template_findinmap_valid)
t['Parameters']['KeyName']['Value'] = 'test'
params = {}
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
self.m.StubOutWithMock(instances.Instance, 'nova')
instances.Instance.nova().AndReturn(self.fc)
t = json.loads(test_template_findinmap_invalid)
t['Parameters']['KeyName']['Value'] = 'test'
params = {}
- self.m.StubOutWithMock(auth, 'authenticate')
- auth.authenticate(None).AndReturn(True)
self.m.StubOutWithMock(instances.Instance, 'nova')
instances.Instance.nova().AndReturn(self.fc)