self.assertEqual(db_s.status, 'DELETE_COMPLETE')
+@attr(tag=['unit', 'engine-api', 'engine-manager'])
+@attr(speed='fast')
+class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
+
+ def setUp(self):
+ self.m = mox.Mox()
+ self.username = 'stack_manager_create_test_user'
+ self.tenant = 'stack_manager_create_test_tenant'
+ self.ctx = create_context(self.m, self.username, self.tenant)
+ auth.authenticate(self.ctx).AndReturn(True)
+
+ self.man = manager.EngineManager()
+
+ def tearDown(self):
+ self.m.UnsetStubs()
+
+ def test_stack_create(self):
+ stack_name = 'manager_create_test_stack'
+ params = {'foo': 'bar'}
+ template = '{ "Template": "data" }'
+
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.StubOutWithMock(parser, 'Template')
+ self.m.StubOutWithMock(parser, 'Parameters')
+ self.m.StubOutWithMock(parser, 'Stack')
+
+ parser.Template(template).AndReturn(stack.t)
+ parser.Parameters(stack_name,
+ stack.t,
+ params).AndReturn(stack.parameters)
+ parser.Stack(self.ctx, stack.name,
+ stack.t, stack.parameters).AndReturn(stack)
+
+ self.m.StubOutWithMock(stack, 'validate')
+ stack.validate().AndReturn({'Description': 'Successfully validated'})
+
+ self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+ manager.greenpool.spawn_n(stack.create)
+
+ self.m.ReplayAll()
+
+ result = self.man.create_stack(self.ctx, stack_name,
+ template, params, {})
+ self.assertEqual(result, stack.identifier())
+ self.assertTrue(isinstance(result, dict))
+ self.assertTrue(result['stack_id'])
+ self.m.VerifyAll()
+
+ def test_stack_create_verify_err(self):
+ stack_name = 'manager_create_verify_err_test_stack'
+ params = {'foo': 'bar'}
+ template = '{ "Template": "data" }'
+
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.StubOutWithMock(parser, 'Template')
+ self.m.StubOutWithMock(parser, 'Parameters')
+ self.m.StubOutWithMock(parser, 'Stack')
+
+ parser.Template(template).AndReturn(stack.t)
+ parser.Parameters(stack_name,
+ stack.t,
+ params).AndReturn(stack.parameters)
+ parser.Stack(self.ctx, stack.name,
+ stack.t, stack.parameters).AndReturn(stack)
+
+ self.m.StubOutWithMock(stack, 'validate')
+ error = {'Description': 'fubar'}
+ stack.validate().AndReturn(error)
+
+ self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+
+ self.m.ReplayAll()
+
+ result = self.man.create_stack(self.ctx, stack_name,
+ template, params, {})
+ self.assertEqual(result, error)
+ self.m.VerifyAll()
+
+ def test_stack_delete(self):
+ stack_name = 'manager_delete_test_stack'
+ stack = get_wordpress_stack(stack_name, self.ctx)
+ stack.store()
+
+ self.m.StubOutWithMock(parser.Stack, 'load')
+ self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+
+ parser.Stack.load(self.ctx, stack.id).AndReturn(stack)
+ manager.greenpool.spawn_n(stack.delete)
+
+ self.m.ReplayAll()
+
+ self.assertEqual(self.man.delete_stack(self.ctx,
+ stack.identifier(), {}),
+ None)
+ self.m.VerifyAll()
+
+ def test_stack_delete_nonexist(self):
+ stack_name = 'manager_delete_nonexist_test_stack'
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.ReplayAll()
+
+ self.assertRaises(AttributeError,
+ self.man.delete_stack,
+ self.ctx, stack.identifier(), {})
+ self.m.VerifyAll()
+
+ def test_stack_update(self):
+ stack_name = 'manager_update_test_stack'
+ params = {'foo': 'bar'}
+ template = '{ "Template": "data" }'
+
+ old_stack = get_wordpress_stack(stack_name, self.ctx)
+ old_stack.store()
+
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.StubOutWithMock(parser, 'Stack')
+ self.m.StubOutWithMock(parser.Stack, 'load')
+ parser.Stack.load(self.ctx, old_stack.id).AndReturn(old_stack)
+
+ self.m.StubOutWithMock(parser, 'Template')
+ self.m.StubOutWithMock(parser, 'Parameters')
+
+ parser.Template(template).AndReturn(stack.t)
+ parser.Parameters(stack_name,
+ stack.t,
+ params).AndReturn(stack.parameters)
+ parser.Stack(self.ctx, stack.name,
+ stack.t, stack.parameters).AndReturn(stack)
+
+ self.m.StubOutWithMock(stack, 'validate')
+ stack.validate().AndReturn({'Description': 'Successfully validated'})
+
+ self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+ manager.greenpool.spawn_n(old_stack.update, stack)
+
+ self.m.ReplayAll()
+
+ result = self.man.update_stack(self.ctx, old_stack.identifier(),
+ template, params, {})
+ self.assertEqual(result, old_stack.identifier())
+ self.assertTrue(isinstance(result, dict))
+ self.assertTrue(result['stack_id'])
+ self.m.VerifyAll()
+
+ def test_stack_update_verify_err(self):
+ stack_name = 'manager_update_verify_err_test_stack'
+ params = {'foo': 'bar'}
+ template = '{ "Template": "data" }'
+
+ old_stack = get_wordpress_stack(stack_name, self.ctx)
+ old_stack.store()
+
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.StubOutWithMock(parser, 'Stack')
+ self.m.StubOutWithMock(parser.Stack, 'load')
+ parser.Stack.load(self.ctx, old_stack.id).AndReturn(old_stack)
+
+ self.m.StubOutWithMock(parser, 'Template')
+ self.m.StubOutWithMock(parser, 'Parameters')
+
+ parser.Template(template).AndReturn(stack.t)
+ parser.Parameters(stack_name,
+ stack.t,
+ params).AndReturn(stack.parameters)
+ parser.Stack(self.ctx, stack.name,
+ stack.t, stack.parameters).AndReturn(stack)
+
+ self.m.StubOutWithMock(stack, 'validate')
+ error = {'Description': 'fubar'}
+ stack.validate().AndReturn(error)
+
+ self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+
+ self.m.ReplayAll()
+
+ result = self.man.update_stack(self.ctx, old_stack.identifier(),
+ template, params, {})
+ self.assertEqual(result, error)
+ self.m.VerifyAll()
+
+ def test_stack_update_nonexist(self):
+ stack_name = 'manager_update_nonexist_test_stack'
+ params = {'foo': 'bar'}
+ template = '{ "Template": "data" }'
+ stack = get_wordpress_stack(stack_name, self.ctx)
+
+ self.m.ReplayAll()
+
+ self.assertRaises(AttributeError,
+ self.man.update_stack,
+ self.ctx, stack.identifier(), template, params, {})
+ self.m.VerifyAll()
+
+
@attr(tag=['unit', 'engine-api', 'engine-manager'])
@attr(speed='fast')
class stackManagerTest(unittest.TestCase):
self.assertRaises(AttributeError, self.man.identify_stack,
self.ctx, 'wibble')
+ def test_stack_create_existing(self):
+ self.assertRaises(AttributeError, self.man.create_stack,
+ self.ctx, self.stack_name, self.stack.t, {}, {})
+
def test_stack_event_list(self):
el = self.man.list_events(self.ctx, self.stack_identity, {})