Fix watch db tables and silly programming errors.
get basic posting of data to the metadata server working
add watch_rule_get_all()
check for alarms in a periodic task
delete watch_data when the rule is deleted
add a last_evaluated field to the watch_rule
remove unused option to watch_data_get
take better account of the sample period.
- still much to be done here (evaluation periods).
add some useful stats to cfn-push-stats
fix how the metric is accessed
fix a divide by zero
Change-Id: Iaf98499d0e3ac6d6f951ea38b3b0f409669258da
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
--- /dev/null
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Implements cfn-push-stats CloudFormation functionality
+"""
+import argparse
+import json
+import logging
+import os
+import psutil
+import sys
+
+#
+# Examples
+#
+# To report memory utilization to the metadata server:
+#
+#     cfn-push-stats --mem-util <url>
+#
+# To set a five-minute cron schedule reporting memory and disk space
+# utilization:
+#
+#     */5 * * * * cfn-push-stats --mem-util --disk-space-util --disk-path=/ <url>
+#
+
+if os.path.exists('/opt/aws/bin'):
+ sys.path.insert(0, '/opt/aws/bin')
+ from cfn_helper import *
+else:
+ from heat.cfntools.cfn_helper import *
+
+KILO = 1024
+MEGA = 1048576
+GIGA = 1073741824
+unit_map = {'bytes': 1,
+ 'kilobytes': KILO,
+ 'megabytes': MEGA,
+ 'gigabytes': GIGA}
+
+description = "Report instance statistics (memory, swap, disk) to the metadata server."
+parser = argparse.ArgumentParser(description=description)
+parser.add_argument('-v', '--verbose', action="store_true",
+ help="Verbose logging", required=False)
+parser.add_argument('--service-failure', required=False, action="store_true",
+                    help='Reports a service failure.')
+parser.add_argument('--mem-util', required=False, action="store_true",
+ help='Reports memory utilization in percentages.')
+parser.add_argument('--mem-used', required=False, action="store_true",
+ help='Reports memory used (excluding cache and buffers) in megabytes.')
+parser.add_argument('--mem-avail', required=False, action="store_true",
+ help='Reports available memory (including cache and buffers) in megabytes.')
+parser.add_argument('--swap-util', required=False, action="store_true",
+ help='Reports swap utilization in percentages.')
+parser.add_argument('--swap-used', required=False, action="store_true",
+ help='Reports allocated swap space in megabytes.')
+parser.add_argument('--disk-space-util', required=False, action="store_true",
+ help='Reports disk space utilization in percentages.')
+parser.add_argument('--disk-space-used', required=False, action="store_true",
+ help='Reports allocated disk space in gigabytes.')
+parser.add_argument('--disk-space-avail', required=False, action="store_true",
+ help='Reports available disk space in gigabytes.')
+parser.add_argument('--memory-units', required=False, default='megabytes',
+ help='Specifies units for memory metrics.')
+parser.add_argument('--disk-units', required=False, default='megabytes',
+ help='Specifies units for disk metrics.')
+parser.add_argument('--disk-path', required=False, default='/',
+ help='Selects the disk by the path on which to report.')
+parser.add_argument('url',
+ help='the url to post to')
+args = parser.parse_args()
+
+log_format = '%(levelname)s [%(asctime)s] %(message)s'
+log_level = logging.DEBUG if args.verbose else logging.INFO
+logging.basicConfig(format=log_format, level=log_level)
+logger = logging.getLogger('cfn-push-stats')
+
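+# Each requested metric is added to this dict as
+#   {'<MetricName>': {'Value': <number>, 'Units': '<unit>'}}
+# and the whole dict is POSTed as a JSON document to the given url.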
+data = {'Namespace': 'system/linux'}
+
+# service failure
+# ===============
+if args.service_failure:
+ data['ServiceFailure'] = {
+ 'Value': 1,
+ 'Units': 'Counter'}
+
+# memory space
+# ==========
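+# Note: phymem_usage() and virtmem_usage() are the older psutil names for
+# what newer psutil releases expose as virtual_memory() and swap_memory().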
+if args.mem_util or args.mem_used or args.mem_avail:
+ mem = psutil.phymem_usage()
+if args.mem_util:
+ data['MemoryUtilization'] = {
+ 'Value': mem.percent,
+ 'Units': 'Percent'}
+if args.mem_used:
+ data['MemoryUsed'] = {
+ 'Value': mem.used / unit_map[args.memory_units],
+ 'Units': args.memory_units}
+if args.mem_avail:
+ data['MemoryAvailable'] = {
+ 'Value': mem.free / unit_map[args.memory_units],
+ 'Units': args.memory_units}
+
+# swap space
+# ==========
+if args.swap_util or args.swap_used:
+ swap = psutil.virtmem_usage()
+if args.swap_util:
+ data['SwapUtilization'] = {
+ 'Value': swap.percent,
+ 'Units': 'Percent'}
+if args.swap_used:
+ data['SwapUsed'] = {
+ 'Value': swap.used / unit_map[args.memory_units],
+ 'Units': args.memory_units}
+
+# disk space
+# ==========
+if args.disk_space_util or args.disk_space_used or args.disk_space_avail:
+ disk = psutil.disk_usage(args.disk_path)
+if args.disk_space_util:
+ data['DiskSpaceUtilization'] = {
+ 'Value': disk.percent,
+ 'Units': 'Percent'}
+if args.disk_space_used:
+ data['DiskSpaceUsed'] = {
+ 'Value': disk.used / unit_map[args.disk_units],
+ 'Units': args.disk_units}
+if args.disk_space_avail:
+ data['DiskSpaceAvailable'] = {
+ 'Value': disk.free / unit_map[args.disk_units],
+ 'Units': args.disk_units}
+
+logger.info(str(data))
+
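+# POST the collected metrics as a JSON document to the watch data url.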
+cmd_str = "curl -X POST -H 'Content-Type:' --data-binary '%s' %s" % \
+          (json.dumps(data), args.url)
+
+cmd = CommandRunner(cmd_str)
+cmd.run()
+print cmd.stdout
def event_create(context, values):
return IMPL.event_create(context, values)
+
+
+def watch_rule_get(context, watch_rule_name):
+ return IMPL.watch_rule_get(context, watch_rule_name)
+
+
+def watch_rule_get_all(context):
+ return IMPL.watch_rule_get_all(context)
+
+
+def watch_rule_create(context, values):
+ return IMPL.watch_rule_create(context, values)
+
+
+def watch_rule_delete(context, watch_rule_name):
+ return IMPL.watch_rule_delete(context, watch_rule_name)
+
+
+def watch_data_create(context, values):
+ return IMPL.watch_data_create(context, values)
+
+
+def watch_data_get_all(context, watch_id):
+ return IMPL.watch_data_get_all(context, watch_id)
+
+
+def watch_data_delete(context, watch_name):
+ return IMPL.watch_data_delete(context, watch_name)
event_ref.update(values)
event_ref.save()
return event_ref
+
+
+def watch_rule_get(context, watch_rule_name):
+ result = model_query(context, models.WatchRule).\
+ filter_by(name=watch_rule_name).first()
+ return result
+
+
+def watch_rule_get_all(context):
+ results = model_query(context, models.WatchRule).all()
+ return results
+
+
+def watch_rule_create(context, values):
+ obj_ref = models.WatchRule()
+ obj_ref.update(values)
+ obj_ref.save()
+ return obj_ref
+
+
+def watch_rule_delete(context, watch_name):
+ wr = model_query(context, models.WatchRule).\
+ filter_by(name=watch_name).first()
+
+ if not wr:
+        raise Exception('Attempt to delete a watch_rule with name %s '
+                        'that does not exist' % watch_name)
+
+ session = Session.object_session(wr)
+
+ for d in wr.watch_data:
+ session.delete(d)
+
+ session.delete(wr)
+ session.flush()
+
+
+def watch_data_create(context, values):
+ obj_ref = models.WatchData()
+ obj_ref.update(values)
+ obj_ref.save()
+ return obj_ref
+
+
+def watch_data_get_all(context, watch_id):
+    # get dataset ordered by created_at (most recent first)
+    results = model_query(context, models.WatchData).\
+                  filter_by(watch_rule_id=watch_id).\
+                  order_by(models.WatchData.created_at.desc()).all()
+ return results
+
+
+def watch_data_delete(context, watch_name):
+    wr = model_query(context, models.WatchRule).\
+                        filter_by(name=watch_name).first()
+
+    if not wr:
+        raise Exception('Attempt to delete watch_data for a watch_rule '
+                        'with name %s that does not exist' % watch_name)
+
+    session = Session.object_session(wr)
+    for d in wr.watch_data:
+        session.delete(d)
+    session.flush()
--- /dev/null
+from sqlalchemy import *
+from migrate import *
+
+
+def upgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+
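+    # watch_rule stores one row per CloudWatch alarm resource; watch_data
+    # stores the metric samples posted against each rule.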
+ watch_rule = Table(
+ 'watch_rule', meta,
+ Column('id', Integer, primary_key=True),
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('stack_name', String(length=255, convert_unicode=False,
+ assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('name', String(length=255, convert_unicode=False,
+ assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('state', String(length=255, convert_unicode=False,
+ assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('rule', Text()),
+ Column('last_evaluated', DateTime(timezone=False)),
+ )
+
+ try:
+ watch_rule.create()
+ except Exception:
+        meta.drop_all(tables=[watch_rule])
+ raise
+
+ watch_data = Table(
+ 'watch_data', meta,
+ Column('id', Integer, primary_key=True),
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('data', Text()),
+ Column('watch_rule_id', Integer, ForeignKey("watch_rule.id"),
+ nullable=False),
+ )
+
+ try:
+ watch_data.create()
+ except Exception:
+        meta.drop_all(tables=[watch_rule, watch_data])
+ raise
+
+
+def downgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+    watch_data = Table('watch_data', meta, autoload=True)
+    watch_data.drop()
+    watch_rule = Table('watch_rule', meta, autoload=True)
+    watch_rule.drop()
stack = relationship(Stack, backref=backref('resources'))
depends_on = Column(Integer)
+
+
+class WatchRule(BASE, HeatBase):
+ """Represents a watch_rule created by the heat engine."""
+
+ __tablename__ = 'watch_rule'
+
+ id = Column(Integer, primary_key=True)
+ name = Column('name', String, nullable=False)
+ rule = Column('rule', Json)
+ state = Column('state', String)
+ stack_name = Column('stack_name', String)
+ last_evaluated = Column(DateTime, default=timeutils.utcnow)
+
+
+class WatchData(BASE, HeatBase):
+ """Represents a watch_data created by the heat engine."""
+
+ __tablename__ = 'watch_data'
+
+ id = Column(Integer, primary_key=True)
+ data = Column('data', Json)
+
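+    # each sample row points back to its rule; the 'watch_data' backref is
+    # what the periodic watcher task iterates when evaluating a rule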
+    watch_rule_id = Column(Integer, ForeignKey('watch_rule.id'),
+                           nullable=False)
+ watch_rule = relationship(WatchRule, backref=backref('watch_data'))
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+import logging
+import json
+import os
+
+from heat.common import exception
+from heat.db import api as db_api
+from heat.engine.resources import Resource
+
+logger = logging.getLogger('heat.engine.cloud_watch')
+
+
+class CloudWatchAlarm(Resource):
+ properties_schema = {'ComparisonOperator': {'Type': 'String',
+ 'AllowedValues': ['GreaterThanOrEqualToThreshold',
+ 'GreaterThanThreshold', 'LessThanThreshold',
+ 'LessThanOrEqualToThreshold']},
+ 'EvaluationPeriods': {'Type': 'String'},
+ 'MetricName': {'Type': 'String'},
+ 'Namespace': {'Type': 'String'},
+ 'Period': {'Type': 'String'},
+ 'Statistic': {'Type': 'String',
+ 'AllowedValues': ['SampleCount', 'Average', 'Sum',
+ 'Minimum', 'Maximum']},
+ 'Threshold': {'Type': 'String'},
+ 'Units': {'Type': 'String',
+ 'AllowedValues': ['Seconds', 'Microseconds', 'Milliseconds',
+ 'Bytes', 'Kilobytes', 'Megabytes', 'Gigabytes',
+ 'Terabytes', 'Bits', 'Kilobits', 'Megabits', 'Gigabits',
+ 'Terabits', 'Percent', 'Count', 'Bytes/Second',
+ 'Kilobytes/Second', 'Megabytes/Second', 'Gigabytes/Second',
+ 'Terabytes/Second', 'Bits/Second', 'Kilobits/Second',
+ 'Megabits/Second', 'Gigabits/Second', 'Terabits/Second',
+ 'Count/Second', None]}}
+
+ def __init__(self, name, json_snippet, stack):
+ super(CloudWatchAlarm, self).__init__(name, json_snippet, stack)
+ self.instance_id = ''
+
+ def validate(self):
+ '''
+ Validate the Properties
+ '''
+ return Resource.validate(self)
+
+ def create(self):
+        if self.state is not None:
+ return
+ self.state_set(self.CREATE_IN_PROGRESS)
+ Resource.create(self)
+
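+        # persist the alarm definition as a watch_rule row; the engine's
+        # periodic watcher task evaluates it against posted watch_data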
+ wr_values = {
+ 'name': self.name,
+ 'rule': self.t['Properties'],
+ 'state': 'NORMAL',
+ 'stack_name': self.stack.name
+ }
+
+ wr = db_api.watch_rule_create(None, wr_values)
+ self.instance_id = wr.id
+
+ self.state_set(self.CREATE_COMPLETE)
+
+ def delete(self):
+ if self.state == self.DELETE_IN_PROGRESS or \
+ self.state == self.DELETE_COMPLETE:
+ return
+
+ self.state_set(self.DELETE_IN_PROGRESS)
+
+ db_api.watch_rule_delete(None, self.name)
+
+ Resource.delete(self)
+ self.state_set(self.DELETE_COMPLETE)
+
+ def FnGetRefId(self):
+ return unicode(self.name)
import contextlib
from copy import deepcopy
+import datetime
import functools
import os
import socket
from heat.engine import parser
from heat.engine import resources
from heat.db import api as db_api
+from heat.openstack.common import timeutils
logger = logging.getLogger('heat.engine.manager')
pt.template = t
pt.save()
return [None, metadata]
+
+ def do_data_cmp(self, rule, data, threshold):
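+        # Map the rule's ComparisonOperator onto the corresponding python
+        # comparison of the aggregated metric value against the threshold.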
+ op = rule['ComparisonOperator']
+ if op == 'GreaterThanThreshold':
+ return data > threshold
+ elif op == 'GreaterThanOrEqualToThreshold':
+ return data >= threshold
+ elif op == 'LessThanThreshold':
+ return data < threshold
+ elif op == 'LessThanOrEqualToThreshold':
+ return data <= threshold
+ else:
+ return False
+
+ def do_data_calc(self, rule, rolling, metric):
+
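+        # Fold one metric sample into the rolling aggregate: keep the extreme
+        # for Maximum/Minimum, otherwise accumulate a running sum (used for
+        # Sum and Average).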
+ stat = rule['Statistic']
+ if stat == 'Maximum':
+ if metric > rolling:
+ return metric
+ else:
+ return rolling
+ elif stat == 'Minimum':
+ if metric < rolling:
+ return metric
+ else:
+ return rolling
+ else:
+ return metric + rolling
+
+ @manager.periodic_task
+ def _periodic_watcher_task(self, context):
+
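+        # For each watch rule: skip it if its Period has not elapsed since
+        # last_evaluated, aggregate the samples recorded since then according
+        # to the rule's Statistic, compare the result with the Threshold and
+        # flip the rule between NORMAL and ALARM as required.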
+ now = timeutils.utcnow()
+ wrs = db_api.watch_rule_get_all(context)
+ for wr in wrs:
+ logger.debug('_periodic_watcher_task %s' % wr.name)
+ # has enough time progressed to run the rule
+ dt_period = datetime.timedelta(seconds=int(wr.rule['Period']))
+ if now < (wr.last_evaluated + dt_period):
+ continue
+
+            # gather the samples for this rule; anything recorded before
+            # the last evaluation is skipped below
+ periods = int(wr.rule['EvaluationPeriods'])
+
+ # TODO fix this
+ # initial assumption: all samples are in this period
+ period = int(wr.rule['Period'])
+ #wds = db_api.watch_data_get_all(context, wr.id)
+ wds = wr.watch_data
+
+ stat = wr.rule['Statistic']
+ data = 0
+ samples = 0
+ for d in wds:
+ if d.created_at < wr.last_evaluated:
+ logger.debug('ignoring old data %s: %s < %s' % \
+ (wr.rule['MetricName'],
+ str(d.created_at),
+ str(wr.last_evaluated)))
+ continue
+ samples = samples + 1
+ metric = 1
+ data = samples
+ if stat != 'SampleCount':
+ metric = int(d.data[wr.rule['MetricName']]['Value'])
+ data = self.do_data_calc(wr.rule, data, metric)
+ logger.debug('%s: %d/%d' % (wr.rule['MetricName'],
+ metric, data))
+
+ if stat == 'Average' and samples > 0:
+ data = data / samples
+
+ alarming = self.do_data_cmp(wr.rule, data,
+ int(wr.rule['Threshold']))
+ logger.debug('%s: %d/%d => %d (current state:%s)' % \
+ (wr.rule['MetricName'],
+ int(wr.rule['Threshold']),
+ data, alarming, wr.state))
+ if alarming and wr.state != 'ALARM':
+ wr.state = 'ALARM'
+ wr.save()
+ logger.info('ALARM> stack:%s, watch_name:%s',
+ wr.stack_name, wr.name)
+ #s = db_api.stack_get(None, wr.stack_name)
+ #if s:
+ # ps = parser.Stack(s.name,
+ # s.raw_template.parsed_template.template,
+ # s.id,
+ # params)
+ # for a in wr.rule['AlarmActions']:
+ # ps.resources[a].alarm()
+
+ elif not alarming and wr.state == 'ALARM':
+ wr.state = 'NORMAL'
+ wr.save()
+ logger.info('NORMAL> stack:%s, watch_name:%s',
+ wr.stack_name, wr.name)
+
+            wr.last_evaluated = now
+            wr.save()
+
+ def create_watch_data(self, context, watch_name, stats_data):
+ '''
+        This could be used by CloudWatch and WaitConditions,
+        and treats HA service events like any other CloudWatch datapoint.
+ '''
+
+ wr = db_api.watch_rule_get(context, watch_name)
+        if wr is None:
+            return ['No such watch rule: %s' % watch_name, None]
+
+        if wr.rule['MetricName'] not in stats_data:
+            return ['MetricName %s missing' % wr.rule['MetricName'], None]
+
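+        # store the raw stats payload against the rule; it is picked up and
+        # evaluated by the next run of _periodic_watcher_task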
+ watch_data = {
+ 'data': stats_data,
+ 'watch_rule_id': wr.id
+ }
+ wd = db_api.watch_data_create(context, watch_data)
+
+ return [None, wd.data]
import sys
from heat.common import exception
from heat.engine import checkeddict
+from heat.engine import cloud_watch
from heat.engine import eip
from heat.engine import instance
from heat.engine import resources
from heat.engine import volume
from heat.engine import wait_condition
from heat.db import api as db_api
+
logger = logging.getLogger(__file__)
mapper.connect('/events/',
controller=metadata_controller, action='create_event',
conditions=dict(method=['POST']))
+ mapper.connect('/stats/:watch_name/data/',
+ controller=metadata_controller,
+ action='create_watch_data',
+ conditions=dict(method=['POST']))
+# mapper.connect('/stats/:watch_name/data/',
+# controller=metadata_controller,
+# action='list_watch_data',
+# conditions=dict(method=['GET']))
# TODO(shadower): make sure all responses are JSON-encoded
# currently, calling an unknown route uses the default handler which
return json_error(400, error)
return json_response(201, event)
+ def create_watch_data(self, req, body, watch_name):
+ con = context.get_admin_context()
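+        # forward the posted body to the engine, which records it as
+        # watch_data against the named watch rule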
+ [error, watch_data] = rpc.call(con, 'engine',
+ {'method': 'create_watch_data',
+ 'args': {'watch_name': watch_name,
+ 'stats_data': body}})
+ if error:
+ return json_error(400, error)
+ return json_response(201, watch_data)
+
+ def list_watch_data(self, req, watch_name):
+ con = context.get_admin_context()
+ data = rpc.call(con, 'engine',
+ {'method': 'list_watch_data',
+ 'args': {'watch_name': watch_name}})
+ if data:
+ return data
+ else:
+ return json_error(404,
+ 'The watch "%s" does not exist.' % watch_name)
+
def create_resource(options):
"""
},
"Resources" : {
+ "WebServerRestartPolicy" : {
+ "Type" : "HEAT::Recovery::EscalationPolicy",
+ "Properties" : {
+ "Instance" : { "Ref" : "WikiDatabase" },
+ }
+ },
+ "HttpFailureAlarm": {
+ "Type": "AWS::CloudWatch::Alarm",
+ "Properties": {
+ "AlarmDescription": "Restart the WikiDatabase if httpd fails > 3 times in 10 minutes",
+ "MetricName": "ProcessRestartCount",
+ "Namespace": "HEAT",
+ "Statistic": "Maximum",
+ "Period": "300",
+ "EvaluationPeriods": "2",
+ "Threshold": "3",
+ "AlarmActions": [ { "Ref": "WebServerRestartPolicy" } ],
+ "ComparisonOperator": "GreaterThanThreshold"
+ }
+ },
"WikiDatabase": {
"Type": "AWS::EC2::Instance",
"Metadata" : {