self.m.StubOutWithMock(quantumclient.Client, 'remove_interface_router')
self.m.StubOutWithMock(quantumclient.Client, 'show_subnet')
self.m.StubOutWithMock(quantumclient.Client, 'show_network')
+ self.m.StubOutWithMock(quantumclient.Client, 'create_security_group')
+ self.m.StubOutWithMock(quantumclient.Client, 'show_security_group')
+ self.m.StubOutWithMock(quantumclient.Client, 'delete_security_group')
+ self.m.StubOutWithMock(
+ quantumclient.Client, 'create_security_group_rule')
+ self.m.StubOutWithMock(
+ quantumclient.Client, 'delete_security_group_rule')
def create_stack(self, template):
t = template_format.parse(template)
u'bbbb',
{'subnet_id': 'cccc'}).AndReturn(None)
+ def mock_create_security_group(self):
+ quantumclient.Client.create_security_group({
+ 'security_group': {
+ 'name': 'test_stack.the_sg',
+ 'description': 'SSH access'
+ }
+ }).AndReturn({
+ 'security_group': {
+ 'tenant_id': 'c1210485b2424d48804aad5d39c61b8f',
+ 'name': 'test_stack.the_sg',
+ 'description': 'SSH access',
+ 'security_group_rules': [],
+ 'id': 'aaaa'
+ }
+ })
+
+ quantumclient.Client.create_security_group_rule({
+ 'security_group_rule': {
+ 'direction': 'ingress',
+ 'remote_ip_prefix': '0.0.0.0/0',
+ 'port_range_min': 22,
+ 'ethertype': 'IPv4',
+ 'port_range_max': 22,
+ 'protocol': 'tcp',
+ 'security_group_id': 'aaaa'
+ }
+ }).AndReturn({
+ 'security_group_rule': {
+ 'direction': 'ingress',
+ 'remote_ip_prefix': '0.0.0.0/0',
+ 'port_range_min': 22,
+ 'ethertype': 'IPv4',
+ 'port_range_max': 22,
+ 'protocol': 'tcp',
+ 'security_group_id': 'aaaa',
+ 'id': 'bbbb'
+ }
+ })
+
+ def mock_delete_security_group(self):
+ quantumclient.Client.show_security_group('aaaa').AndReturn({
+ 'security_group': {
+ 'tenant_id': 'c1210485b2424d48804aad5d39c61b8f',
+ 'name': 'sc1',
+ 'description': '',
+ 'security_group_rules': [{
+ 'direction': 'ingress',
+ 'protocol': 'tcp',
+ 'port_range_max': 22,
+ 'id': 'bbbb',
+ 'ethertype': 'IPv4',
+ 'security_group_id': 'aaaa',
+ 'remote_ip_prefix': '0.0.0.0/0',
+ 'tenant_id': 'c1210485b2424d48804aad5d39c61b8f',
+ 'port_range_min': 22
+ }],
+ 'id': 'aaaa'}})
+ quantumclient.Client.delete_security_group_rule('bbbb').AndReturn(None)
+ quantumclient.Client.delete_security_group('aaaa').AndReturn(None)
+
def mock_delete_network(self):
quantumclient.Client.delete_router('bbbb').AndReturn(None)
quantumclient.Client.delete_network('aaaa').AndReturn(None)
test_template = '''
HeatTemplateFormatVersion: '2012-12-12'
Resources:
+ the_sg:
+ Type: AWS::EC2::SecurityGroup
+ Properties:
+ VpcId: {Ref: the_vpc}
+ GroupDescription: SSH access
+ SecurityGroupIngress:
+ - IpProtocol: tcp
+ FromPort: 22
+ ToPort: 22
+ CidrIp: 0.0.0.0/0
the_vpc:
Type: AWS::EC2::VPC
Properties: {CidrBlock: '10.0.0.0/16'}
Properties:
PrivateIpAddress: 10.0.0.100
SubnetId: {Ref: the_subnet}
+ GroupSet:
+ - Ref: the_sg
+'''
+
+ test_template_error = '''
+HeatTemplateFormatVersion: '2012-12-12'
+Resources:
+ the_sg:
+ Type: AWS::EC2::SecurityGroup
+ Properties:
+ VpcId: {Ref: the_vpc}
+ GroupDescription: SSH access
+ SecurityGroupIngress:
+ - IpProtocol: tcp
+ FromPort: 22
+ ToPort: 22
+ CidrIp: 0.0.0.0/0
+ the_vpc:
+ Type: AWS::EC2::VPC
+ Properties: {CidrBlock: '10.0.0.0/16'}
+ the_subnet:
+ Type: AWS::EC2::Subnet
+ Properties:
+ CidrBlock: 10.0.0.0/24
+ VpcId: {Ref: the_vpc}
+ AvailabilityZone: moon
+ the_nic:
+ Type: AWS::EC2::NetworkInterface
+ Properties:
+ PrivateIpAddress: 10.0.0.100
+ SubnetId: {Ref: the_subnet}
+ GroupSet:
+ - Ref: INVALID-REF-IN-TEMPLATE
'''
def mock_create_network_interface(self):
'ip_address': u'10.0.0.100'
}],
'name': u'test_stack.the_nic',
- 'admin_state_up': True
+ 'admin_state_up': True,
+ 'security_groups': ['aaaa']
}}).AndReturn({
'port': {
'admin_state_up': True,
quantumclient.Client.delete_port('dddd').AndReturn(None)
def test_network_interface(self):
+ self.mock_create_security_group()
self.mock_create_network()
self.mock_create_subnet()
self.mock_create_network_interface()
self.mock_delete_network_interface()
self.mock_delete_subnet()
self.mock_delete_network()
+ self.mock_delete_security_group()
self.m.ReplayAll()
stack.delete()
self.m.VerifyAll()
+ def test_network_interface_error(self):
+ self.assertRaises(
+ KeyError,
+ self.create_stack,
+ self.test_template_error)
+
class InternetGatewayTest(VPCTestBase):