From 1d5aec19d72aecdf6158e90f55d42a9abe280dd5 Mon Sep 17 00:00:00 2001
From: Angus Salkeld
Date: Fri, 25 May 2012 19:24:29 +1000
Subject: [PATCH] Add the basic cloudwatch feature

Fix watch db tables and silly programming errors.
Get basic posting of data to the metadata server working.
Add watch_rule_get_all().
Check for alarms in a periodic task.
Delete watch_data when the rule is deleted.
Add a last_evaluated field to the watch_rule.
Remove unused option to watch_data_get.
Take better account of the sample period
- still much to be done here (evaluation periods).
Add some useful stats to cfn-push-stats.
Fix how the metric is accessed.
Fix a divide by zero.

Change-Id: Iaf98499d0e3ac6d6f951ea38b3b0f409669258da
Signed-off-by: Angus Salkeld
---
 heat/cfntools/cfn-push-stats                  | 158 ++++++++++++++++++
 heat/db/api.py                                |  28 ++++
 heat/db/sqlalchemy/api.py                     |  63 +++++++
 .../migrate_repo/versions/004_guest_watch.py  |  57 +++++++
 heat/db/sqlalchemy/models.py                  |  26 +++
 heat/engine/cloud_watch.py                    |  92 ++++++++++
 heat/engine/manager.py                        | 124 ++++++++++++++
 heat/engine/parser.py                         |   2 +
 heat/metadata/api/v1/__init__.py              |   8 +
 heat/metadata/api/v1/metadata.py              |  21 +++
 ...WordPress_Single_Instance_With_HA.template |  20 +++
 11 files changed, 599 insertions(+)
 create mode 100755 heat/cfntools/cfn-push-stats
 create mode 100644 heat/db/sqlalchemy/migrate_repo/versions/004_guest_watch.py
 create mode 100644 heat/engine/cloud_watch.py

diff --git a/heat/cfntools/cfn-push-stats b/heat/cfntools/cfn-push-stats
new file mode 100755
index 00000000..996adbe7
--- /dev/null
+++ b/heat/cfntools/cfn-push-stats
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.

+"""
+Implements cfn-push-stats CloudFormation functionality
+"""
+import argparse
+import json
+import logging
+import os
+import psutil
+import sys
+
+# The options below are accepted by AWS's mon-put-instance-data.pl, from
+# which this script is derived, but are not implemented here:
+#
+# --aws-credential-file=PATH  Specifies the location of the file with AWS
+#                             credentials.
+# --aws-access-key-id=VALUE   Specifies the AWS access key ID to use to
+#                             identify the caller.
+# --aws-secret-key=VALUE      Specifies the AWS secret key to use to sign
+#                             the request.
+# --from-cron                 Specifies that this script is running from cron.
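+#
+# Instead of talking to CloudWatch directly, this script POSTs its samples
+# to the heat metadata server, at the watch data url given as the final
+# positional argument.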
+#
+# Examples (the watch data <url> is handed out by the heat metadata server)
+#
+# To perform a simple test run reporting memory utilization:
+#
+#     ./cfn-push-stats --mem-util <url>
+#
+# To set a five-minute cron schedule to report memory and disk space
+# utilization:
+#
+#     */5 * * * * /opt/aws/bin/cfn-push-stats --mem-util \
+#         --disk-space-util --disk-path=/ <url>
+#
+
+if os.path.exists('/opt/aws/bin'):
+    sys.path.insert(0, '/opt/aws/bin')
+    from cfn_helper import *
+else:
+    from heat.cfntools.cfn_helper import *
+
+KILO = 1024
+MEGA = 1048576
+GIGA = 1073741824
+unit_map = {'bytes': 1,
+            'kilobytes': KILO,
+            'megabytes': MEGA,
+            'gigabytes': GIGA}
+
+description = "Reports system statistics to a CloudWatch watch via the " \
+              "heat metadata server"
+parser = argparse.ArgumentParser(description=description)
+parser.add_argument('-v', '--verbose', action="store_true",
+                    help="Verbose logging", required=False)
+parser.add_argument('--service-failure', required=False, action="store_true",
+                    help='Reports a service failure.')
+parser.add_argument('--mem-util', required=False, action="store_true",
+                    help='Reports memory utilization in percentages.')
+parser.add_argument('--mem-used', required=False, action="store_true",
+                    help='Reports memory used (excluding cache and buffers) '
+                         'in megabytes.')
+parser.add_argument('--mem-avail', required=False, action="store_true",
+                    help='Reports available memory (including cache and '
+                         'buffers) in megabytes.')
+parser.add_argument('--swap-util', required=False, action="store_true",
+                    help='Reports swap utilization in percentages.')
+parser.add_argument('--swap-used', required=False, action="store_true",
+                    help='Reports allocated swap space in megabytes.')
+parser.add_argument('--disk-space-util', required=False, action="store_true",
+                    help='Reports disk space utilization in percentages.')
+parser.add_argument('--disk-space-used', required=False, action="store_true",
+                    help='Reports allocated disk space in gigabytes.')
+parser.add_argument('--disk-space-avail', required=False, action="store_true",
+                    help='Reports available disk space in gigabytes.')
+parser.add_argument('--memory-units', required=False, default='megabytes',
+                    help='Specifies units for memory metrics.')
+parser.add_argument('--disk-units', required=False, default='megabytes',
+                    help='Specifies units for disk metrics.')
+parser.add_argument('--disk-path', required=False, default='/',
+                    help='Selects the disk by the path on which to report.')
+parser.add_argument('url',
+                    help='the url to post to')
+args = parser.parse_args()
+
+log_format = '%(levelname)s [%(asctime)s] %(message)s'
+logging.basicConfig(format=log_format, level=logging.DEBUG)
+logger = logging.getLogger('cfn-push-stats')
+
+data = {'Namespace': 'system/linux'}
+
+# service failure
+# ===============
+if args.service_failure:
+    data['ServiceFailure'] = {
+        'Value': 1,
+        'Units': 'Counter'}
+
+# memory space
+# ============
+if args.mem_util or args.mem_used or args.mem_avail:
+    mem = psutil.phymem_usage()
+if args.mem_util:
+    data['MemoryUtilization'] = {
+        'Value': mem.percent,
+        'Units': 'Percent'}
+if args.mem_used:
+    data['MemoryUsed'] = {
+        'Value': mem.used / unit_map[args.memory_units],
+        'Units': args.memory_units}
+if args.mem_avail:
+    data['MemoryAvailable'] = {
+        'Value': mem.free / unit_map[args.memory_units],
+        'Units': args.memory_units}
+
+# swap space
+# ==========
+if args.swap_util or args.swap_used:
+    swap = psutil.virtmem_usage()
+if args.swap_util:
+    data['SwapUtilization'] = {
+        'Value': swap.percent,
+        'Units': 'Percent'}
+if args.swap_used:
+    data['SwapUsed'] = {
+        'Value': swap.used / unit_map[args.memory_units],
+        'Units': args.memory_units}
+
+# disk space
+# ==========
+if args.disk_space_util or args.disk_space_used or args.disk_space_avail:
+    disk = psutil.disk_usage(args.disk_path)
+if args.disk_space_util:
+    data['DiskSpaceUtilization'] = {
+        'Value': disk.percent,
+        'Units': 'Percent'}
+if args.disk_space_used:
+    data['DiskSpaceUsed'] = {
+        'Value': disk.used / unit_map[args.disk_units],
+        'Units': args.disk_units}
+if args.disk_space_avail:
+    data['DiskSpaceAvailable'] = {
+        'Value': disk.free / unit_map[args.disk_units],
+        'Units': args.disk_units}
+
+logger.info(str(data))
+
+cmd_str = "curl -X POST -H 'Content-Type:' --data-binary '%s' %s" % \
+    (json.dumps(data), args.url)
+
+cmd = CommandRunner(cmd_str)
+cmd.run()
+print cmd.stdout
diff --git a/heat/db/api.py b/heat/db/api.py
index c5fd544e..665d1ee1 100644
--- a/heat/db/api.py
+++ b/heat/db/api.py
@@ -125,3 +125,31 @@ def event_get_all_by_stack(context, stack_id):
 
 def event_create(context, values):
     return IMPL.event_create(context, values)
+
+
+def watch_rule_get(context, watch_rule_name):
+    return IMPL.watch_rule_get(context, watch_rule_name)
+
+
+def watch_rule_get_all(context):
+    return IMPL.watch_rule_get_all(context)
+
+
+def watch_rule_create(context, values):
+    return IMPL.watch_rule_create(context, values)
+
+
+def watch_rule_delete(context, watch_rule_name):
+    return IMPL.watch_rule_delete(context, watch_rule_name)
+
+
+def watch_data_create(context, values):
+    return IMPL.watch_data_create(context, values)
+
+
+def watch_data_get_all(context, watch_id):
+    return IMPL.watch_data_get_all(context, watch_id)
+
+
+def watch_data_delete(context, watch_name):
+    return IMPL.watch_data_delete(context, watch_name)
diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py
index 61b5a295..88aa33bf 100644
--- a/heat/db/sqlalchemy/api.py
+++ b/heat/db/sqlalchemy/api.py
@@ -195,3 +195,66 @@ def event_create(context, values):
     event_ref.update(values)
     event_ref.save()
     return event_ref
+
+
+def watch_rule_get(context, watch_rule_name):
+    result = model_query(context, models.WatchRule).\
+        filter_by(name=watch_rule_name).first()
+    return result
+
+
+def watch_rule_get_all(context):
+    results = model_query(context, models.WatchRule).all()
+    return results
+
+
+def watch_rule_create(context, values):
+    obj_ref = models.WatchRule()
+    obj_ref.update(values)
+    obj_ref.save()
+    return obj_ref
+
+
+def watch_rule_delete(context, watch_name):
+    wr = model_query(context, models.WatchRule).\
+        filter_by(name=watch_name).first()
+
+    if not wr:
+        raise Exception('Attempt to delete a watch_rule with name %s '
+                        'that does not exist' % watch_name)
+
+    session = Session.object_session(wr)
+
+    for d in wr.watch_data:
+        session.delete(d)
+
+    session.delete(wr)
+    session.flush()
+
+
+def watch_data_create(context, values):
+    obj_ref = models.WatchData()
+    obj_ref.update(values)
+    obj_ref.save()
+    return obj_ref
+
+
+def watch_data_get_all(context, watch_id):
+    # get the dataset ordered by created_at (most recent first)
+    results = model_query(context, models.WatchData).\
+        filter_by(watch_rule_id=watch_id).\
+        order_by(models.WatchData.created_at.desc()).all()
+    return results
+
+
+def watch_data_delete(context, watch_name):
+    # delete all watch_data belonging to the named watch_rule
+    wr = model_query(context, models.WatchRule).\
+        filter_by(name=watch_name).first()
+
+    if not wr:
+        raise Exception('Attempt to delete watch_data for a watch_rule '
+                        'with name %s that does not exist' % watch_name)
+
+    session = Session.object_session(wr)
+    for d in wr.watch_data:
+        session.delete(d)
+    session.flush()
diff --git a/heat/db/sqlalchemy/migrate_repo/versions/004_guest_watch.py b/heat/db/sqlalchemy/migrate_repo/versions/004_guest_watch.py
new file mode 100644
index 00000000..ac4b89f7
--- /dev/null
+++ b/heat/db/sqlalchemy/migrate_repo/versions/004_guest_watch.py
@@ -0,0 +1,57 @@
+from sqlalchemy import *
+from migrate import *
+
+
+def upgrade(migrate_engine):
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    watch_rule = Table(
+        'watch_rule', meta,
+        Column('id', Integer, primary_key=True),
+        Column('created_at', DateTime(timezone=False)),
+        Column('updated_at', DateTime(timezone=False)),
+        Column('stack_name', String(length=255, convert_unicode=False,
+                                    assert_unicode=None, unicode_error=None,
+                                    _warn_on_bytestring=False)),
+        Column('name', String(length=255, convert_unicode=False,
+                              assert_unicode=None, unicode_error=None,
+                              _warn_on_bytestring=False)),
+        Column('state', String(length=255, convert_unicode=False,
+                               assert_unicode=None, unicode_error=None,
+                               _warn_on_bytestring=False)),
+        Column('rule', Text()),
+        Column('last_evaluated', DateTime(timezone=False)),
+    )
+
+    try:
+        watch_rule.create()
+    except Exception:
+        meta.drop_all(tables=[watch_rule])
+        raise
+
+    watch_data = Table(
+        'watch_data', meta,
+        Column('id', Integer, primary_key=True),
+        Column('created_at', DateTime(timezone=False)),
+        Column('updated_at', DateTime(timezone=False)),
+        Column('data', Text()),
+        Column('watch_rule_id', Integer, ForeignKey("watch_rule.id"),
+               nullable=False),
+    )
+
+    try:
+        watch_data.create()
+    except Exception:
+        meta.drop_all(tables=[watch_rule, watch_data])
+        raise
+
+
+def downgrade(migrate_engine):
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    # watch_data references watch_rule, so drop it first
+    watch_data = Table('watch_data', meta, autoload=True)
+    watch_data.drop()
+    watch_rule = Table('watch_rule', meta, autoload=True)
+    watch_rule.drop()
diff --git a/heat/db/sqlalchemy/models.py b/heat/db/sqlalchemy/models.py
index e7730aac..445b9edb 100644
--- a/heat/db/sqlalchemy/models.py
+++ b/heat/db/sqlalchemy/models.py
@@ -182,3 +182,29 @@ class Resource(BASE, HeatBase):
     stack = relationship(Stack, backref=backref('resources'))
 
     depends_on = Column(Integer)
+
+
+class WatchRule(BASE, HeatBase):
+    """Represents a watch_rule created by the heat engine."""
+
+    __tablename__ = 'watch_rule'
+
+    id = Column(Integer, primary_key=True)
+    name = Column('name', String, nullable=False)
+    rule = Column('rule', Json)
+    state = Column('state', String)
+    stack_name = Column('stack_name', String)
+    last_evaluated = Column(DateTime, default=timeutils.utcnow)
+
+
+class WatchData(BASE, HeatBase):
+    """Represents a datapoint posted against a watch_rule."""
+
+    __tablename__ = 'watch_data'
+
+    id = Column(Integer, primary_key=True)
+    data = Column('data', Json)
+
+    watch_rule_id = Column(Integer, ForeignKey('watch_rule.id'),
+                           nullable=False)
+    watch_rule = relationship(WatchRule, backref=backref('watch_data'))
diff --git a/heat/engine/cloud_watch.py b/heat/engine/cloud_watch.py
new file mode 100644
index 00000000..6f5d38c6
--- /dev/null
+++ b/heat/engine/cloud_watch.py
@@ -0,0 +1,92 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from heat.common import exception
+from heat.db import api as db_api
+from heat.engine.resources import Resource
+
+logger = logging.getLogger('heat.engine.cloud_watch')
+
+
+class CloudWatchAlarm(Resource):
+    # Note: AlarmDescription/AlarmActions are included in the schema so
+    # that templates such as WordPress_Single_Instance_With_HA validate.
+    properties_schema = {
+        'ComparisonOperator': {
+            'Type': 'String',
+            'AllowedValues': ['GreaterThanOrEqualToThreshold',
+                              'GreaterThanThreshold',
+                              'LessThanThreshold',
+                              'LessThanOrEqualToThreshold']},
+        'AlarmDescription': {'Type': 'String'},
+        'AlarmActions': {'Type': 'List'},
+        'EvaluationPeriods': {'Type': 'String'},
+        'MetricName': {'Type': 'String'},
+        'Namespace': {'Type': 'String'},
+        'Period': {'Type': 'String'},
+        'Statistic': {
+            'Type': 'String',
+            'AllowedValues': ['SampleCount', 'Average', 'Sum',
+                              'Minimum', 'Maximum']},
+        'Threshold': {'Type': 'String'},
+        'Units': {
+            'Type': 'String',
+            'AllowedValues': ['Seconds', 'Microseconds', 'Milliseconds',
+                              'Bytes', 'Kilobytes', 'Megabytes', 'Gigabytes',
+                              'Terabytes', 'Bits', 'Kilobits', 'Megabits',
+                              'Gigabits', 'Terabits', 'Percent', 'Count',
+                              'Bytes/Second', 'Kilobytes/Second',
+                              'Megabytes/Second', 'Gigabytes/Second',
+                              'Terabytes/Second', 'Bits/Second',
+                              'Kilobits/Second', 'Megabits/Second',
+                              'Gigabits/Second', 'Terabits/Second',
+                              'Count/Second', None]}}
+
+    def __init__(self, name, json_snippet, stack):
+        super(CloudWatchAlarm, self).__init__(name, json_snippet, stack)
+        self.instance_id = ''
+
+    def validate(self):
+        '''
+        Validate the Properties
+        '''
+        return Resource.validate(self)
+
+    def create(self):
+        if self.state is not None:
+            return
+        self.state_set(self.CREATE_IN_PROGRESS)
+        Resource.create(self)
+
+        wr_values = {
+            'name': self.name,
+            'rule': self.t['Properties'],
+            'state': 'NORMAL',
+            'stack_name': self.stack.name
+        }
+
+        wr = db_api.watch_rule_create(None, wr_values)
+        self.instance_id = wr.id
+
+        self.state_set(self.CREATE_COMPLETE)
+
+    def delete(self):
+        if self.state in (self.DELETE_IN_PROGRESS, self.DELETE_COMPLETE):
+            return
+
+        self.state_set(self.DELETE_IN_PROGRESS)
+
+        db_api.watch_rule_delete(None, self.name)
+
+        Resource.delete(self)
+        self.state_set(self.DELETE_COMPLETE)
+
+    def FnGetRefId(self):
+        return unicode(self.name)
diff --git a/heat/engine/manager.py b/heat/engine/manager.py
index c97e207f..ce6919c9 100644
--- a/heat/engine/manager.py
+++ b/heat/engine/manager.py
@@ -16,6 +16,7 @@ import contextlib
 from copy import deepcopy
+import datetime
 import functools
 import os
 import socket
@@ -30,6 +31,7 @@ from heat.common import config
 from heat.engine import parser
 from heat.engine import resources
 from heat.db import api as db_api
+from heat.openstack.common import timeutils
 
 logger = logging.getLogger('heat.engine.manager')
@@ -324,3 +326,125 @@
         pt.template = t
         pt.save()
         return [None, metadata]
+
+    def do_data_cmp(self, rule, data, threshold):
+        op = rule['ComparisonOperator']
+        if op == 'GreaterThanThreshold':
+            return data > threshold
+        elif op == 'GreaterThanOrEqualToThreshold':
+            return data >= threshold
+        elif op == 'LessThanThreshold':
+            return data < threshold
+        elif op == 'LessThanOrEqualToThreshold':
+            return data <= threshold
+        else:
+            return False
+
+    def do_data_calc(self, rule, rolling, metric):
+        stat = rule['Statistic']
+        if stat == 'Maximum':
+            return max(metric, rolling)
+        elif stat == 'Minimum':
+            return min(metric, rolling)
+        else:
+            return metric + rolling
+
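+    # A worked example of the two helpers above, using the HttpFailureAlarm
+    # rule from the sample template (Statistic=Maximum, Threshold=3,
+    # ComparisonOperator=GreaterThanThreshold) and made-up sample values
+    # 1, 2 and 4 arriving within one period:
+    #
+    #     data = 1                          # first sample
+    #     data = do_data_calc(rule, 1, 2)   # Maximum -> 2
+    #     data = do_data_calc(rule, 2, 4)   # Maximum -> 4
+    #     do_data_cmp(rule, 4, 3)           # 4 > 3 -> True, i.e. ALARM
+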
+    @manager.periodic_task
+    def _periodic_watcher_task(self, context):
+
+        now = timeutils.utcnow()
+        wrs = db_api.watch_rule_get_all(context)
+        for wr in wrs:
+            logger.debug('_periodic_watcher_task %s' % wr.name)
+            # has enough time progressed to evaluate the rule again?
+            dt_period = datetime.timedelta(seconds=int(wr.rule['Period']))
+            if now < (wr.last_evaluated + dt_period):
+                continue
+
+            # TODO(evaluation periods): 'periods' is not used yet; for now
+            # assume all samples fall within the current period.
+            periods = int(wr.rule['EvaluationPeriods'])
+
+            # get the dataset (most recent first)
+            #wds = db_api.watch_data_get_all(context, wr.id)
+            wds = wr.watch_data
+
+            stat = wr.rule['Statistic']
+            data = 0
+            samples = 0
+            for d in wds:
+                if d.created_at < wr.last_evaluated:
+                    logger.debug('ignoring old data %s: %s < %s' %
+                                 (wr.rule['MetricName'],
+                                  str(d.created_at),
+                                  str(wr.last_evaluated)))
+                    continue
+                samples = samples + 1
+                metric = 1
+                if stat == 'SampleCount':
+                    data = samples
+                else:
+                    metric = int(d.data[wr.rule['MetricName']]['Value'])
+                    if samples == 1:
+                        data = metric
+                    else:
+                        data = self.do_data_calc(wr.rule, data, metric)
+                logger.debug('%s: %d/%d' % (wr.rule['MetricName'],
+                                            metric, data))
+
+            if stat == 'Average' and samples > 0:
+                data = data / samples
+
+            alarming = self.do_data_cmp(wr.rule, data,
+                                        int(wr.rule['Threshold']))
+            logger.debug('%s: %d/%d => %d (current state:%s)' %
+                         (wr.rule['MetricName'],
+                          int(wr.rule['Threshold']),
+                          data, alarming, wr.state))
+            if alarming and wr.state != 'ALARM':
+                wr.state = 'ALARM'
+                logger.info('ALARM> stack:%s, watch_name:%s',
+                            wr.stack_name, wr.name)
+                #s = db_api.stack_get(None, wr.stack_name)
+                #if s:
+                #    ps = parser.Stack(s.name,
+                #                      s.raw_template.parsed_template.template,
+                #                      s.id,
+                #                      params)
+                #    for a in wr.rule['AlarmActions']:
+                #        ps.resources[a].alarm()
+
+            elif not alarming and wr.state == 'ALARM':
+                wr.state = 'NORMAL'
+                logger.info('NORMAL> stack:%s, watch_name:%s',
+                            wr.stack_name, wr.name)
+
+            # persist the state change (if any) and the evaluation time
+            wr.last_evaluated = now
+            wr.save()
+
+    def create_watch_data(self, context, watch_name, stats_data):
+        '''
+        This could be used by CloudWatch and WaitConditions, and treats
+        HA service events like any other CloudWatch datapoint.
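+
+        A stats_data payload, as posted by cfn-push-stats, looks for
+        example like:
+
+            {"Namespace": "system/linux",
+             "MemoryUtilization": {"Value": 23, "Units": "Percent"}}
+
+        Only the metric named by the rule's MetricName is required.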
+        '''
+
+        wr = db_api.watch_rule_get(context, watch_name)
+        if wr is None:
+            return ['No such watch rule %s' % watch_name, None]
+
+        if wr.rule['MetricName'] not in stats_data:
+            return ['MetricName %s missing' % wr.rule['MetricName'], None]
+
+        watch_data = {
+            'data': stats_data,
+            'watch_rule_id': wr.id
+        }
+        wd = db_api.watch_data_create(context, watch_data)
+
+        return [None, wd.data]
diff --git a/heat/engine/parser.py b/heat/engine/parser.py
index 7c9f2a4e..56fc524c 100644
--- a/heat/engine/parser.py
+++ b/heat/engine/parser.py
@@ -19,6 +19,7 @@ import logging
 import sys
 from heat.common import exception
 from heat.engine import checkeddict
+from heat.engine import cloud_watch
 from heat.engine import eip
 from heat.engine import instance
 from heat.engine import resources
@@ -27,6 +28,7 @@ from heat.engine import user
 from heat.engine import volume
 from heat.engine import wait_condition
 from heat.db import api as db_api
+
 logger = logging.getLogger(__file__)
diff --git a/heat/metadata/api/v1/__init__.py b/heat/metadata/api/v1/__init__.py
index 0eabaad1..4c0eba3f 100644
--- a/heat/metadata/api/v1/__init__.py
+++ b/heat/metadata/api/v1/__init__.py
@@ -52,6 +52,14 @@ class API(wsgi.Router):
         mapper.connect('/events/',
                        controller=metadata_controller,
                        action='create_event',
                        conditions=dict(method=['POST']))
+        mapper.connect('/stats/:watch_name/data/',
+                       controller=metadata_controller,
+                       action='create_watch_data',
+                       conditions=dict(method=['POST']))
+#        mapper.connect('/stats/:watch_name/data/',
+#                       controller=metadata_controller,
+#                       action='list_watch_data',
+#                       conditions=dict(method=['GET']))
 
         # TODO(shadower): make sure all responses are JSON-encoded
         # currently, calling an unknown route uses the default handler which
diff --git a/heat/metadata/api/v1/metadata.py b/heat/metadata/api/v1/metadata.py
index f02c451b..40b406b3 100644
--- a/heat/metadata/api/v1/metadata.py
+++ b/heat/metadata/api/v1/metadata.py
@@ -106,6 +106,27 @@ class MetadataController:
             return json_error(400, error)
         return json_response(201, event)
 
+    def create_watch_data(self, req, body, watch_name):
+        con = context.get_admin_context()
+        [error, watch_data] = rpc.call(con, 'engine',
+                                       {'method': 'create_watch_data',
+                                        'args': {'watch_name': watch_name,
+                                                 'stats_data': body}})
+        if error:
+            return json_error(400, error)
+        return json_response(201, watch_data)
+
+    # NOTE: the matching GET route is still commented out above and the
+    # engine does not implement 'list_watch_data' yet.
+    def list_watch_data(self, req, watch_name):
+        con = context.get_admin_context()
+        data = rpc.call(con, 'engine',
+                        {'method': 'list_watch_data',
+                         'args': {'watch_name': watch_name}})
+        if data:
+            return data
+        else:
+            return json_error(404,
+                              'The watch "%s" does not exist.' % watch_name)
+
 
 def create_resource(options):
     """
diff --git a/templates/WordPress_Single_Instance_With_HA.template b/templates/WordPress_Single_Instance_With_HA.template
index cd3d2eb0..34286dff 100644
--- a/templates/WordPress_Single_Instance_With_HA.template
+++ b/templates/WordPress_Single_Instance_With_HA.template
@@ -97,6 +97,26 @@
   },
 
   "Resources" : {
+    "WebServerRestartPolicy" : {
+      "Type" : "HEAT::Recovery::EscalationPolicy",
+      "Properties" : {
+        "Instance" : { "Ref" : "WikiDatabase" }
+      }
+    },
+    "HttpFailureAlarm": {
+      "Type": "AWS::CloudWatch::Alarm",
+      "Properties": {
+        "AlarmDescription": "Restart the WikiDatabase if httpd fails > 3 times in 10 minutes",
+        "MetricName": "ProcessRestartCount",
+        "Namespace": "HEAT",
+        "Statistic": "Maximum",
+        "Period": "300",
+        "EvaluationPeriods": "2",
+        "Threshold": "3",
+        "AlarmActions": [ { "Ref": "WebServerRestartPolicy" } ],
+        "ComparisonOperator": "GreaterThanThreshold"
+      }
+    },
     "WikiDatabase": {
       "Type": "AWS::EC2::Instance",
       "Metadata" : {
-- 
2.45.2
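To exercise the new endpoint by hand, a datapoint can be posted in the same
form cfn-push-stats emits. This is only a sketch; the host and port (and any
path prefix) depend on how the metadata API is deployed:

    # report a ProcessRestartCount of 4 against the HttpFailureAlarm rule;
    # 4 exceeds the Threshold of 3, so the next _periodic_watcher_task run
    # should move the rule to ALARM
    curl -X POST -H 'Content-Type:' --data-binary \
        '{"Namespace": "HEAT", "ProcessRestartCount": {"Value": 4, "Units": "Counter"}}' \
        http://127.0.0.1:8000/stats/HttpFailureAlarm/data/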