]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Unit test coverage for user resources, plus some user fixes.
authorSteve Baker <sbaker@redhat.com>
Fri, 21 Sep 2012 03:18:36 +0000 (15:18 +1200)
committerSteve Baker <sbaker@redhat.com>
Fri, 21 Sep 2012 03:33:27 +0000 (15:33 +1200)
- User 'Policies' attribute removed since it shouldn't exist
- fixed 'SecretAccessKey' attribute in AccessKey

Change-Id: Iff7cf0bda074cb615045e7236f274cafa56909f6

heat/engine/user.py
heat/tests/test_user.py [new file with mode: 0644]

index 45920e1b7201c85eb138e665d8d97f1aff9ee83b..67d681cd6e15f1119ba4d24c4fa54bc0c08498d8 100644 (file)
@@ -32,6 +32,9 @@ class DummyId:
     def __init__(self, id):
         self.id = id
 
+    def __eq__(self, other):
+        return self.id == other.id
+
 
 class User(Resource):
     properties_schema = {'Path': {'Type': 'String'},
@@ -100,16 +103,9 @@ class User(Resource):
         return unicode(self.physical_resource_name())
 
     def FnGetAtt(self, key):
-        res = None
-        if key == 'Policies':
-            res = self.properties['Policies']
-        else:
-            raise exception.InvalidTemplateAttribute(
-                    resource=self.physical_resource_name(), key=key)
-
-        logger.info('%s.GetAtt(%s) == %s' % (self.physical_resource_name(),
-                                             key, res))
-        return unicode(res)
+        #TODO Implement Arn attribute
+        raise exception.InvalidTemplateAttribute(
+                resource=self.physical_resource_name(), key=key)
 
 
 class AccessKey(Resource):
@@ -129,15 +125,16 @@ class AccessKey(Resource):
         tenant_id = self.context.tenant_id
         users = self.keystone().users.list(tenant_id=tenant_id)
         for u in users:
-            if u.name == self.properties['UserName']:
+            if u.name == username:
                 return u
         return None
 
     def handle_create(self):
-        user = self._user_from_name(self.properties['UserName'])
+        username = self.properties['UserName']
+        user = self._user_from_name(username)
         if user is None:
             raise exception.NotFound('could not find user %s' %
-                                     self.properties['UserName'])
+                                     username)
 
         tenant_id = self.context.tenant_id
         cred = self.keystone().ec2.create(user.id, tenant_id)
@@ -177,7 +174,7 @@ class AccessKey(Resource):
         res = None
         if key == 'UserName':
             res = self.properties['UserName']
