From ebc3129e44d5762756d28133880b07a13f6e0be3 Mon Sep 17 00:00:00 2001 From: Steven Hardy Date: Fri, 28 Sep 2012 08:02:20 +0100 Subject: [PATCH] heat engine : Move watch logic into WatchRule class Move engine/manager.py logic into WatchRule class so watch-related DB manipulation and related logic is encapsulated in the WatchRule class Fixes #217 Change-Id: I5405ab631de17efda7eefb45dadad55ee12c533c Signed-off-by: Steven Hardy --- heat/engine/cloud_watch.py | 16 ++-- heat/engine/manager.py | 113 +++--------------------- heat/engine/watchrule.py | 148 +++++++++++++++++++++++++++++-- heat/tests/test_watch.py | 174 ++++++++++++++++++++++++++++++++----- 4 files changed, 311 insertions(+), 140 deletions(-) diff --git a/heat/engine/cloud_watch.py b/heat/engine/cloud_watch.py index a82f9b0c..22747cb6 100644 --- a/heat/engine/cloud_watch.py +++ b/heat/engine/cloud_watch.py @@ -18,8 +18,9 @@ import json import os from heat.common import exception -from heat.db import api as db_api +from heat.engine import watchrule from heat.engine.resources import Resource +from heat.db import api as db_api from heat.openstack.common import log as logging @@ -63,15 +64,10 @@ class CloudWatchAlarm(Resource): return Resource.validate(self) def handle_create(self): - wr_values = { - 'name': self.name, - 'rule': self.parsed_template('Properties'), - 'state': 'NORMAL', - 'stack_name': self.stack.name - } - - wr = db_api.watch_rule_create(self.context, wr_values) - self.instance_id = wr.id + wr = watchrule.WatchRule(context=self.context, watch_name=self.name, + rule=self.parsed_template('Properties'), + stack_name=self.stack.name) + wr.store() def handle_update(self): return self.UPDATE_REPLACE diff --git a/heat/engine/manager.py b/heat/engine/manager.py index 92fc1fe3..62e8d0d0 100644 --- a/heat/engine/manager.py +++ b/heat/engine/manager.py @@ -26,7 +26,6 @@ from heat import manager from heat.db import api as db_api from heat.common import config from heat.common import utils as heat_utils 
-from heat.common import context as ctxtlib from heat.engine import api from heat.engine import identifier from heat.engine import parser @@ -399,81 +398,24 @@ class EngineManager(manager.Manager): now = timeutils.utcnow() try: - wrs = db_api.watch_rule_get_all(context) + wrn = [w.name for w in db_api.watch_rule_get_all(context)] except Exception as ex: logger.warn('periodic_task db error (%s) %s' % ('watch rule removed?', str(ex))) return - for wr in wrs: - # has enough time progressed to run the rule - dt_period = datetime.timedelta(seconds=int(wr.rule['Period'])) - if now < (wr.last_evaluated + dt_period): - continue - - self.run_rule(context, wr, now) - - def run_rule(self, context, wr, now=timeutils.utcnow()): - watcher = watchrule.WatchRule(wr.rule, wr.watch_data, - wr.last_evaluated, now) - new_state = watcher.get_alarm_state() - - if new_state != wr.state: - if self.rule_action(wr, new_state): - wr.state = new_state - - wr.last_evaluated = now - wr.save() - - def rule_action(self, wr, new_state): - # TODO : push watch-rule processing into engine.watchrule - logger.warn('WATCH: stack:%s, watch_name:%s %s', - wr.stack_name, wr.name, new_state) - - actioned = False - if not watchrule.WatchRule.ACTION_MAP[new_state] in wr.rule: - logger.info('no action for new state %s', - new_state) - actioned = True - else: - s = db_api.stack_get_by_name(None, wr.stack_name) - if s and s.status in (parser.Stack.CREATE_COMPLETE, - parser.Stack.UPDATE_COMPLETE): - user_creds = db_api.user_creds_get(s.user_creds_id) - ctxt = ctxtlib.RequestContext.from_dict(user_creds) - stack = parser.Stack.load(ctxt, s.id) - for a in wr.rule[watchrule.WatchRule.ACTION_MAP[new_state]]: - greenpool.spawn_n(stack[a].alarm) - actioned = True - else: - logger.warning("Could not process watch state %s for stack" % - new_state) - return actioned + for wr in wrn: + rule = watchrule.WatchRule.load(context, wr) + rule.evaluate() def create_watch_data(self, context, watch_name, stats_data): ''' This 
could be used by CloudWatch and WaitConditions and treat HA service events like any other CloudWatch. ''' - wr = db_api.watch_rule_get_by_name(None, watch_name) - if wr is None: - logger.warn('NoSuch watch:%s' % (watch_name)) - return ['NoSuch Watch Rule', None] - - if not wr.rule['MetricName'] in stats_data: - logger.warn('new data has incorrect metric:%s' % - (wr.rule['MetricName'])) - return ['MetricName %s missing' % wr.rule['MetricName'], None] - - watch_data = { - 'data': stats_data, - 'watch_rule_id': wr.id - } - wd = db_api.watch_data_create(None, watch_data) - logger.debug('new watch:%s data:%s' % (watch_name, str(wd.data))) - if wr.rule['Statistic'] == 'SampleCount': - self.run_rule(None, wr) - - return [None, wd.data] + rule = watchrule.WatchRule.load(context, watch_name) + rule.create_watch_data(stats_data) + logger.debug('new watch:%s data:%s' % (watch_name, str(stats_data))) + return stats_data def show_watch(self, context, watch_name): ''' @@ -482,22 +424,15 @@ class EngineManager(manager.Manager): arg2 -> Name of the watch you want to see, or None to see all ''' if watch_name: - try: - wr = db_api.watch_rule_get_by_name(context, watch_name) - except Exception as ex: - logger.warn('show_watch (%s) db error %s' % - (watch_name, str(ex))) - if wr: - wrs = [wr] - else: - raise AttributeError('Unknown watch name %s' % watch_name) + wrn = [watch_name] else: try: - wrs = db_api.watch_rule_get_all(context) + wrn = [w.name for w in db_api.watch_rule_get_all(context)] except Exception as ex: logger.warn('show_watch (all) db error %s' % str(ex)) return + wrs = [watchrule.WatchRule.load(context, w) for w in wrn] result = [api.format_watch(w) for w in wrs] return result @@ -532,30 +467,8 @@ class EngineManager(manager.Manager): arg2 -> Name of the watch arg3 -> State (must be one defined in WatchRule class ''' - - if state not in watchrule.WatchRule.WATCH_STATES: - raise AttributeError('Unknown watch state %s' % state) - - if watch_name: - try: - wr = 
db_api.watch_rule_get_by_name(context, watch_name) - except Exception as ex: - logger.warn('show_watch (%s) db error %s' % - (watch_name, str(ex))) - - if not wr: - raise AttributeError('Unknown watch name %s' % watch_name) - - else: - raise AttributeError('Must pass watch_name') - - if state != wr.state: - if self.rule_action(wr, state): - logger.debug("Overriding state %s for watch %s with %s" % - (wr.state, watch_name, state)) - else: - logger.warning("Unable to override state %s for watch %s" % - (wr.state, watch_name)) + wr = watchrule.WatchRule.load(context, watch_name) + wr.set_watch_state(state) # Return the watch with the state overriden to indicate success # We do not update the timestamps as we are not modifying the DB diff --git a/heat/engine/watchrule.py b/heat/engine/watchrule.py index 33fa1e64..5d150655 100644 --- a/heat/engine/watchrule.py +++ b/heat/engine/watchrule.py @@ -17,8 +17,14 @@ import datetime from heat.openstack.common import log as logging from heat.openstack.common import timeutils +from heat.engine import resources +from heat.db import api as db_api +from heat.engine import parser +from heat.common import context as ctxtlib +import eventlet logger = logging.getLogger('heat.engine.watchrule') +greenpool = eventlet.GreenPool() class WatchRule(object): @@ -29,12 +35,63 @@ class WatchRule(object): NORMAL: 'OKActions', NODATA: 'InsufficientDataActions'} - def __init__(self, rule, dataset, last_evaluated, now): + created_at = resources.Timestamp(db_api.watch_rule_get, 'created_at') + updated_at = resources.Timestamp(db_api.watch_rule_get, 'updated_at') + + def __init__(self, context, watch_name, rule, stack_name, state=NORMAL, + wid=None, watch_data=[], last_evaluated=timeutils.utcnow()): + self.context = context + self.now = timeutils.utcnow() + self.name = watch_name + self.state = state self.rule = rule - self.data = dataset + self.stack_name = stack_name + self.timeperiod = datetime.timedelta(seconds=int(rule['Period'])) + self.id = wid 
+ self.watch_data = watch_data self.last_evaluated = last_evaluated - self.now = now - self.timeperiod = datetime.timedelta(seconds=int(self.rule['Period'])) + + @classmethod + def load(cls, context, watch_name): + ''' + Load the watchrule from the DB by name + ''' + dbwr = None + try: + dbwr = db_api.watch_rule_get_by_name(context, watch_name) + except Exception as ex: + logger.warn('show_watch (%s) db error %s' % + (watch_name, str(ex))) + if not dbwr: + raise AttributeError('Unknown watch name %s' % watch_name) + else: + return cls(context=context, + watch_name=dbwr.name, + rule=dbwr.rule, + stack_name=dbwr.stack_name, + state=dbwr.state, + wid=dbwr.id, + watch_data=dbwr.watch_data, + last_evaluated=dbwr.last_evaluated) + + def store(self): + ''' + Store the watchrule in the database and return its ID + If self.id is set, we update the existing rule + ''' + + wr_values = { + 'name': self.name, + 'rule': self.rule, + 'state': self.state, + 'stack_name': self.stack_name + } + + if not self.id: + wr = db_api.watch_rule_create(self.context, wr_values) + self.id = wr.id + else: + db_api.watch_rule_update(self.context, self.id, wr_values) def do_data_cmp(self, data, threshold): op = self.rule['ComparisonOperator'] @@ -52,7 +109,7 @@ class WatchRule(object): def do_Maximum(self): data = 0 have_data = False - for d in self.data: + for d in self.watch_data: if d.created_at < self.now - self.timeperiod: continue if not have_data: @@ -73,7 +130,7 @@ class WatchRule(object): def do_Minimum(self): data = 0 have_data = False - for d in self.data: + for d in self.watch_data: if d.created_at < self.now - self.timeperiod: continue if not have_data: @@ -96,7 +153,7 @@ class WatchRule(object): count all samples within the specified period ''' data = 0 - for d in self.data: + for d in self.watch_data: if d.created_at < self.now - self.timeperiod: continue data = data + 1 @@ -110,7 +167,7 @@ class WatchRule(object): def do_Average(self): data = 0 samples = 0 - for d in self.data: + 
for d in self.watch_data: if d.created_at < self.now - self.timeperiod: continue samples = samples + 1 @@ -128,7 +185,7 @@ class WatchRule(object): def do_Sum(self): data = 0 - for d in self.data: + for d in self.watch_data: if d.created_at < self.now - self.timeperiod: logger.debug('ignoring %s' % str(d.data)) continue @@ -143,3 +200,76 @@ class WatchRule(object): def get_alarm_state(self): fn = getattr(self, 'do_%s' % self.rule['Statistic']) return fn() + + def evaluate(self): + # has enough time progressed to run the rule + self.now = timeutils.utcnow() + if self.now < (self.last_evaluated + self.timeperiod): + return + self.run_rule() + + def run_rule(self): + new_state = self.get_alarm_state() + + if new_state != self.state: + if self.rule_action(new_state): + self.state = new_state + + self.last_evaluated = self.now + self.store() + + def rule_action(self, new_state): + logger.warn('WATCH: stack:%s, watch_name:%s %s', + self.stack_name, self.name, new_state) + + actioned = False + if not self.ACTION_MAP[new_state] in self.rule: + logger.info('no action for new state %s', + new_state) + actioned = True + else: + s = db_api.stack_get_by_name(None, self.stack_name) + if s and s.status in (parser.Stack.CREATE_COMPLETE, + parser.Stack.UPDATE_COMPLETE): + user_creds = db_api.user_creds_get(s.user_creds_id) + ctxt = ctxtlib.RequestContext.from_dict(user_creds) + stack = parser.Stack.load(ctxt, s.id) + for a in self.rule[self.ACTION_MAP[new_state]]: + greenpool.spawn_n(stack[a].alarm) + actioned = True + else: + logger.warning("Could not process watch state %s for stack" % + new_state) + return actioned + + def create_watch_data(self, data): + if not self.rule['MetricName'] in data: + logger.warn('new data has incorrect metric:%s' % + (self.rule['MetricName'])) + raise AttributeError('MetricName %s missing' % + self.rule['MetricName']) + + watch_data = { + 'data': data, + 'watch_rule_id': self.id + } + wd = db_api.watch_data_create(None, watch_data) + 
logger.debug('new watch:%s data:%s' % (self.name, str(wd.data))) + if self.rule['Statistic'] == 'SampleCount': + self.run_rule() + + def set_watch_state(self, state): + ''' + Temporarily set the watch state + ''' + + if state not in self.WATCH_STATES: + raise AttributeError('Unknown watch state %s' % state) + + if state != self.state: + if self.rule_action(state): + logger.debug("Overriding state %s for watch %s with %s" % + (self.state, self.name, state)) + else: + logger.warning("Unable to override state %s for watch %s" % + (self.state, self.name)) diff --git a/heat/tests/test_watch.py b/heat/tests/test_watch.py index f0582f2a..3f3a0169 100644 --- a/heat/tests/test_watch.py +++ b/heat/tests/test_watch.py @@ -21,6 +21,8 @@ from nose import with_setup import unittest from nose.exc import SkipTest import logging +from heat.common import context +import heat.db as db_api from heat.openstack.common import timeutils try: @@ -39,10 +41,24 @@ class WatchData: 'Unit': 'Count'}} +@attr(tag=['unit', 'watchrule']) +@attr(speed='fast') class WatchRuleTest(unittest.TestCase): - @attr(tag=['unit', 'watchrule']) - @attr(speed='fast') + def setUp(self): + self.username = 'watchrule_test_user' + + self.m = mox.Mox() + + self.ctx = context.get_admin_context() + self.m.StubOutWithMock(self.ctx, 'username') + self.ctx.username = self.username + + self.m.ReplayAll() + + def tearDown(self): + self.m.UnsetStubs() + def test_minimum(self): rule = { 'EvaluationPeriods': '1', @@ -58,19 +74,27 @@ class WatchRuleTest(unittest.TestCase): data.append(WatchData(53, now - datetime.timedelta(seconds=150))) # all > 50 -> NORMAL - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') data.append(WatchData(25, now - 
datetime.timedelta(seconds=250))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'ALARM') - @attr(tag=['unit', 'watchrule']) - @attr(speed='fast') def test_maximum(self): rule = { 'EvaluationPeriods': '1', @@ -86,19 +110,29 @@ class WatchRuleTest(unittest.TestCase): data.append(WatchData(23, now - datetime.timedelta(seconds=150))) # all < 30 -> NORMAL - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') data.append(WatchData(35, now - datetime.timedelta(seconds=150))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'ALARM') - @attr(tag=['unit', 'watchrule']) - @attr(speed='fast') def test_samplecount(self): rule = { @@ -115,14 +149,26 @@ class WatchRuleTest(unittest.TestCase): data.append(WatchData(1, now - datetime.timedelta(seconds=150))) # only 2 samples -> NORMAL - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') # only 3 samples -> ALARM data.append(WatchData(1, now - 
datetime.timedelta(seconds=200))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'ALARM') @@ -130,13 +176,17 @@ class WatchRuleTest(unittest.TestCase): # only 3 samples (one old) -> NORMAL data.pop(0) data.append(WatchData(1, now - datetime.timedelta(seconds=400))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') - @attr(tag=['unit', 'watchrule']) - @attr(speed='fast') def test_sum(self): rule = { 'EvaluationPeriods': '1', @@ -152,20 +202,30 @@ class WatchRuleTest(unittest.TestCase): data.append(WatchData(23, now - datetime.timedelta(seconds=150))) # all < 40 -> NORMAL - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') # sum > 100 -> ALARM data.append(WatchData(85, now - datetime.timedelta(seconds=150))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'ALARM') - @attr(tag=['unit', 'watchrule']) - @attr(speed='fast') def test_ave(self): rule = { 'EvaluationPeriods': '1', @@ 
-180,13 +240,85 @@ class WatchRuleTest(unittest.TestCase): data = [WatchData(117, now - datetime.timedelta(seconds=100))] data.append(WatchData(23, now - datetime.timedelta(seconds=150))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'NORMAL') data.append(WatchData(195, now - datetime.timedelta(seconds=250))) - watcher = watchrule.WatchRule(rule, data, last, now) + watcher = watchrule.WatchRule(context=self.ctx, + watch_name="testwatch", + rule=rule, + stack_name="teststack", + watch_data=data, + last_evaluated=last) + watcher.now = now new_state = watcher.get_alarm_state() logger.info(new_state) self.assertEqual(new_state, 'ALARM') + + def test_load(self): + # Insert two dummy watch rules into the DB + values = {'stack_name': u'wordpress_ha', 'state': 'NORMAL', + 'name': u'HttpFailureAlarm', + 'rule': { + u'EvaluationPeriods': u'1', + u'AlarmActions': [u'WebServerRestartPolicy'], + u'AlarmDescription': u'Restart the WikiDatabase', + u'Namespace': u'system/linux', + u'Period': u'300', + u'ComparisonOperator': u'GreaterThanThreshold', + u'Statistic': u'SampleCount', + u'Threshold': u'2', + u'MetricName': u'ServiceFailure'}} + db_ret = db_api.watch_rule_create(self.ctx, values) + self.assertNotEqual(db_ret, None) + values['name'] = 'AnotherWatch' + db_ret = db_api.watch_rule_create(self.ctx, values) + self.assertNotEqual(db_ret, None) + + # Then use WatchRule.load() to retrieve each by name + # and check that the object properties match the data above + for wn in ('HttpFailureAlarm', 'AnotherWatch'): + wr = watchrule.WatchRule.load(self.ctx, wn) + self.assertEqual(type(wr), watchrule.WatchRule) + self.assertEqual(wr.name, wn) + self.assertEqual(wr.state, values['state']) + 
self.assertEqual(wr.rule, values['rule']) + self.assertEqual(wr.stack_name, values['stack_name']) + self.assertEqual(wr.timeperiod, datetime.timedelta( + seconds=int(values['rule']['Period']))) + + # Cleanup + db_api.watch_rule_delete(self.ctx, 'HttpFailureAlarm') + db_api.watch_rule_delete(self.ctx, 'AnotherWatch') + + def test_store(self): + rule = {u'EvaluationPeriods': u'1', + u'AlarmActions': [u'WebServerRestartPolicy'], + u'AlarmDescription': u'Restart the WikiDatabase', + u'Namespace': u'system/linux', + u'Period': u'300', + u'ComparisonOperator': u'GreaterThanThreshold', + u'Statistic': u'SampleCount', + u'Threshold': u'2', + u'MetricName': u'ServiceFailure'} + wr = watchrule.WatchRule(context=self.ctx, watch_name='storetest', + rule=rule, stack_name='teststack') + wr.store() + + dbwr = db_api.watch_rule_get_by_name(self.ctx, 'storetest') + self.assertNotEqual(dbwr, None) + self.assertEqual(dbwr.name, 'storetest') + self.assertEqual(dbwr.state, watchrule.WatchRule.NORMAL) + self.assertEqual(dbwr.stack_name, 'teststack') + self.assertEqual(dbwr.rule, rule) + + # Cleanup + db_api.watch_rule_delete(self.ctx, 'storetest') -- 2.45.2