From 84500c75bcd8d0e7cc6d03f6f12159e8f694e00a Mon Sep 17 00:00:00 2001 From: Steven Hardy Date: Wed, 16 Jan 2013 16:21:16 +0000 Subject: [PATCH] heat engine : make watchrule actions run in stack ThreadGroup Make watchrule alarm actions run via greenthreads in the stack ThreadGroup - this allows them to be correctly cancelled if the stack is deleted whilst an alarm action is in progress. fixes bug 1097847 Change-Id: I3190a05e3d0abd492961071d2db59a212bd4d373 Signed-off-by: Steven Hardy --- heat/engine/service.py | 8 +- heat/engine/watchrule.py | 29 ++--- heat/tests/test_engine_service.py | 41 +++++-- heat/tests/test_watch.py | 191 +++++++++++++++++++++++++++++- 4 files changed, 239 insertions(+), 30 deletions(-) diff --git a/heat/engine/service.py b/heat/engine/service.py index a45c3d44..1d397986 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -467,7 +467,9 @@ class EngineService(service.Service): return for wr in wrs: rule = watchrule.WatchRule.load(stack_context, watch=wr) - rule.evaluate() + actions = rule.evaluate() + for action in actions: + self._start_in_thread(sid, action) @request_context def create_watch_data(self, context, watch_name, stats_data): @@ -534,7 +536,9 @@ class EngineService(service.Service): arg3 -> State (must be one defined in WatchRule class ''' wr = watchrule.WatchRule.load(context, watch_name) - wr.set_watch_state(state) + actions = wr.set_watch_state(state) + for action in actions: + self._start_in_thread(wr.stack_id, action) # Return the watch with the state overriden to indicate success # We do not update the timestamps as we are not modifying the DB diff --git a/heat/engine/watchrule.py b/heat/engine/watchrule.py index a5aa5a74..6450bb8b 100644 --- a/heat/engine/watchrule.py +++ b/heat/engine/watchrule.py @@ -22,10 +22,8 @@ from heat.engine import timestamp from heat.db import api as db_api from heat.engine import parser from heat.rpc import api as rpc_api -import eventlet logger = logging.getLogger(__name__) -greenpool = eventlet.GreenPool() class WatchRule(object): @@ -213,40 +211,39 @@ class WatchRule(object): # has enough time progressed to run the rule self.now = timeutils.utcnow() if self.now < (self.last_evaluated + self.timeperiod): - return - self.run_rule() + return [] + return self.run_rule() def run_rule(self): new_state = self.get_alarm_state() + actions = [] if new_state != self.state: - action = self.rule_action(new_state) + actions = self.rule_actions(new_state) self.state = new_state self.last_evaluated = self.now self.store() + return actions - def rule_action(self, new_state): + def rule_actions(self, new_state): logger.warn('WATCH: stack:%s, watch_name:%s %s', self.stack_id, self.name, new_state) - - actioned = False + actions = [] if not self.ACTION_MAP[new_state] in self.rule: logger.info('no action for new state %s', new_state) - actioned = True else: s = db_api.stack_get(self.context, self.stack_id) if s and s.status in (parser.Stack.CREATE_COMPLETE, parser.Stack.UPDATE_COMPLETE): stack = parser.Stack.load(self.context, stack=s) for a in self.rule[self.ACTION_MAP[new_state]]: - greenpool.spawn_n(stack[a].alarm) - actioned = True + actions.append(stack[a].alarm) else: logger.warning("Could not process watch state %s for stack" % new_state) - return actioned + return actions def create_watch_data(self, data): if not self.rule['MetricName'] in data: @@ -264,16 +261,20 @@ class WatchRule(object): def set_watch_state(self, state): ''' - Temporarily set the watch state + Temporarily set the watch state, returns list of functions to be + scheduled in the stack ThreadGroup for the specified state ''' if state not in self.WATCH_STATES: raise ValueError('Unknown watch state %s' % state) + actions = [] if state != self.state: - if self.rule_action(state): + actions = self.rule_actions(state) + if actions: logger.debug("Overriding state %s for watch %s with %s" % (self.state, self.name, state)) else: logger.warning("Unable to override state %s for watch %s" % (self.state, self.name)) + return actions diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 5a7c7ba3..c3ac1702 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -79,11 +79,15 @@ def setup_mocks(mocks, stack): class DummyThreadGroup(object): + def __init__(self): + self.threads = [] + def add_timer(self, interval, callback, initial_delay=None, *args, **kwargs): pass def add_thread(self, callback, *args, **kwargs): + self.threads.append(callback) pass def stop(self): @@ -790,21 +794,38 @@ class stackServiceTest(unittest.TestCase): class DummyAction: alarm = "dummyfoo" + dummy_action = DummyAction() self.m.StubOutWithMock(parser.Stack, '__getitem__') parser.Stack.__getitem__( - 'WebServerRestartPolicy').AndReturn(DummyAction()) + 'WebServerRestartPolicy').AndReturn(dummy_action) - self.m.StubOutWithMock(watchrule.greenpool, 'spawn_n') - watchrule.greenpool.spawn_n("dummyfoo").AndReturn(None) - self.m.ReplayAll() + # Replace the real stack threadgroup with a dummy one, so we can + # check the function returned on ALARM is correctly scheduled + self.man.stg[self.stack.id] = DummyThreadGroup() - for state in watchrule.WatchRule.WATCH_STATES: + self.m.ReplayAll() - result = self.man.set_watch_state(self.ctx, - watch_name="OverrideAlarm", - state=state) - self.assertNotEqual(result, None) - self.assertEqual(result[engine_api.WATCH_STATE_VALUE], state) + state = watchrule.WatchRule.NODATA + result = self.man.set_watch_state(self.ctx, + watch_name="OverrideAlarm", + state=state) + self.assertEqual(result[engine_api.WATCH_STATE_VALUE], state) + self.assertEqual(self.man.stg[self.stack.id].threads, []) + + state = watchrule.WatchRule.NORMAL + result = self.man.set_watch_state(self.ctx, + watch_name="OverrideAlarm", + state=state) + self.assertEqual(result[engine_api.WATCH_STATE_VALUE], state) + self.assertEqual(self.man.stg[self.stack.id].threads, []) + + state = watchrule.WatchRule.ALARM + result = self.man.set_watch_state(self.ctx, + watch_name="OverrideAlarm", + state=state) + self.assertEqual(result[engine_api.WATCH_STATE_VALUE], state) + self.assertEqual(self.man.stg[self.stack.id].threads, + [DummyAction.alarm]) # Cleanup, delete the dummy rule db_api.watch_rule_delete(self.ctx, "OverrideAlarm") diff --git a/heat/tests/test_watch.py b/heat/tests/test_watch.py index c5372f08..e62109b4 100644 --- a/heat/tests/test_watch.py +++ b/heat/tests/test_watch.py @@ -23,6 +23,7 @@ import heat.db as db_api from heat.openstack.common import timeutils from heat.engine import watchrule +from heat.engine import parser class WatchData: @@ -32,6 +33,10 @@ class WatchData: 'Unit': 'Count'}} +class DummyAction: + alarm = "DummyAction" + + @attr(tag=['unit', 'watchrule']) @attr(speed='fast') class WatchRuleTest(unittest.TestCase): @@ -41,7 +46,8 @@ class WatchRuleTest(unittest.TestCase): # Create a dummy stack in the DB as WatchRule instances # must be associated with a stack ctx = context.get_admin_context() - tmpl = db_api.raw_template_create(ctx, {'foo': 'bar'}) + empty_tmpl = {"template": {}} + tmpl = db_api.raw_template_create(ctx, empty_tmpl) dummy_stack = {'id': '6754d843-bed2-40dc-a325-84882bb90a98', 'name': 'dummystack', 'raw_template_id': tmpl.id, @@ -62,14 +68,26 @@ class WatchRuleTest(unittest.TestCase): self.m = mox.Mox() self.ctx = context.get_admin_context() - self.m.StubOutWithMock(self.ctx, 'username') self.ctx.username = self.username + self.ctx.tenant_id = u'123456' self.m.ReplayAll() def tearDown(self): self.m.UnsetStubs() + def _action_set_stubs(self, now): + # Setup stubs for the action tests + self.m.StubOutWithMock(timeutils, 'utcnow') + timeutils.utcnow().MultipleTimes().AndReturn(now) + + dummy_action = DummyAction() + self.m.StubOutWithMock(parser.Stack, '__getitem__') + parser.Stack.__getitem__(mox.IgnoreArg() + ).MultipleTimes().AndReturn(dummy_action) + + self.m.ReplayAll() + def test_minimum(self): rule = {'EvaluationPeriods': '1', 'MetricName': 'test_metric', @@ -339,8 +357,9 @@ class WatchRuleTest(unittest.TestCase): stack_id=self.stack_id, last_evaluated=last) - watcher.evaluate() + actions = watcher.evaluate() self.assertEqual(watcher.state, 'NORMAL') + self.assertEqual(actions, []) # now - last == Period, so should set ALARM last = now - datetime.timedelta(seconds=300) @@ -352,6 +371,170 @@ class WatchRuleTest(unittest.TestCase): stack_id=self.stack_id, last_evaluated=last) - watcher.evaluate() + actions = watcher.evaluate() self.assertEqual(watcher.state, 'ALARM') self.assertEqual(watcher.last_evaluated, now) + # No AlarmActions defined in the rule, so expect [] + self.assertEqual(actions, []) + + def test_rule_actions_alarm_normal(self): + rule = {'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'AlarmActions': ['DummyAction'], + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + self._action_set_stubs(now) + + # Set data so rule evaluates to NORMAL state + last = now - datetime.timedelta(seconds=300) + data = WatchData(25, now - datetime.timedelta(seconds=150)) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + watch_data=[data], + stack_id=self.stack_id, + last_evaluated=last) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'NORMAL') + self.assertEqual(actions, []) + + def test_rule_actions_alarm_alarm(self): + rule = {'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'AlarmActions': ['DummyAction'], + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + self._action_set_stubs(now) + + # Set data so rule evaluates to ALARM state + last = now - datetime.timedelta(seconds=300) + data = WatchData(35, now - datetime.timedelta(seconds=150)) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + watch_data=[data], + stack_id=self.stack_id, + last_evaluated=last) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'ALARM') + self.assertEqual(actions, ['DummyAction']) + + # re-set last_evaluated so the rule will be evaluated again, + # but since we're already in ALARM state, we should not generate + # any additional actions + last = now - datetime.timedelta(seconds=300) + watcher.last_evaluated = last + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'ALARM') + self.assertEqual(actions, []) + + def test_rule_actions_alarm_two_actions(self): + rule = {'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'AlarmActions': ['DummyAction', 'AnotherDummyAction'], + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + self._action_set_stubs(now) + + # Set data so rule evaluates to ALARM state + last = now - datetime.timedelta(seconds=300) + data = WatchData(35, now - datetime.timedelta(seconds=150)) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + watch_data=[data], + stack_id=self.stack_id, + last_evaluated=last) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'ALARM') + self.assertEqual(actions, ['DummyAction', 'DummyAction']) + + def test_rule_actions_ok_alarm(self): + rule = {'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'OKActions': ['DummyAction'], + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + self._action_set_stubs(now) + + # Set data so rule evaluates to ALARM state + last = now - datetime.timedelta(seconds=300) + data = WatchData(35, now - datetime.timedelta(seconds=150)) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + watch_data=[data], + stack_id=self.stack_id, + last_evaluated=last) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'ALARM') + self.assertEqual(actions, []) + + # Move time forward and add data below threshold so we transition from + # ALARM -> NORMAL, so evaluate() should output a 'DummyAction' + now = now + datetime.timedelta(seconds=300) + self.m.UnsetStubs() + self._action_set_stubs(now) + + data = WatchData(25, now - datetime.timedelta(seconds=150)) + watcher.watch_data = [data] + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'NORMAL') + self.assertEqual(actions, ['DummyAction']) + + def test_rule_actions_nodata(self): + rule = {'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'InsufficientDataActions': ['DummyAction'], + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + self._action_set_stubs(now) + + # Set data so rule evaluates to ALARM state + last = now - datetime.timedelta(seconds=300) + data = WatchData(35, now - datetime.timedelta(seconds=150)) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + watch_data=[data], + stack_id=self.stack_id, + last_evaluated=last) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'ALARM') + self.assertEqual(actions, []) + + # Move time forward and don't add data so we transition from + # ALARM -> NODATA, so evaluate() should output a 'DummyAction' + now = now + datetime.timedelta(seconds=300) + self.m.UnsetStubs() + self._action_set_stubs(now) + + actions = watcher.evaluate() + self.assertEqual(watcher.state, 'NODATA') + self.assertEqual(actions, ['DummyAction']) -- 2.45.2