-        if key == 'SecretAccessKey':
+        elif key == 'SecretAccessKey':
             res = self._secret_accesskey()
         else:
             raise exception.InvalidTemplateAttribute(
diff --git a/heat/tests/test_user.py b/heat/tests/test_user.py
new file mode 100644 (file)
index 0000000..508edc5
--- /dev/null
@@ -0,0 +1,229 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+import sys
+import os
+
+import json
+import nose
+import mox
+import unittest
+
+from nose.plugins.attrib import attr
+
+from heat.common import exception
+from heat.engine import parser
+from heat.engine import user
+from heat.tests.v1_1 import fakes
+from keystoneclient.v2_0 import users
+from keystoneclient.v2_0 import ec2
+
+
+@attr(tag=['unit', 'resource'])
+@attr(speed='fast')
+class UserTest(unittest.TestCase):
+    def setUp(self):
+        self.m = mox.Mox()
+        self.fc = fakes.FakeClient()
+        self.fc.users = users.UserManager(None)
+        self.fc.ec2 = ec2.CredentialsManager(None)
+        self.m.StubOutWithMock(user.User, 'keystone')
+        self.m.StubOutWithMock(user.AccessKey, 'keystone')
+        self.m.StubOutWithMock(self.fc.users, 'create')
+        self.m.StubOutWithMock(self.fc.users, 'get')
+        self.m.StubOutWithMock(self.fc.users, 'delete')
+        self.m.StubOutWithMock(self.fc.users, 'list')
+        self.m.StubOutWithMock(self.fc.ec2, 'create')
+        self.m.StubOutWithMock(self.fc.ec2, 'get')
+        self.m.StubOutWithMock(self.fc.ec2, 'delete')
+
+    def tearDown(self):
+        self.m.UnsetStubs()
+        print "UserTest teardown complete"
+
+    def load_template(self):
+        self.path = os.path.dirname(os.path.realpath(__file__)).\
+            replace('heat/tests', 'templates')
+        f = open("%s/Rails_Single_Instance.template" % self.path)
+        t = json.loads(f.read())
+        f.close()
+        return t
+
+    def parse_stack(self, t):
+        class DummyContext():
+            tenant_id = 'test_tenant'
+            username = 'test_username'
+            password = 'password'
+            auth_url = 'http://localhost:5000/v2.0'
+        t['Parameters']['KeyName']['Value'] = 'test'
+        t['Parameters']['DBRootPassword']['Value'] = 'test'
+        t['Parameters']['DBUsername']['Value'] = 'test'
+        t['Parameters']['DBPassword']['Value'] = 'test'
+        stack = parser.Stack(DummyContext(), 'test_stack', parser.Template(t),
+                             stack_id=-1)
+
+        return stack
+
+    def create_user(self, t, stack, resource_name):
+        resource = user.User(resource_name,
+                                      t['Resources'][resource_name],
+                                      stack)
+        self.assertEqual(None, resource.validate())
+        self.assertEqual(None, resource.create())
+        self.assertEqual(user.User.CREATE_COMPLETE, resource.state)
+        return resource
+
+    def create_access_key(self, t, stack, resource_name):
+        resource = user.AccessKey(resource_name,
+                                      t['Resources'][resource_name],
+                                      stack)
+        self.assertEqual(None, resource.validate())
+        self.assertEqual(None, resource.create())
+        self.assertEqual(user.AccessKey.CREATE_COMPLETE,
+                         resource.state)
+        return resource
+
+    def test_user(self):
+
+        fake_user = users.User(self.fc.users, {'id': '1'})
+        user.User.keystone().AndReturn(self.fc)
+        self.fc.users.create('test_stack.CfnUser',
+                             '',
+                             'test_stack.CfnUser@heat-api.org',
+                             enabled=True,
+                             tenant_id='test_tenant').AndReturn(fake_user)
+
+        # delete script
+        user.User.keystone().AndReturn(self.fc)
+        self.fc.users.get(user.DummyId('1')).AndRaise(Exception('not found'))
+
+        user.User.keystone().AndReturn(self.fc)
+        self.fc.users.get(user.DummyId('1')).AndReturn(fake_user)
+        self.fc.users.delete(fake_user).AndRaise(Exception('delete failed'))
+
+        self.fc.users.delete(fake_user).AndReturn(None)
+
+        self.m.ReplayAll()
+
+        t = self.load_template()
+        stack = self.parse_stack(t)
+
+        resource = self.create_user(t, stack, 'CfnUser')
+        self.assertEqual('1', resource.instance_id)
+        self.assertEqual('test_stack.CfnUser', resource.FnGetRefId())
+
+        self.assertEqual('CREATE_COMPLETE', resource.state)
+        self.assertEqual(user.User.UPDATE_REPLACE,
+                  resource.handle_update())
+
+        resource.instance_id = None
+        self.assertEqual(None, resource.delete())
+        self.assertEqual('DELETE_COMPLETE', resource.state)
+
+        resource.instance_id = '1'
+        resource.state_set('CREATE_COMPLETE')
+        self.assertEqual('CREATE_COMPLETE', resource.state)
+
+        self.assertEqual(None, resource.delete())
+        self.assertEqual('DELETE_COMPLETE', resource.state)
+
+        resource.state_set('CREATE_COMPLETE')
+        self.assertEqual('CREATE_COMPLETE', resource.state)
+
+        self.assertEqual(None, resource.delete())
+        self.assertEqual('DELETE_COMPLETE', resource.state)
+        self.m.VerifyAll()
+
+    def test_access_key(self):
+
+        fake_user = users.User(self.fc.users, {'id': '1',
+                                               'name': 'test_stack.CfnUser'})
+        fake_cred = ec2.EC2(self.fc.ec2, {
+                        'access': '03a4967889d94a9c8f707d267c127a3d',
+                        'secret': 'd5fd0c08f8cc417ead0355c67c529438'})
+
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.users.list(tenant_id='test_tenant').AndReturn([fake_user])
+
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.ec2.create('1', 'test_tenant').AndReturn(fake_cred)
+
+        # fetch secret key
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.users.list(tenant_id='test_tenant').AndReturn([fake_user])
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.ec2.get('1',
+                '03a4967889d94a9c8f707d267c127a3d').AndReturn(fake_cred)
+
+        # delete script
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.users.list(tenant_id='test_tenant').AndReturn([fake_user])
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.ec2.delete('1',
+                           '03a4967889d94a9c8f707d267c127a3d').AndReturn(None)
+
+        self.m.ReplayAll()
+
+        t = self.load_template()
+        stack = self.parse_stack(t)
+
+        resource = self.create_access_key(t, stack, 'HostKeys')
+
+        self.assertEqual(user.AccessKey.UPDATE_REPLACE,
+                  resource.handle_update())
+        self.assertEqual('03a4967889d94a9c8f707d267c127a3d',
+                         resource.instance_id)
+
+        self.assertEqual('d5fd0c08f8cc417ead0355c67c529438',
+                         resource._secret)
+
+        self.assertEqual(resource.FnGetAtt('UserName'), 'test_stack.CfnUser')
+        resource._secret = None
+        self.assertEqual(resource.FnGetAtt('SecretAccessKey'),
+                         'd5fd0c08f8cc417ead0355c67c529438')
+        try:
+            resource.FnGetAtt('Foo')
+        except exception.InvalidTemplateAttribute:
+            pass
+        else:
+            raise Exception('Expected InvalidTemplateAttribute')
+
+        self.assertEqual(None, resource.delete())
+        self.m.VerifyAll()
+
+    def test_access_key_no_user(self):
+
+        user.AccessKey.keystone().AndReturn(self.fc)
+        self.fc.users.list(tenant_id='test_tenant').AndReturn([])
+
+        self.m.ReplayAll()
+
+        t = self.load_template()
+        stack = self.parse_stack(t)
+
+        resource = user.AccessKey('HostKeys',
+                                      t['Resources']['HostKeys'],
+                                      stack)
+        self.assertEqual('could not find user test_stack.CfnUser',
+                         resource.create())
+        self.assertEqual(user.AccessKey.CREATE_FAILED,
+                         resource.state)
+
+        self.m.VerifyAll()
+
+    # allows testing of the test directly, shown below
+    if __name__ == '__main__':
+        sys.argv.append(__file__)
+        nose.main()