import logging
import webob
from heat import manager
+from heat.db import api as db_api
from heat.common import config
from heat.engine import parser
from heat.engine import resources
-from heat.db import api as db_api
+from heat.engine import watchrule
from heat.openstack.common import timeutils
from novaclient.v1_1 import client
pt.save()
return [None, metadata]
- def do_data_cmp(self, rule, data, threshold):
- op = rule['ComparisonOperator']
- if op == 'GreaterThanThreshold':
- return data > threshold
- elif op == 'GreaterThanOrEqualToThreshold':
- return data >= threshold
- elif op == 'LessThanThreshold':
- return data < threshold
- elif op == 'LessThanOrEqualToThreshold':
- return data <= threshold
- else:
- return False
-
- def do_data_calc(self, rule, rolling, metric):
-
- stat = rule['Statistic']
- if stat == 'Maximum':
- if metric > rolling:
- return metric
- else:
- return rolling
- elif stat == 'Minimum':
- if metric < rolling:
- return metric
- else:
- return rolling
- else:
- return metric + rolling
-
@manager.periodic_task
def _periodic_watcher_task(self, context):
now = timeutils.utcnow()
wrs = db_api.watch_rule_get_all(context)
for wr in wrs:
- logger.debug('_periodic_watcher_task %s' % wr.name)
# has enough time progressed to run the rule
dt_period = datetime.timedelta(seconds=int(wr.rule['Period']))
if now < (wr.last_evaluated + dt_period):
continue
- # get dataset ordered by creation_at
- # - most recient first
- periods = int(wr.rule['EvaluationPeriods'])
-
- # TODO fix this
- # initial assumption: all samples are in this period
- period = int(wr.rule['Period'])
- #wds = db_api.watch_data_get_all(context, wr.id)
- wds = wr.watch_data
-
- stat = wr.rule['Statistic']
- data = 0
- samples = 0
- for d in wds:
- if d.created_at < wr.last_evaluated:
- continue
- samples = samples + 1
- metric = 1
- data = samples
- if stat != 'SampleCount':
- metric = int(d.data[wr.rule['MetricName']]['Value'])
- data = self.do_data_calc(wr.rule, data, metric)
-
- if stat == 'Average' and samples > 0:
- data = data / samples
-
- alarming = self.do_data_cmp(wr.rule, data,
- int(wr.rule['Threshold']))
- logger.debug('%s: %d/%d => %d (current state:%s)' %
- (wr.rule['MetricName'],
- int(wr.rule['Threshold']),
- data, alarming, wr.state))
- if alarming and wr.state != 'ALARM':
- wr.state = 'ALARM'
- wr.save()
- logger.warn('ALARM> stack:%s, watch_name:%s',
- wr.stack_name, wr.name)
- #s = db_api.stack_get(None, wr.stack_name)
- #if s:
- # ps = parser.Stack(s.name,
- # s.raw_template.parsed_template.template,
- # s.id,
- # params)
- # for a in wr.rule['AlarmActions']:
- # ps.resources[a].alarm()
-
- elif not alarming and wr.state == 'ALARM':
- wr.state = 'NORMAL'
- wr.save()
- logger.info('NORMAL> stack:%s, watch_name:%s',
- wr.stack_name, wr.name)
-
- wr.last_evaluated = now
+ self.run_rule(context, wr, now)
+
+    def run_rule(self, context, wr, now=None):
+        # "now" defaults to None and is resolved here; a default of
+        # timeutils.utcnow() would only be evaluated once, at definition time
+        if now is None:
+            now = timeutils.utcnow()
+        action_map = {'ALARM': 'AlarmActions',
+                      'NORMAL': 'OKActions',
+                      'NODATA': 'InsufficientDataActions'}
+
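+        # the statistic calculation and threshold comparison are delegated
+        # to the WatchRule helper, which returns one of the action_map keys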
+ watcher = watchrule.WatchRule(wr.rule, wr.watch_data,
+ wr.last_evaluated, now)
+ new_state = watcher.get_alarm_state()
+
+ if new_state != wr.state:
+ wr.state = new_state
+ wr.save()
+ logger.warn('WATCH: stack:%s, watch_name:%s %s',
+ wr.stack_name, wr.name, new_state)
+
+            if action_map[new_state] not in wr.rule:
+ logger.info('no action for new state %s',
+ new_state)
+ else:
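+                # reload the stack owning this watch and invoke alarm()
+                # on each resource named in the matching action list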
+ s = db_api.stack_get(None, wr.stack_name)
+ if s:
+ ps = parser.Stack(context, s.name,
+ s.raw_template.parsed_template.template,
+ s.id)
+ for a in wr.rule[action_map[new_state]]:
+ ps.resources[a].alarm()
+
+ wr.last_evaluated = now
def create_watch_data(self, context, watch_name, stats_data):
'''
This could be used by CloudWatch and WaitConditions
and treat HA service events like any other CloudWatch.
'''
-
wr = db_api.watch_rule_get(context, watch_name)
if wr is None:
+ logger.warn('NoSuch watch:%s' % (watch_name))
return ['NoSuch Watch Rule', None]
if not wr.rule['MetricName'] in stats_data:
+            logger.warn('new data is missing the watched metric:%s' %
+                        (wr.rule['MetricName']))
return ['MetricName %s missing' % wr.rule['MetricName'], None]
watch_data = {
'watch_rule_id': wr.id
}
wd = db_api.watch_data_create(context, watch_data)
+ logger.debug('new watch:%s data:%s' % (watch_name, str(wd.data)))
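+        # SampleCount rules are also evaluated immediately when new data
+        # arrives, rather than only via the periodic watcher task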
+ if wr.rule['Statistic'] == 'SampleCount':
+ self.run_rule(context, wr)
return [None, wd.data]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import logging
+from heat.openstack.common import timeutils
+
+logger = logging.getLogger('heat.engine.watchrule')
+
+
+class WatchRule(object):
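+    '''
+    Evaluate the dataset for a single watch rule and map it to an alarm
+    state (ALARM, NORMAL or NODATA) using the rule's Statistic, Period,
+    ComparisonOperator and Threshold fields.
+    '''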
+ ALARM = 'ALARM'
+ NORMAL = 'NORMAL'
+ NODATA = 'NODATA'
+
+ def __init__(self, rule, dataset, last_evaluated, now):
+ self.rule = rule
+ self.data = dataset
+ self.last_evaluated = last_evaluated
+ self.now = now
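+        # samples older than (now - Period) are ignored by the do_*
+        # statistic methods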
+ self.timeperiod = datetime.timedelta(seconds=int(self.rule['Period']))
+
+ def do_data_cmp(self, data, threshold):
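+        '''
+        Compare the computed statistic against the rule's Threshold
+        using ComparisonOperator; unknown operators evaluate to False.
+        '''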
+ op = self.rule['ComparisonOperator']
+ if op == 'GreaterThanThreshold':
+ return data > threshold
+ elif op == 'GreaterThanOrEqualToThreshold':
+ return data >= threshold
+ elif op == 'LessThanThreshold':
+ return data < threshold
+ elif op == 'LessThanOrEqualToThreshold':
+ return data <= threshold
+ else:
+ return False
+
+ def do_Maximum(self):
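+        '''
+        ALARM if the largest metric value within the period breaches
+        the threshold, NODATA if there are no samples in the period.
+        '''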
+ data = 0
+ have_data = False
+ for d in self.data:
+ if d.created_at < self.now - self.timeperiod:
+ continue
+ if not have_data:
+ data = int(d.data[self.rule['MetricName']]['Value'])
+ have_data = True
+            elif int(d.data[self.rule['MetricName']]['Value']) > data:
+ data = int(d.data[self.rule['MetricName']]['Value'])
+
+ if not have_data:
+ return self.NODATA
+
+ if self.do_data_cmp(data,
+ int(self.rule['Threshold'])):
+ return self.ALARM
+ else:
+ return self.NORMAL
+
+ def do_Minimum(self):
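+        '''
+        ALARM if the smallest metric value within the period breaches
+        the threshold, NODATA if there are no samples in the period.
+        '''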
+ data = 0
+ have_data = False
+ for d in self.data:
+ if d.created_at < self.now - self.timeperiod:
+ continue
+ if not have_data:
+ data = int(d.data[self.rule['MetricName']]['Value'])
+ have_data = True
+ elif int(d.data[self.rule['MetricName']]['Value']) < data:
+ data = int(d.data[self.rule['MetricName']]['Value'])
+
+ if not have_data:
+ return self.NODATA
+
+ if self.do_data_cmp(data,
+ int(self.rule['Threshold'])):
+ return self.ALARM
+ else:
+ return self.NORMAL
+
+ def do_SampleCount(self):
+ '''
+ count all samples within the specified period
+ '''
+ data = 0
+ for d in self.data:
+ if d.created_at < self.now - self.timeperiod:
+ continue
+ data = data + 1
+
+ if self.do_data_cmp(data,
+ int(self.rule['Threshold'])):
+ return self.ALARM
+ else:
+ return self.NORMAL
+
+ def do_Average(self):
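+        '''
+        Average the metric over the samples within the period and
+        compare against the threshold, NODATA if there are no samples.
+        '''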
+ data = 0
+ samples = 0
+ for d in self.data:
+ if d.created_at < self.now - self.timeperiod:
+ continue
+ samples = samples + 1
+ data = data + int(d.data[self.rule['MetricName']]['Value'])
+
+ if samples == 0:
+ return self.NODATA
+
+ data = data / samples
+ if self.do_data_cmp(data,
+ int(self.rule['Threshold'])):
+ return self.ALARM
+ else:
+ return self.NORMAL
+
+ def do_Sum(self):
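+        '''
+        Sum the metric over all samples within the period and compare
+        against the threshold.
+        '''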
+ data = 0
+ for d in self.data:
+ if d.created_at < self.now - self.timeperiod:
+ logger.debug('ignoring %s' % str(d.data))
+ continue
+ data = data + int(d.data[self.rule['MetricName']]['Value'])
+
+ if self.do_data_cmp(data,
+ int(self.rule['Threshold'])):
+ return self.ALARM
+ else:
+ return self.NORMAL
+
+ def get_alarm_state(self):
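+        '''
+        Dispatch to the do_<Statistic> method for this rule's Statistic.
+        '''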
+ fn = getattr(self, 'do_%s' % self.rule['Statistic'])
+ return fn()
--- /dev/null
+import datetime
+import mox
+import nose
+from nose.plugins.attrib import attr
+from nose import with_setup
+import unittest
+from nose.exc import SkipTest
+import logging
+
+from heat.openstack.common import timeutils
+try:
+    from heat.engine import watchrule
+except ImportError:
+    raise SkipTest("unable to import watchrule, skipping")
+
+
+logger = logging.getLogger('test_watch')
+
+
+class WatchData:
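+    '''
+    Minimal stand-in for a watch_data row: a created_at timestamp plus
+    a data dict keyed by the metric name.
+    '''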
+ def __init__(self, data, created_at):
+ self.created_at = created_at
+ self.data = {'test_metric': {'Value': data,
+ 'Unit': 'Count'}}
+
+
+class WatchRuleTest(unittest.TestCase):
+
+ @attr(tag=['unit', 'watchrule'])
+ @attr(speed='fast')
+ def test_minimum(self):
+ rule = {
+ 'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Minimum',
+ 'ComparisonOperator': 'LessThanOrEqualToThreshold',
+ 'Threshold': '50'}
+
+ now = timeutils.utcnow()
+ last = now - datetime.timedelta(seconds=320)
+ data = [WatchData(77, now - datetime.timedelta(seconds=100))]
+ data.append(WatchData(53, now - datetime.timedelta(seconds=150)))
+
+ # all > 50 -> NORMAL
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
+ data.append(WatchData(25, now - datetime.timedelta(seconds=250)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'ALARM')
+
+ @attr(tag=['unit', 'watchrule'])
+ @attr(speed='fast')
+ def test_maximum(self):
+ rule = {
+ 'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Maximum',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '30'}
+
+ now = timeutils.utcnow()
+ last = now - datetime.timedelta(seconds=320)
+ data = [WatchData(7, now - datetime.timedelta(seconds=100))]
+ data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
+
+ # all < 30 -> NORMAL
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
+ data.append(WatchData(35, now - datetime.timedelta(seconds=150)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'ALARM')
+
+ @attr(tag=['unit', 'watchrule'])
+ @attr(speed='fast')
+ def test_samplecount(self):
+
+ rule = {
+ 'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'SampleCount',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '3'}
+
+ now = timeutils.utcnow()
+ last = now - datetime.timedelta(seconds=320)
+ data = [WatchData(1, now - datetime.timedelta(seconds=100))]
+ data.append(WatchData(1, now - datetime.timedelta(seconds=150)))
+
+ # only 2 samples -> NORMAL
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
+ # only 3 samples -> ALARM
+ data.append(WatchData(1, now - datetime.timedelta(seconds=200)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'ALARM')
+
+ # only 3 samples (one old) -> NORMAL
+ data.pop(0)
+ data.append(WatchData(1, now - datetime.timedelta(seconds=400)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
+ @attr(tag=['unit', 'watchrule'])
+ @attr(speed='fast')
+ def test_sum(self):
+ rule = {
+ 'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Sum',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '100'}
+
+ now = timeutils.utcnow()
+ last = now - datetime.timedelta(seconds=320)
+ data = [WatchData(17, now - datetime.timedelta(seconds=100))]
+ data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
+
+        # sum is 40, < 100 -> NORMAL
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
+ # sum > 100 -> ALARM
+ data.append(WatchData(85, now - datetime.timedelta(seconds=150)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'ALARM')
+
+ @attr(tag=['unit', 'watchrule'])
+ @attr(speed='fast')
+ def test_ave(self):
+ rule = {
+ 'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Average',
+ 'ComparisonOperator': 'GreaterThanThreshold',
+ 'Threshold': '100'}
+
+ now = timeutils.utcnow()
+ last = now - datetime.timedelta(seconds=320)
+ data = [WatchData(117, now - datetime.timedelta(seconds=100))]
+ data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
+
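+        # average of 117 and 23 is 70, not > 100 -> NORMAL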
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'NORMAL')
+
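+        # adding 195 pushes the average above 100 -> ALARM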
+ data.append(WatchData(195, now - datetime.timedelta(seconds=250)))
+ watcher = watchrule.WatchRule(rule, data, last, now)
+ new_state = watcher.get_alarm_state()
+ logger.info(new_state)
+ assert(new_state == 'ALARM')