opts = [
cfg.StrOpt('auth_uri',
default=None,
- help=_("Authentication Endpoint URI")),
- cfg.StrOpt('keystone_ec2_uri',
- default=None,
- help=_("Keystone EC2 Service Endpoint URI"))
+ help=_("Authentication Endpoint URI"))
]
cfg.CONF.register_opts(opts, group='ec2authtoken')
else:
return cfg.CONF.ec2authtoken[name]
+ def _conf_get_keystone_ec2_uri(self):
+ auth_uri = self._conf_get('auth_uri')
+ if auth_uri.endswith('/'):
+ return '%sec2tokens' % auth_uri
+ return '%s/ec2tokens' % auth_uri
+
def _get_signature(self, req):
"""
Extract the signature from the request, this can be a get/post
# for httplib and urlparse
# pylint: disable-msg=E1101
- keystone_ec2_uri = self._conf_get('keystone_ec2_uri')
+ keystone_ec2_uri = self._conf_get_keystone_ec2_uri()
logger.info('Authenticating with %s' % keystone_ec2_uri)
o = urlparse.urlparse(keystone_ec2_uri)
if o.scheme == 'http':
req.headers['X-Tenant-Name'] = tenant
req.headers['X-Tenant-Id'] = tenant_id
req.headers['X-Auth-URL'] = self._conf_get('auth_uri')
- req.headers['X-Auth-EC2_URL'] = keystone_ec2_uri
metadata = result['access'].get('metadata', {})
roles = metadata.get('roles', [])
"""
def __init__(self, auth_token=None, username=None, password=None,
- aws_creds=None, aws_auth_uri=None, tenant=None,
+ aws_creds=None, tenant=None,
tenant_id=None, auth_url=None, roles=None, is_admin=False,
read_only=False, show_deleted=False,
owner_is_tenant=True, overwrite=True, **kwargs):
self.username = username
self.password = password
self.aws_creds = aws_creds
- self.aws_auth_uri = aws_auth_uri
self.tenant_id = tenant_id
self.auth_url = auth_url
self.roles = roles or []
'username': self.user,
'password': self.password,
'aws_creds': self.aws_creds,
- 'aws_auth_uri': self.aws_auth_uri,
'tenant': self.tenant,
'tenant_id': self.tenant_id,
'auth_url': self.auth_url,
username = None
password = None
aws_creds = None
- aws_auth_uri = None
if headers.get('X-Auth-User') is not None:
username = headers.get('X-Auth-User')
password = headers.get('X-Auth-Key')
elif headers.get('X-Auth-EC2-Creds') is not None:
aws_creds = headers.get('X-Auth-EC2-Creds')
- aws_auth_uri = headers.get('X-Auth-EC2-Url')
token = headers.get('X-Auth-Token')
tenant = headers.get('X-Tenant-Name')
req.context = self.make_context(auth_token=token,
tenant=tenant, tenant_id=tenant_id,
aws_creds=aws_creds,
- aws_auth_uri=aws_auth_uri,
username=username,
password=password,
auth_url=auth_url, roles=roles,
return req
def test_conf_get_paste(self):
- dummy_conf = {'auth_uri': 'abc',
- 'keystone_ec2_uri': 'xyz'}
+ dummy_conf = {'auth_uri': 'http://192.0.2.9/v2.0'}
ec2 = ec2token.EC2Token(app=None, conf=dummy_conf)
- self.assertEqual(ec2._conf_get('auth_uri'), 'abc')
- self.assertEqual(ec2._conf_get('keystone_ec2_uri'), 'xyz')
+ self.assertEqual(ec2._conf_get('auth_uri'), 'http://192.0.2.9/v2.0')
+ self.assertEqual(ec2._conf_get_keystone_ec2_uri(),
+ 'http://192.0.2.9/v2.0/ec2tokens')
def test_conf_get_opts(self):
- cfg.CONF.set_default('auth_uri', 'abc', group='ec2authtoken')
- cfg.CONF.set_default('keystone_ec2_uri', 'xyz', group='ec2authtoken')
+ cfg.CONF.set_default('auth_uri', 'http://192.0.2.9/v2.0/',
+ group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={})
- self.assertEqual(ec2._conf_get('auth_uri'), 'abc')
- self.assertEqual(ec2._conf_get('keystone_ec2_uri'), 'xyz')
+ self.assertEqual(ec2._conf_get('auth_uri'), 'http://192.0.2.9/v2.0/')
+ self.assertEqual(ec2._conf_get_keystone_ec2_uri(),
+ 'http://192.0.2.9/v2.0/ec2tokens')
def test_get_signature_param_old(self):
params = {'Signature': 'foo'}
"path": "/v1",
"body_hash": body_hash}})
req_headers = {'Content-Type': 'application/json'}
- req_path = '/foo'
+ req_path = '/v2.0/ec2tokens'
httplib.HTTPConnection.request('POST', req_path,
body=req_creds,
headers=req_headers).AndReturn(None)
httplib.HTTPConnection.close().AndReturn(None)
def test_call_ok(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
self.m.VerifyAll()
def test_call_ok_roles(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
self.m.VerifyAll()
def test_call_err_tokenid(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0/'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
self.m.VerifyAll()
def test_call_err_signature(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
self.m.VerifyAll()
def test_call_err_denied(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
self.m.VerifyAll()
def test_call_ok_v2(self):
- dummy_conf = {'auth_uri': 'http://123:5000/foo',
- 'keystone_ec2_uri': 'http://456:5000/foo'}
+ dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
params = {'AWSAccessKeyId': 'foo', 'Signature': 'xyz'}
req_env = {'SERVER_NAME': 'heat',