From: Angus Salkeld Date: Tue, 5 Jun 2012 00:04:16 +0000 (+1000) Subject: Restructure watchrules to make them more testable X-Git-Tag: 2014.1~1758 X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=2d4d5529f9045ca48f8d98b31f5a119f828acca7;p=openstack-build%2Fheat-build.git Restructure watchrules to make them more testable Change-Id: Ic8085de3f5692249d82e68462bbed02da787712f Signed-off-by: Angus Salkeld --- diff --git a/heat/engine/manager.py b/heat/engine/manager.py index c1e147fa..9c3dbfad 100644 --- a/heat/engine/manager.py +++ b/heat/engine/manager.py @@ -20,10 +20,11 @@ import datetime import logging import webob from heat import manager +from heat.db import api as db_api from heat.common import config from heat.engine import parser from heat.engine import resources -from heat.db import api as db_api +from heat.engine import watchrule from heat.openstack.common import timeutils from novaclient.v1_1 import client @@ -363,112 +364,61 @@ class EngineManager(manager.Manager): pt.save() return [None, metadata] - def do_data_cmp(self, rule, data, threshold): - op = rule['ComparisonOperator'] - if op == 'GreaterThanThreshold': - return data > threshold - elif op == 'GreaterThanOrEqualToThreshold': - return data >= threshold - elif op == 'LessThanThreshold': - return data < threshold - elif op == 'LessThanOrEqualToThreshold': - return data <= threshold - else: - return False - - def do_data_calc(self, rule, rolling, metric): - - stat = rule['Statistic'] - if stat == 'Maximum': - if metric > rolling: - return metric - else: - return rolling - elif stat == 'Minimum': - if metric < rolling: - return metric - else: - return rolling - else: - return metric + rolling - @manager.periodic_task def _periodic_watcher_task(self, context): now = timeutils.utcnow() wrs = db_api.watch_rule_get_all(context) for wr in wrs: - logger.debug('_periodic_watcher_task %s' % wr.name) # has enough time progressed to run the rule dt_period = datetime.timedelta(seconds=int(wr.rule['Period'])) if now < (wr.last_evaluated + dt_period): continue - # get dataset ordered by creation_at - # - most recient first - periods = int(wr.rule['EvaluationPeriods']) - - # TODO fix this - # initial assumption: all samples are in this period - period = int(wr.rule['Period']) - #wds = db_api.watch_data_get_all(context, wr.id) - wds = wr.watch_data - - stat = wr.rule['Statistic'] - data = 0 - samples = 0 - for d in wds: - if d.created_at < wr.last_evaluated: - continue - samples = samples + 1 - metric = 1 - data = samples - if stat != 'SampleCount': - metric = int(d.data[wr.rule['MetricName']]['Value']) - data = self.do_data_calc(wr.rule, data, metric) - - if stat == 'Average' and samples > 0: - data = data / samples - - alarming = self.do_data_cmp(wr.rule, data, - int(wr.rule['Threshold'])) - logger.debug('%s: %d/%d => %d (current state:%s)' % - (wr.rule['MetricName'], - int(wr.rule['Threshold']), - data, alarming, wr.state)) - if alarming and wr.state != 'ALARM': - wr.state = 'ALARM' - wr.save() - logger.warn('ALARM> stack:%s, watch_name:%s', - wr.stack_name, wr.name) - #s = db_api.stack_get(None, wr.stack_name) - #if s: - # ps = parser.Stack(s.name, - # s.raw_template.parsed_template.template, - # s.id, - # params) - # for a in wr.rule['AlarmActions']: - # ps.resources[a].alarm() - - elif not alarming and wr.state == 'ALARM': - wr.state = 'NORMAL' - wr.save() - logger.info('NORMAL> stack:%s, watch_name:%s', - wr.stack_name, wr.name) - - wr.last_evaluated = now + self.run_rule(context, wr, now) + + def run_rule(self, context, wr, now=timeutils.utcnow()): + action_map = {'ALARM': 'AlarmActions', + 'NORMAL': 'OKActions', + 'NODATA': 'InsufficientDataActions'} + + watcher = watchrule.WatchRule(wr.rule, wr.watch_data, + wr.last_evaluated, now) + new_state = watcher.get_alarm_state() + + if new_state != wr.state: + wr.state = new_state + wr.save() + logger.warn('WATCH: stack:%s, watch_name:%s %s', + wr.stack_name, wr.name, new_state) + + if not action_map[new_state] in wr.rule: + logger.info('no action for new state %s', + new_state) + else: + s = db_api.stack_get(None, wr.stack_name) + if s: + ps = parser.Stack(context, s.name, + s.raw_template.parsed_template.template, + s.id) + for a in wr.rule[action_map[new_state]]: + ps.resources[a].alarm() + + wr.last_evaluated = now def create_watch_data(self, context, watch_name, stats_data): ''' This could be used by CloudWatch and WaitConditions and treat HA service events like any other CloudWatch. ''' - wr = db_api.watch_rule_get(context, watch_name) if wr is None: + logger.warn('NoSuch watch:%s' % (watch_name)) return ['NoSuch Watch Rule', None] if not wr.rule['MetricName'] in stats_data: + logger.warn('new data has incorrect metric:%s' % + (wr.rule['MetricName'])) return ['MetricName %s missing' % wr.rule['MetricName'], None] watch_data = { @@ -476,5 +426,8 @@ class EngineManager(manager.Manager): 'watch_rule_id': wr.id } wd = db_api.watch_data_create(context, watch_data) + logger.debug('new watch:%s data:%s' % (watch_name, str(wd.data))) + if wr.rule['Statistic'] == 'SampleCount': + self.run_rule(context, wr) return [None, wd.data] diff --git a/heat/engine/watchrule.py b/heat/engine/watchrule.py new file mode 100644 index 00000000..e11ccfd1 --- /dev/null +++ b/heat/engine/watchrule.py @@ -0,0 +1,142 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import datetime +import logging +from heat.openstack.common import timeutils + +logger = logging.getLogger('heat.engine.watchrule') + + +class WatchRule(object): + ALARM = 'ALARM' + NORMAL = 'NORMAL' + NODATA = 'NODATA' + + def __init__(self, rule, dataset, last_evaluated, now): + self.rule = rule + self.data = dataset + self.last_evaluated = last_evaluated + self.now = now + self.timeperiod = datetime.timedelta(seconds=int(self.rule['Period'])) + + def do_data_cmp(self, data, threshold): + op = self.rule['ComparisonOperator'] + if op == 'GreaterThanThreshold': + return data > threshold + elif op == 'GreaterThanOrEqualToThreshold': + return data >= threshold + elif op == 'LessThanThreshold': + return data < threshold + elif op == 'LessThanOrEqualToThreshold': + return data <= threshold + else: + return False + + def do_Maximum(self): + data = 0 + have_data = False + for d in self.data: + if d.created_at < self.now - self.timeperiod: + continue + if not have_data: + data = int(d.data[self.rule['MetricName']]['Value']) + have_data = True + if int(d.data[self.rule['MetricName']]['Value']) > data: + data = int(d.data[self.rule['MetricName']]['Value']) + + if not have_data: + return self.NODATA + + if self.do_data_cmp(data, + int(self.rule['Threshold'])): + return self.ALARM + else: + return self.NORMAL + + def do_Minimum(self): + data = 0 + have_data = False + for d in self.data: + if d.created_at < self.now - self.timeperiod: + continue + if not have_data: + data = int(d.data[self.rule['MetricName']]['Value']) + have_data = True + elif int(d.data[self.rule['MetricName']]['Value']) < data: + data = int(d.data[self.rule['MetricName']]['Value']) + + if not have_data: + return self.NODATA + + if self.do_data_cmp(data, + int(self.rule['Threshold'])): + return self.ALARM + else: + return self.NORMAL + + def do_SampleCount(self): + ''' + count all samples within the specified period + ''' + data = 0 + for d in self.data: + if d.created_at < self.now - self.timeperiod: + continue + data = data + 1 + + if self.do_data_cmp(data, + int(self.rule['Threshold'])): + return self.ALARM + else: + return self.NORMAL + + def do_Average(self): + data = 0 + samples = 0 + for d in self.data: + if d.created_at < self.now - self.timeperiod: + continue + samples = samples + 1 + data = data + int(d.data[self.rule['MetricName']]['Value']) + + if samples == 0: + return self.NODATA + + data = data / samples + if self.do_data_cmp(data, + int(self.rule['Threshold'])): + return self.ALARM + else: + return self.NORMAL + + def do_Sum(self): + data = 0 + for d in self.data: + if d.created_at < self.now - self.timeperiod: + logger.debug('ignoring %s' % str(d.data)) + continue + data = data + int(d.data[self.rule['MetricName']]['Value']) + + if self.do_data_cmp(data, + int(self.rule['Threshold'])): + return self.ALARM + else: + return self.NORMAL + + def get_alarm_state(self): + fn = getattr(self, 'do_%s' % self.rule['Statistic']) + return fn() diff --git a/heat/tests/test_watch.py b/heat/tests/test_watch.py new file mode 100644 index 00000000..229becf4 --- /dev/null +++ b/heat/tests/test_watch.py @@ -0,0 +1,177 @@ +import datetime +import mox +import nose +from nose.plugins.attrib import attr +from nose import with_setup +import unittest +from nose.exc import SkipTest +import logging + +from heat.openstack.common import timeutils +try: + from heat.engine import watchrule +except: + raise SkipTest("unable to import watchrule, skipping") + + +logger = logging.getLogger('test_watch') + + +class WatchData: + def __init__(self, data, created_at): + self.created_at = created_at + self.data = {'test_metric': {'Value': data, + 'Unit': 'Count'}} + + +class WatchRuleTest(unittest.TestCase): + + @attr(tag=['unit', 'watchrule']) + @attr(speed='fast') + def test_minimum(self): + rule = { + 'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'Period': '300', + 'Statistic': 'Minimum', + 'ComparisonOperator': 'LessThanOrEqualToThreshold', + 'Threshold': '50'} + + now = timeutils.utcnow() + last = now - datetime.timedelta(seconds=320) + data = [WatchData(77, now - datetime.timedelta(seconds=100))] + data.append(WatchData(53, now - datetime.timedelta(seconds=150))) + + # all > 50 -> NORMAL + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + data.append(WatchData(25, now - datetime.timedelta(seconds=250))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'ALARM') + + @attr(tag=['unit', 'watchrule']) + @attr(speed='fast') + def test_maximum(self): + rule = { + 'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'Period': '300', + 'Statistic': 'Maximum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '30'} + + now = timeutils.utcnow() + last = now - datetime.timedelta(seconds=320) + data = [WatchData(7, now - datetime.timedelta(seconds=100))] + data.append(WatchData(23, now - datetime.timedelta(seconds=150))) + + # all < 30 -> NORMAL + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + data.append(WatchData(35, now - datetime.timedelta(seconds=150))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'ALARM') + + @attr(tag=['unit', 'watchrule']) + @attr(speed='fast') + def test_samplecount(self): + + rule = { + 'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'Period': '300', + 'Statistic': 'SampleCount', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '3'} + + now = timeutils.utcnow() + last = now - datetime.timedelta(seconds=320) + data = [WatchData(1, now - datetime.timedelta(seconds=100))] + data.append(WatchData(1, now - datetime.timedelta(seconds=150))) + + # only 2 samples -> NORMAL + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + # only 3 samples -> ALARM + data.append(WatchData(1, now - datetime.timedelta(seconds=200))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'ALARM') + + # only 3 samples (one old) -> NORMAL + data.pop(0) + data.append(WatchData(1, now - datetime.timedelta(seconds=400))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + @attr(tag=['unit', 'watchrule']) + @attr(speed='fast') + def test_sum(self): + rule = { + 'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'Period': '300', + 'Statistic': 'Sum', + 'ComparisonOperator': 'GreaterThanOrEqualToThreshold', + 'Threshold': '100'} + + now = timeutils.utcnow() + last = now - datetime.timedelta(seconds=320) + data = [WatchData(17, now - datetime.timedelta(seconds=100))] + data.append(WatchData(23, now - datetime.timedelta(seconds=150))) + + # all < 40 -> NORMAL + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + # sum > 100 -> ALARM + data.append(WatchData(85, now - datetime.timedelta(seconds=150))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'ALARM') + + @attr(tag=['unit', 'watchrule']) + @attr(speed='fast') + def test_ave(self): + rule = { + 'EvaluationPeriods': '1', + 'MetricName': 'test_metric', + 'Period': '300', + 'Statistic': 'Average', + 'ComparisonOperator': 'GreaterThanThreshold', + 'Threshold': '100'} + + now = timeutils.utcnow() + last = now - datetime.timedelta(seconds=320) + data = [WatchData(117, now - datetime.timedelta(seconds=100))] + data.append(WatchData(23, now - datetime.timedelta(seconds=150))) + + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'NORMAL') + + data.append(WatchData(195, now - datetime.timedelta(seconds=250))) + watcher = watchrule.WatchRule(rule, data, last, now) + new_state = watcher.get_alarm_state() + logger.info(new_state) + assert(new_state == 'ALARM')