return nics
- def handle_create(self):
+ def _get_security_groups(self):
security_groups = []
for property in ('SecurityGroups', 'SecurityGroupIds'):
if self.properties.get(property) is not None:
security_groups.append(sg)
if not security_groups:
security_groups = None
+ return security_groups
+
+ def handle_create(self):
+ security_groups = self._get_security_groups()
userdata = self.properties['UserData'] or ''
flavor = self.properties['InstanceType']
return res
# check validity of key
- try:
- key_name = self.properties['KeyName']
- if key_name is None:
- return
- except ValueError:
- return
- else:
+ key_name = self.properties.get('KeyName', None)
+ if key_name:
keypairs = self.nova().keypairs.list()
- for k in keypairs:
- if k.name == key_name:
- return
- return {'Error':
- 'Provided KeyName is not registered with nova'}
+ if not any(k.name == key_name for k in keypairs):
+ return {'Error':
+ 'Provided KeyName is not registered with nova'}
+
+ # check validity of security groups vs. network interfaces
+ security_groups = self._get_security_groups()
+ if security_groups and self.properties.get('NetworkInterfaces'):
+ return {'Error':
+ 'Cannot define both SecurityGroups/SecurityGroupIds and '
+ 'NetworkInterfaces properties.'}
+
+ return
def _delete_server(self, server):
'''
}
'''
+test_unregistered_key = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "test.",
+ "Parameters" : {
+
+ "KeyName" : {
+''' + \
+ '"Description" : "Name of an existing EC2' + \
+ 'KeyPair to enable SSH access to the instances",' + \
+ '''
+ "Type" : "String"
+ }
+ },
+
+ "Resources" : {
+ "Instance": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ "ImageId": "image_name",
+ "InstanceType": "m1.large",
+ "KeyName": { "Ref" : "KeyName" }
+ }
+ }
+ }
+ }
+ '''
+
+test_template_invalid_secgroups = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "test.",
+ "Parameters" : {
+
+ "KeyName" : {
+''' + \
+ '"Description" : "Name of an existing EC2' + \
+ 'KeyPair to enable SSH access to the instances",' + \
+ '''
+ "Type" : "String"
+ }
+ },
+
+ "Resources" : {
+ "Instance": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ "ImageId": "image_name",
+ "InstanceType": "m1.large",
+ "KeyName": { "Ref" : "KeyName" },
+ "SecurityGroups": [ "default" ],
+ "NetworkInterfaces": [ "mgmt", "data" ]
+ }
+ }
+ }
+ }
+ '''
+
+test_template_invalid_secgroupids = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "test.",
+ "Parameters" : {
+
+ "KeyName" : {
+''' + \
+ '"Description" : "Name of an existing EC2' + \
+ 'KeyPair to enable SSH access to the instances",' + \
+ '''
+ "Type" : "String"
+ }
+ },
+
+ "Resources" : {
+ "Instance": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ "ImageId": "image_name",
+ "InstanceType": "m1.large",
+ "KeyName": { "Ref" : "KeyName" },
+ "SecurityGroupIds": [ "default" ],
+ "NetworkInterfaces": [ "mgmt", "data" ]
+ }
+ }
+ }
+ }
+ '''
+
class validateTest(HeatTestCase):
def setUp(self):
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t))
self.assertEqual(res, {'Description': u'test.', 'Parameters': {}})
+
+ def test_unregistered_key(self):
+ t = template_format.parse(test_unregistered_key)
+ template = parser.Template(t)
+ params = parser.Parameters(
+ 'test_stack', template, {'KeyName': 'not_registered'})
+ stack = parser.Stack(None, 'test_stack', template, params)
+
+ self.m.StubOutWithMock(instances.Instance, 'nova')
+ instances.Instance.nova().AndReturn(self.fc)
+ instances.Instance.nova().AndReturn(self.fc)
+ self.m.ReplayAll()
+
+ resource = stack.resources['Instance']
+ self.assertNotEqual(resource.validate(), None)
+
+ def test_invalid_security_groups_with_nics(self):
+ t = template_format.parse(test_template_invalid_secgroups)
+ template = parser.Template(t)
+ params = parser.Parameters('test_stack', template, {'KeyName': 'test'})
+ stack = parser.Stack(None, 'test_stack', template, params)
+
+ self.m.StubOutWithMock(instances.Instance, 'nova')
+ instances.Instance.nova().AndReturn(self.fc)
+ self.m.ReplayAll()
+
+ resource = stack.resources['Instance']
+ self.assertNotEqual(resource.validate(), None)
+
+ def test_invalid_security_group_ids_with_nics(self):
+ t = template_format.parse(test_template_invalid_secgroupids)
+ template = parser.Template(t)
+ params = parser.Parameters('test_stack', template, {'KeyName': 'test'})
+ stack = parser.Stack(None, 'test_stack', template, params)
+
+ self.m.StubOutWithMock(instances.Instance, 'nova')
+ instances.Instance.nova().AndReturn(self.fc)
+ self.m.ReplayAll()
+
+ resource = stack.resources['Instance']
+ self.assertNotEqual(resource.validate(), None)