super(AccessKey, self).__init__(name, json_snippet, stack)
self._secret = None
+ def _get_userid(self):
+ """
+ Helper function to derive the keystone userid, which is stored in the
+ resource_id of the User associated with this key. We want to avoid
+ looking the name up via listing keystone users, as this requires admin
+ rights in keystone, so FnGetAtt which calls _secret_accesskey won't
+ work for normal non-admin users
+ """
+ # Lookup User resource by intrinsic reference (which is what is passed
+ # into the UserName parameter. Would be cleaner to just make the User
+ # resource return resource_id for FnGetRefId but the AWS definition of
+ # user does say it returns a user name not ID
+ for r in self.stack.resources:
+ refid = self.stack.resources[r].FnGetRefId()
+ if refid == self.properties['UserName']:
+ return self.stack.resources[r].resource_id
+
def handle_create(self):
- username = self.properties['UserName']
- user_id = self.keystone().get_user_by_name(username)
+ user_id = self._get_userid()
if user_id is None:
raise exception.NotFound('could not find user %s' %
- username)
+ self.properties['UserName'])
kp = self.keystone().get_ec2_keypair(user_id)
if not kp:
def handle_delete(self):
self.resource_id_set(None)
self._secret = None
- user_id = self.keystone().get_user_by_name(self.properties['UserName'])
+ user_id = self._get_userid()
if user_id and self.resource_id:
self.keystone().delete_ec2_keypair(user_id, self.resource_id)
'''
Return the user's access key, fetching it from keystone if necessary
'''
- user_id = self.keystone().get_user_by_name(self.properties['UserName'])
+ user_id = self._get_userid()
if self._secret is None:
if not self.resource_id:
logger.warn('could not get secret for %s Error:%s' %
from heat.openstack.common import cfg
-@attr(tag=['unit', 'resource'])
+@attr(tag=['unit', 'resource', 'Unit'])
@attr(speed='fast')
class UserTest(unittest.TestCase):
def setUp(self):
self.assertEqual(user.User.CREATE_COMPLETE, resource.state)
return resource
- def create_access_key(self, t, stack, resource_name):
- resource = user.AccessKey(resource_name,
- t['Resources'][resource_name],
- stack)
- self.assertEqual(None, resource.validate())
- self.assertEqual(None, resource.create())
- self.assertEqual(user.AccessKey.CREATE_COMPLETE,
- resource.state)
- return resource
-
def test_user(self):
self.m.StubOutWithMock(user.User, 'keystone')
self.assertEqual('DELETE_COMPLETE', resource.state)
self.m.VerifyAll()
- def test_access_key(self):
+ def create_access_key(self, t, stack, resource_name):
+ resource = user.AccessKey(resource_name,
+ t['Resources'][resource_name],
+ stack)
+ self.assertEqual(None, resource.validate())
+ self.assertEqual(None, resource.create())
+ self.assertEqual(user.AccessKey.CREATE_COMPLETE,
+ resource.state)
+ return resource
+ def test_access_key(self):
self.m.StubOutWithMock(user.AccessKey, 'keystone')
user.AccessKey.keystone().MultipleTimes().AndReturn(self.fc)
self.m.ReplayAll()
t = self.load_template()
+ # Override the Ref for UserName with a hard-coded name,
+ # so we don't need to create the User resource
+ t['Resources']['HostKeys']['Properties']['UserName'] =\
+ 'test_stack.CfnUser'
stack = self.parse_stack(t)
+ stack.resources['CfnUser'].resource_id = self.fc.user_id
resource = self.create_access_key(t, stack, 'HostKeys')
self.m.VerifyAll()
def test_access_key_no_user(self):
-
- self.m.StubOutWithMock(user.AccessKey, 'keystone')
- user.AccessKey.keystone().MultipleTimes().AndReturn(self.fc)
-
self.m.ReplayAll()
t = self.load_template()
+ # Set the resource properties UserName to an unknown user
+ t['Resources']['HostKeys']['Properties']['UserName'] =\
+ 'test_stack.NoExist'
stack = self.parse_stack(t)
+ stack.resources['CfnUser'].resource_id = self.fc.user_id
- # Set the resource properties to an unknown user
- t['Resources']['HostKeys']['Properties']['UserName'] = 'NoExist'
resource = user.AccessKey('HostKeys',
t['Resources']['HostKeys'],
stack)
- self.assertEqual('could not find user NoExist',
+ self.assertEqual('could not find user test_stack.NoExist',
resource.create())
self.assertEqual(user.AccessKey.CREATE_FAILED,
resource.state)
self.m.VerifyAll()
- # allows testing of the test directly, shown below
- if __name__ == '__main__':
- sys.argv.append(__file__)
- nose.main()
+# allows testing of the test directly, shown below
+if __name__ == '__main__':
+ sys.argv.append(__file__)
+ nose.main()