instance.create()
# this makes sure the auto increment worked on instance creation
- assert(instance.id > 0)
+ self.assertTrue(instance.id > 0)
def test_instance_create_delete(self):
f = open("%s/WordPress_Single_Instance_gold.template" % self.path)
instance.create()
# this makes sure the auto increment worked on instance creation
- assert(instance.id > 0)
+ self.assertTrue(instance.id > 0)
instance.delete()
- assert(instance.instance_id is None)
- assert(instance.state == instance.DELETE_COMPLETE)
+ self.assertTrue(instance.instance_id is None)
+ self.assertEqual(instance.state, instance.DELETE_COMPLETE)
# allows testing of the test directly, shown below
if __name__ == '__main__':
from nose.plugins.attrib import attr
from nose import with_setup
-from heat. common import context
+from heat.common import context
from heat.tests.v1_1 import fakes
from heat.engine import instance as instances
import heat.db as db_api
stack = self.start_wordpress_stack('test_stack')
self.m.ReplayAll()
stack.create()
- assert(stack.resources['WebServer'] is not None)
- assert(stack.resources['WebServer'].instance_id > 0)
- assert(stack.resources['WebServer'].ipaddress != '0.0.0.0')
+
+ self.assertNotEqual(stack.resources['WebServer'], None)
+ self.assertTrue(stack.resources['WebServer'].instance_id > 0)
+ self.assertNotEqual(stack.resources['WebServer'].ipaddress, '0.0.0.0')
def test_wordpress_single_instance_stack_delete(self):
stack = self.start_wordpress_stack('test_stack')
pt['raw_template_id'] = new_rt.id
new_pt = db_api.parsed_template_create(None, pt)
stack.create()
- assert(stack.resources['WebServer'] is not None)
- assert(stack.resources['WebServer'].instance_id > 0)
+ self.assertNotEqual(stack.resources['WebServer'], None)
+ self.assertTrue(stack.resources['WebServer'].instance_id > 0)
+
stack.delete()
- assert(stack.resources['WebServer'].state == 'DELETE_COMPLETE')
- assert(new_s.status == 'DELETE_COMPLETE')
+
+ self.assertEqual(stack.resources['WebServer'].state, 'DELETE_COMPLETE')
+ self.assertEqual(new_s.status, 'DELETE_COMPLETE')
def test_stack_event_list(self):
stack = self.start_wordpress_stack('test_event_list_stack')
pt['raw_template_id'] = new_rt.id
new_pt = db_api.parsed_template_create(None, pt)
stack.create()
- assert(stack.resources['WebServer'] is not None)
- assert(stack.resources['WebServer'].instance_id > 0)
+
+ self.assertNotEqual(stack.resources['WebServer'], None)
+ self.assertTrue(stack.resources['WebServer'].instance_id > 0)
m = manager.EngineManager()
events = db_api.event_get_all_by_stack(None, stack.id)
for ev in events:
result = m.parse_event(ev)
- assert(result['EventId'] > 0)
- assert(result['StackName'] == "test_event_list_stack")
- assert(result['ResourceStatus'] in ('IN_PROGRESS',
- 'CREATE_COMPLETE'))
- assert(result['ResourceType'] == 'AWS::EC2::Instance')
- assert(result['ResourceStatusReason'] == 'state changed')
- assert(result['LogicalResourceId'] == 'WebServer')
+ self.assertTrue(result['EventId'] > 0)
+ self.assertEqual(result['StackName'], "test_event_list_stack")
+ self.assertTrue(result['ResourceStatus'] in ('IN_PROGRESS',
+ 'CREATE_COMPLETE'))
+ self.assertEqual(result['ResourceType'], 'AWS::EC2::Instance')
+ self.assertEqual(result['ResourceStatusReason'], 'state changed')
+ self.assertEqual(result['LogicalResourceId'], 'WebServer')
# Big long user data field.. it mentions 'wordpress'
# a few times so this should work.
- assert(result['ResourceProperties']['UserData'].find('wordpress')
- != -1)
- assert(result['ResourceProperties']['ImageId']
- == 'F16-x86_64-gold')
- assert(result['ResourceProperties']['InstanceType'] == 'm1.large')
+ user_data = result['ResourceProperties']['UserData']
+ self.assertNotEqual(user_data.find('wordpress'), -1)
+ self.assertEqual(result['ResourceProperties']['ImageId'],
+ 'F16-x86_64-gold')
+ self.assertEqual(result['ResourceProperties']['InstanceType'],
+ 'm1.large')
def test_stack_list(self):
stack = self.start_wordpress_stack('test_stack_list')
man = manager.EngineManager()
sl = man.list_stacks(ctx, params)
- assert(len(sl) > 0)
+ self.assertTrue(len(sl['stacks']) > 0)
for s in sl['stacks']:
- assert(s['StackId'] > 0)
- assert(s['TemplateDescription'].find('WordPress') != -1)
+ self.assertTrue(s['StackId'] > 0)
+ self.assertNotEqual(s['TemplateDescription'].find('WordPress'), -1)
# allows testing of the test directly
if __name__ == '__main__':
self.m.ReplayAll()
volumeattach = stack.resources['MountPoint']
- assert(volumeattach.validate())
+ self.assertTrue(volumeattach.validate())
def test_validate_ref_valid(self):
t = json.loads(test_template_ref % 'WikiDatabase')
res = dict(manager.
validate_template(None, t, params)['ValidateTemplateResult'])
print 'res %s' % res
- assert (res['Description'] == 'Successfully validated')
+ self.assertEqual(res['Description'], 'Successfully validated')
def test_validate_ref_invalid(self):
t = json.loads(test_template_ref % 'WikiDatabasez')
manager = managers.EngineManager()
res = dict(manager.
validate_template(None, t, params)['ValidateTemplateResult'])
- assert (res['Description'] != 'Successfully validated')
+ self.assertNotEqual(res['Description'], 'Successfully validated')
def test_validate_findinmap_valid(self):
t = json.loads(test_template_findinmap_valid)
manager = managers.EngineManager()
res = dict(manager.
validate_template(None, t, params)['ValidateTemplateResult'])
- assert (res['Description'] == 'Successfully validated')
+ self.assertEqual(res['Description'], 'Successfully validated')
def test_validate_findinmap_invalid(self):
t = json.loads(test_template_findinmap_invalid)
manager = managers.EngineManager()
res = dict(manager.
validate_template(None, t, params)['ValidateTemplateResult'])
- assert (res['Description'] != 'Successfully validated')
+ self.assertNotEqual(res['Description'], 'Successfully validated')
# allows testing of the test directly, shown below
if __name__ == '__main__':
self.m.ReplayAll()
self.greenpool.spawn_n(stack.create)
eventlet.sleep(1)
- assert(stack.resources['WaitForTheHandle'].state == 'IN_PROGRESS')
+ self.assertEqual(stack.resources['WaitForTheHandle'].state,
+ 'IN_PROGRESS')
r = db_api.resource_get_by_name_and_stack(None, 'WaitHandle',
stack.id)
- assert(r.name == 'WaitHandle')
+ self.assertEqual(r.name, 'WaitHandle')
metadata = {"Status": "SUCCESS",
"Reason": "woot toot",
eventlet.sleep(2)
logger.debug('state %s' % stack.resources['WaitForTheHandle'].state)
- assert(stack.resources['WaitForTheHandle'].state == 'CREATE_COMPLETE')
+ self.assertEqual(stack.resources['WaitForTheHandle'].state,
+ 'CREATE_COMPLETE')
self.greenpool.waitall()
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
data.append(WatchData(25, now - datetime.timedelta(seconds=250)))
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'ALARM')
+ self.assertEqual(new_state, 'ALARM')
@attr(tag=['unit', 'watchrule'])
@attr(speed='fast')
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
data.append(WatchData(35, now - datetime.timedelta(seconds=150)))
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'ALARM')
+ self.assertEqual(new_state, 'ALARM')
@attr(tag=['unit', 'watchrule'])
@attr(speed='fast')
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
# only 3 samples -> ALARM
data.append(WatchData(1, now - datetime.timedelta(seconds=200)))
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'ALARM')
+ self.assertEqual(new_state, 'ALARM')
# only 3 samples (one old) -> NORMAL
data.pop(0)
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
@attr(tag=['unit', 'watchrule'])
@attr(speed='fast')
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
# sum > 100 -> ALARM
data.append(WatchData(85, now - datetime.timedelta(seconds=150)))
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'ALARM')
+ self.assertEqual(new_state, 'ALARM')
@attr(tag=['unit', 'watchrule'])
@attr(speed='fast')
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'NORMAL')
+ self.assertEqual(new_state, 'NORMAL')
data.append(WatchData(195, now - datetime.timedelta(seconds=250)))
watcher = watchrule.WatchRule(rule, data, last, now)
new_state = watcher.get_alarm_state()
logger.info(new_state)
- assert(new_state == 'ALARM')
+ self.assertEqual(new_state, 'ALARM')