logger.info('%s._resolve_attribute(%s) == %s' % (self.name, name, res))
return unicode(res) if res else None
- def _build_nics(self, network_interfaces, subnet_id=None):
+ def _build_nics(self, network_interfaces,
+ security_groups=None, subnet_id=None):
nics = None
'network_id': network_id,
'fixed_ips': [fixed_ip]
}
+
+ if security_groups:
+ props['security_groups'] = \
+ self._get_security_groups_id(security_groups)
+
port = neutronclient.create_port({'port': props})['port']
nics = [{'port-id': port['id']}]
return nics
+ def _get_security_groups_id(self, security_groups):
+ """Extract security_groups ids from security group list
+
+ This function will be deprecated if Neutron client resolves security
+ group name to id internally.
+
+ Args:
+ security_groups : A list contains security_groups ids or names
+ Returns:
+ A list of security_groups ids.
+ """
+ ids = []
+ response = self.neutron().list_security_groups(self.resource_id)
+ for item in response:
+ if item['security_groups'] is not None:
+ for security_group in security_groups:
+ for groups in item['security_groups']:
+ if groups['name'] == security_group \
+ and groups['id'] not in ids:
+ ids.append(groups['id'])
+ elif groups['id'] == security_group \
+ and groups['id'] not in ids:
+ ids.append(groups['id'])
+ return ids
+
def _get_security_groups(self):
security_groups = []
for property in ('SecurityGroups', 'SecurityGroupIds'):
scheduler_hints = None
nics = self._build_nics(self.properties['NetworkInterfaces'],
+ security_groups=security_groups,
subnet_id=self.properties['SubnetId'])
server = None
try:
from heat.engine import resource
from heat.engine import scheduler
from heat.engine.resources import instance as instances
+from heat.engine.resources import network_interface
from heat.engine.resources import nova_utils
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
+from neutronclient.v2_0 import client as neutronclient
+
wp_template = '''
{
'id5'
]))
+ def test_build_nics_with_security_groups(self):
+ """
+ Test the security groups defined in heat template can be associated
+ to a new created port.
+ """
+ return_server = self.fc.servers.list()[1]
+ instance = self._create_test_instance(return_server,
+ 'test_build_nics')
+
+ security_groups = ['security_group_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['fake_id_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['security_group_1', 'security_group_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['fake_id_1', 'fake_id_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['security_group_1', 'fake_id_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['security_group_1', 'fake_id_2']
+ self._test_security_groups(instance, security_groups, sg='two')
+
+ security_groups = ['wrong_group_id']
+ self._test_security_groups(instance, security_groups, sg='zero')
+
+ security_groups = ['wrong_group_id', 'fake_id_1']
+ self._test_security_groups(instance, security_groups)
+
+ security_groups = ['wrong_group_name']
+ self._test_security_groups(instance, security_groups, sg='zero')
+
+ security_groups = ['wrong_group_name', 'security_group_1']
+ self._test_security_groups(instance, security_groups)
+
+ def _test_security_groups(self, instance, security_groups, sg='one'):
+ fake_groups_list, props = self._get_fake_properties(sg)
+
+ def generate_sg_list():
+ yield fake_groups_list
+
+ nclient = neutronclient.Client()
+ self.m.StubOutWithMock(instance, 'neutron')
+ instance.neutron().MultipleTimes().AndReturn(nclient)
+
+ self.m.StubOutWithMock(neutronclient.Client, 'list_security_groups')
+ neutronclient.Client.list_security_groups(
+ instance.resource_id).AndReturn(generate_sg_list())
+
+ net_interface = network_interface.NetworkInterface
+ self.m.StubOutWithMock(net_interface, 'network_id_from_subnet_id')
+ net_interface.network_id_from_subnet_id(
+ nclient,
+ 'fake_subnet_id').MultipleTimes().AndReturn('fake_network_id')
+
+ self.m.StubOutWithMock(neutronclient.Client, 'create_port')
+ neutronclient.Client.create_port(
+ {'port': props}).MultipleTimes().AndReturn(
+ {'port': {'id': 'fake_port_id'}})
+
+ self.m.ReplayAll()
+
+ self.assertEqual(
+ [{'port-id': 'fake_port_id'}],
+ instance._build_nics(None,
+ security_groups=security_groups,
+ subnet_id='fake_subnet_id'))
+
+ self.m.VerifyAll()
+ self.m.UnsetStubs()
+
+ def _get_fake_properties(self, sg='one'):
+ fake_groups_list = {
+ 'security_groups': [
+ {
+ 'id': 'fake_id_1',
+ 'name': 'security_group_1',
+ 'security_group_rules': [],
+ 'description': 'no protocol'
+ },
+ {
+ 'id': 'fake_id_2',
+ 'name': 'security_group_2',
+ 'security_group_rules': [],
+ 'description': 'no protocol'
+ }
+ ]
+ }
+
+ fixed_ip = {'subnet_id': 'fake_subnet_id'}
+ props = {
+ 'admin_state_up': True,
+ 'network_id': 'fake_network_id',
+ 'fixed_ips': [fixed_ip],
+ 'security_groups': ['fake_id_1']
+ }
+
+ if sg == 'zero':
+ props['security_groups'] = []
+ elif sg == 'one':
+ props['security_groups'] = ['fake_id_1']
+ elif sg == 'two':
+ props['security_groups'] = ['fake_id_1', 'fake_id_2']
+
+ return fake_groups_list, props
+
def test_instance_without_ip_address(self):
return_server = self.fc.servers.list()[3]
instance = self._create_test_instance(return_server,