def setUp(self):
super(CloudWatchAlarmTest, self).setUp()
utils.setup_dummy_db()
+ self.ctx = utils.dummy_context()
def parse_stack(self):
t = template_format.parse(AWS_CloudWatch_Alarm)
- return utils.parse_stack(t)
+ self.stack = utils.parse_stack(t)
+ return self.stack
- @utils.wr_delete_after
+ @utils.stack_delete_after
def test_resource_create_good(self):
s = self.parse_stack()
- self.wr = s['test_me']
self.assertEqual(None, scheduler.TaskRunner(s['test_me'].create)())
+ @utils.stack_delete_after
def test_resource_create_failed(self):
s = self.parse_stack()
with patch.object(watchrule.WatchRule, 'store') as bad_store:
task_func = scheduler.TaskRunner(s['test_me'].create)
self.assertRaises(exception.ResourceFailure, task_func)
+ @utils.stack_delete_after
def test_resource_delete_good(self):
s = self.parse_stack()
self.assertEqual(None, scheduler.TaskRunner(s['test_me'].create)())
self.assertEqual(None, scheduler.TaskRunner(s['test_me'].delete)())
+ @utils.stack_delete_after
@utils.wr_delete_after
def test_resource_delete_notfound(self):
# if a resource is not found, handle_delete() should not raise
# an exception.
s = self.parse_stack()
- self.wr = s['test_me']
self.assertEqual(None, scheduler.TaskRunner(s['test_me'].create)())
+ res_name = self.stack['test_me'].physical_resource_name()
+ self.wr = watchrule.WatchRule.load(self.ctx,
+ watch_name=res_name)
+
with patch.object(watchrule.WatchRule, 'destroy') as bad_destroy:
bad_destroy.side_effect = exception.WatchRuleNotFound
self.assertEqual(None, scheduler.TaskRunner(s['test_me'].delete)())