Contrary to the nose docs, the class can be a unittest.TestCase subclass
'''
@classmethod
- def setupAll(self):
+ def setupAll(cls):
print "SETUPALL"
template = 'WordPress_Single_Instance.template'
'DBUsername=dbuser',
'DBPassword=' + os.environ['OS_PASSWORD']])
- self.logical_resource_name = 'WikiDatabase'
- self.logical_resource_type = 'AWS::EC2::Instance'
- self.stack = util.Stack(template, 'F17', 'x86_64', 'cfntools',
+ cls.logical_resource_name = 'WikiDatabase'
+ cls.logical_resource_type = 'AWS::EC2::Instance'
+
+ # Just to get the assert*() methods
+ class CfnApiFunctions(unittest.TestCase):
+ @unittest.skip('Not a real test case')
+ def runTest(self):
+ pass
+
+ inst = CfnApiFunctions()
+ cls.stack = util.Stack(inst, template, 'F17', 'x86_64', 'cfntools',
stack_paramstr)
- self.WikiDatabase = util.Instance(self.logical_resource_name)
- self.stack.create()
- self.WikiDatabase.wait_for_boot()
- self.WikiDatabase.check_cfntools()
- self.WikiDatabase.wait_for_provisioning()
-
- self.logical_resource_status = "CREATE_COMPLETE"
-
- # Save some compiled regexes and strings for response validation
- self.stack_id_re = re.compile("^arn:openstack:heat::admin:stacks/"
- + self.stack.stackname)
- self.time_re = re.compile(
- "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$")
- self.description_re = re.compile("^AWS CloudFormation Sample Template")
- self.stack_status = "CREATE_COMPLETE"
- self.stack_status_reason = "Stack successfully created"
- self.stack_timeout = str(60)
- self.stack_disable_rollback = "True"
-
- # Match the expected format for physical resource ID for an instance
- self.phys_res_id_re = re.compile(
- "^[0-9a-z]*-[0-9a-z]*-[0-9a-z]*-[0-9a-z]*-[0-9a-z]*$")
+ cls.WikiDatabase = util.Instance(inst, cls.logical_resource_name)
+
+ try:
+ cls.stack.create()
+ cls.WikiDatabase.wait_for_boot()
+ cls.WikiDatabase.check_cfntools()
+ cls.WikiDatabase.wait_for_provisioning()
+
+ cls.logical_resource_status = "CREATE_COMPLETE"
+
+ # Save some compiled regexes and strings for response validation
+ cls.stack_id_re = re.compile("^arn:openstack:heat::admin:stacks/"
+ + cls.stack.stackname)
+ cls.time_re = re.compile(
+ "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$")
+ cls.description_re = re.compile(
+ "^AWS CloudFormation Sample Template")
+ cls.stack_status = "CREATE_COMPLETE"
+ cls.stack_status_reason = "Stack successfully created"
+ cls.stack_timeout = str(60)
+ cls.stack_disable_rollback = "True"
+
+ # Match the expected format for an instance's physical resource ID
+ cls.phys_res_id_re = re.compile(
+ "^[0-9a-z]*-[0-9a-z]*-[0-9a-z]*-[0-9a-z]*-[0-9a-z]*$")
+ except:
+ cls.stack.cleanup()
+ raise
@classmethod
- def teardownAll(self):
+ def teardownAll(cls):
print "TEARDOWNALL"
- self.stack.cleanup()
+ cls.stack.cleanup()
def test_instance(self):
# ensure wordpress was installed by checking for expected
from glance import client as glance_client
from novaclient.v1_1 import client as nova_client
+import heat
from heat import utils
from heat.engine import parser
from heat import client as heat_client
DEFAULT_STACKNAME = 'teststack'
+# this test is in heat/tests/functional, so go up 3 dirs
+basepath = os.path.join(heat.__path__[0], os.path.pardir)
+
+
class Instance(object):
- def __init__(self, instance_name, stackname=DEFAULT_STACKNAME):
+ def __init__(self, testcase, instance_name, stackname=DEFAULT_STACKNAME):
+ self.testcase = testcase
self.name = '%s.%s' % (stackname, instance_name)
# during nose test execution this file will be imported even if
except KeyError:
raise SkipTest('OS_AUTH_STRATEGY unset, skipping functional test')
- if os.environ['OS_AUTH_STRATEGY'] != 'keystone':
- print 'keystone authentication required'
- assert False
+ self.testcase.assertEqual(os.environ['OS_AUTH_STRATEGY'],
+ 'keystone',
+ 'keystone authentication required')
self.creds = dict(username=os.environ['OS_USERNAME'],
password=os.environ['OS_PASSWORD'],
strategy=os.environ['OS_AUTH_STRATEGY'])
dbusername = 'testuser'
- # this test is in heat/tests/functional, so go up 3 dirs
- basepath = os.path.abspath(
- os.path.dirname(os.path.realpath(__file__)) + '/../../..')
-
self.novaclient = nova_client.Client(self.creds['username'],
self.creds['password'], self.creds['tenant'],
self.creds['auth_url'], service_type='compute')
self.ip = address.items()[0][1][0]['addr']
time.sleep(10)
tries += 1
- assert tries < 500
+ self.testcase.assertTrue(tries < 500, 'Timed out')
print 'Instance (%s) ip (%s) status (%s)' % (self.name, self.ip,
server.status)
(self.name, self.ip))
time.sleep(10)
tries += 1
- assert tries < 50
+ self.testcase.assertTrue(tries < 50, 'Timed out')
else:
print 'Instance (%s) ip (%s) SSH detected.' % (self.name,
self.ip)
while True:
try:
tries += 1
- assert tries < 50
+ self.testcase.assertTrue(tries < 50, 'Timed out')
self.ssh.connect(self.ip, username='ec2-user',
allow_agent=True, look_for_keys=True, password='password')
except paramiko.AuthenticationException:
except IOError, e:
tries += 1
if e.errno == errno.ENOENT:
- assert tries < 50
+ self.testcase.assertTrue(tries < 50, 'Timed out')
print("Instance (%s) ip (%s) not booted, waiting..." %
(self.name, self.ip))
time.sleep(15)
print "Verifying file '%s' exists" % path
stdin, stdout, sterr = self.ssh.exec_command('ls "%s"' % path)
lines = stdout.readlines()
- assert len(lines) == 1
+ self.testcase.assertEqual(len(lines), 1)
result = lines.pop().rstrip()
return result == path
data = files.pop().split(' ')
cur_file = data[1].rstrip()
if cur_file in cfn_tools_files:
- assert data[0] == cfntools[cur_file]
+ self.testcase.assertEqual(data[0], cfntools[cur_file])
print 'Instance (%s) cfntools integrity verified.' % self.name
def wait_for_provisioning(self):
except IOError, e:
tries += 1
if e.errno == errno.ENOENT:
- assert tries < 500
+ self.testcase.assertTrue(tries < 500, 'Timed out')
print("Instance (%s) provisioning incomplete, waiting..." %
self.name)
time.sleep(15)
# sudo chmod 777 /var/lib/cloud/instance/user-data.txt.i\n')
# time.sleep(1) # necessary for sendall to complete
- f = open(self.basepath + '/templates/' + template_file)
+ f = open(basepath + '/templates/' + template_file)
t = json.loads(f.read())
f.close()
t_data_list.insert(len(t_data_list) - 1,
u'touch /var/lib/cloud/instance/provision-finished')
- assert t_data_list == remote_file_list_u
+ self.testcase.assertEqual(t_data_list, remote_file_list_u)
remote_file = self.sftp.open('/var/lib/cloud/instance/user-data.txt.i')
msg = email.message_from_file(remote_file)
remote_file.close()
filepaths = {
- 'cloud-config': self.basepath + '/heat/cloudinit/config',
- 'part-handler.py': self.basepath +
+ 'cloud-config': basepath + '/heat/cloudinit/config',
+ 'part-handler.py': basepath +
'/heat/cloudinit/part-handler.py'
}
if file in filepaths.keys():
with open(filepaths[file]) as f:
- assert data == f.read()
+ self.testcase.assertEqual(data, f.read())
def get_ssh_client(self):
if self.ssh.get_transport() != None:
class Stack(object):
- def __init__(self, template_file, distribution, arch, jeos_type,
+
+ def __init__(self, testcase, template_file, distribution, arch, jeos_type,
stack_paramstr, stackname=DEFAULT_STACKNAME):
+ self.testcase = testcase
self.stackname = stackname
self.template_file = template_file
self.distribution = distribution
self.stack_paramstr = stack_paramstr
+
+ self.creds = dict(username=os.environ['OS_USERNAME'],
+ password=os.environ['OS_PASSWORD'],
+ tenant=os.environ['OS_TENANT_NAME'],
+ auth_url=os.environ['OS_AUTH_URL'],
+ strategy=os.environ['OS_AUTH_STRATEGY'])
+ self.dbusername = 'testuser'
+
+ self.testcase.assertEqual(os.environ['OS_AUTH_STRATEGY'],
+ 'keystone',
+ 'keystone authentication required')
+
self.prepare_jeos(distribution, arch, jeos_type)
self.novaclient = nova_client.Client(self.creds['username'],
def create(self):
self.keyname = self.novaclient.keypairs.list().pop().name
- assert self.heatclient
+ self.testcase.assertTrue(self.heatclient)
full_paramstr = ';'.join([self.stack_paramstr,
'KeyName=' + self.keyname,
# Format parameters and create the stack
parameters = {}
parameters['StackName'] = self.stackname
- template_path = self.basepath + '/templates/' + self.template_file
+ template_path = os.path.join(basepath,
+ 'templates',
+ self.template_file)
parameters['TemplateBody'] = open(template_path).read()
parameters.update(self.heatclient.format_parameters(template_params))
result = self.heatclient.create_stack(**parameters)
tries = 0
print 'Waiting for stack creation to be completed'
- while self.in_state('CREATE_IN_PROGRESS'):
+ while self.get_state() == 'CREATE_IN_PROGRESS':
tries += 1
- assert tries < 500
+ self.testcase.assertTrue(tries < 500, 'Timed out')
time.sleep(10)
- assert self.in_state('CREATE_COMPLETE')
+ self.testcase.assertEqual(self.get_state(), 'CREATE_COMPLETE')
def _check_create_result(self, result):
# Check result looks OK
root = etree.fromstring(result)
create_list = root.xpath('/CreateStackResponse/CreateStackResult')
- assert create_list
- assert len(create_list) == 1
+ self.testcase.assertTrue(create_list)
+ self.testcase.assertEqual(len(create_list), 1)
# Extract StackId from the result, and check the StackName part
stackid = create_list[0].findtext('StackId')
idname = stackid.split('/')[1]
print "Checking %s contains name %s" % (stackid, self.stackname)
- assert idname == self.stackname
+ self.testcase.assertEqual(idname, self.stackname)
def _create_heat_client(self):
return heat_client.get_client('0.0.0.0', 8000,
self.creds['tenant'], self.creds['auth_url'],
self.creds['strategy'], None, None, False)
- # during nose test execution this file will be imported even if
- # the unit tag was specified
- try:
- os.environ['OS_AUTH_STRATEGY']
- except KeyError:
- raise SkipTest('OS_AUTH_STRATEGY not set, skipping functional test')
-
- if os.environ['OS_AUTH_STRATEGY'] != 'keystone':
- print 'keystone authentication required'
- assert False
-
- creds = dict(username=os.environ['OS_USERNAME'],
- password=os.environ['OS_PASSWORD'],
- tenant=os.environ['OS_TENANT_NAME'],
- auth_url=os.environ['OS_AUTH_URL'],
- strategy=os.environ['OS_AUTH_STRATEGY'])
- dbusername = 'testuser'
-
- # this test is in heat/tests/functional, so go up 3 dirs
- basepath = os.path.abspath(
- os.path.dirname(os.path.realpath(__file__)) + '/../../..')
-
- novaclient = None
- glanceclient = None
- heatclient = None
-
def get_state(self):
stack_list = self.heatclient.list_stacks(StackName=self.stackname)
root = etree.fromstring(stack_list)
if len(alist):
item = alist.pop()
result = item.findtext("StackStatus")
+ if result and result.find('FAILED') >= 0:
+ print stack_list
return result
- def in_state(self, state):
- return state == self.get_state()
-
def cleanup(self):
parameters = {'StackName': self.stackname}
c = self.get_heat_client()
print 'Waiting for stack deletion to be completed'
tries = 0
- while self.in_state('DELETE_IN_PROGRESS'):
+ while self.get_state() == 'DELETE_IN_PROGRESS':
tries += 1
- assert tries < 50
+ self.testcase.assertTrue(tries < 50, 'Timed out')
time.sleep(10)
# final state for all stacks is DELETE_COMPLETE, but then they
# dissappear hence no result from list_stacks/get_state
# depending on timing, we could get either result here
end_state = self.get_state()
- assert (end_state == 'DELETE_COMPLETE' or end_state == None)
+ if end_state is not None:
+ self.testcase.assertEqual(end_state, 'DELETE_COMPLETE')
def get_nova_client(self):
if self.novaclient != None:
# skip creating jeos if image already available
if not self.poll_glance(self.glanceclient, imagename, False):
- if os.geteuid() != 0:
- print 'test must be run as root to create jeos'
- assert False
+ self.testcase.assertEqual(os.geteuid(), 0,
+ 'No JEOS found - run as root to create')
# -d: debug, -G: register with glance
subprocess.call(['heat-jeos', '-d', '-G', 'create', imagename])
tries = 0
while imagelistname != imagename:
tries += 1
- assert tries < 50
+ self.testcase.assertTrue(tries < 50, 'Timed out')
if block:
time.sleep(15)
print "Checking glance for image registration"
'''
root = etree.fromstring(response)
output_list = root.xpath(prefix)
- assert output_list
- assert len(output_list) == 1
+ self.testcase.assertTrue(output_list)
+ self.testcase.assertEqual(len(output_list), 1)
output = output_list.pop()
value = output.findtext(key)
return value
auth_url=self.creds['auth_url'])
ksusers = keystone.users.list()
ksuid = [u.id for u in ksusers if u.name == self.creds['username']]
- assert len(ksuid) == 1
+ self.testcase.assertEqual(len(ksuid), 1)
ec2creds = keystone.ec2.list(ksuid[0])
- assert len(ec2creds) == 1
- assert ec2creds[0].access
- assert ec2creds[0].secret
+ self.testcase.assertEqual(len(ec2creds), 1)
+ self.testcase.assertTrue(ec2creds[0].access)
+ self.testcase.assertTrue(ec2creds[0].secret)
print "Got EC2 credentials from keystone"
# most of the arguments passed to heat_client_boto are for
return [e.physical_resource_id for e in events if match(e)]
def _find_stack_output(self, result, output_key):
- assert len(result) == 1
+ self.testcase.assertEqual(len(result), 1)
for o in result[0].outputs:
if o.key == output_key: