def get_security_group(self, context, id, fields=None, tenant_id=None):
"""Tenant id is given to handle the case when we
- are creating a security group or security group rule on behalf of
- another use.
+ are creating a security group rule on behalf of another use.
"""
if tenant_id:
context.tenant_id = tenant_id
try:
- ret = self._make_security_group_dict(self._get_security_group(
- context, id), fields)
+ with context.session.begin(subtransactions=True):
+ ret = self._make_security_group_dict(self._get_security_group(
+ context, id), fields)
+ ret['security_group_rules'] = self.get_security_group_rules(
+ context, {'security_group_id': [id]})
finally:
if tenant_id:
context.tenant_id = tmp_context_tenant_id
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
+ 'security_group_rules': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
},
'security_group_rules': {
'id': {'allow_post': False, 'allow_put': False,
with self.security_group(name, description) as sg:
source_group_id = sg['security_group']['id']
res = self.new_show_request('security-groups', source_group_id)
- group = self.deserialize('json', res.get_response(self.ext_api))
- self.assertEqual(group['security_group']['id'], source_group_id)
+
+ security_group_id = sg['security_group']['id']
+ direction = "ingress"
+ source_ip_prefix = "10.0.0.0/24"
+ protocol = 'tcp'
+ port_range_min = 22
+ port_range_max = 22
+ keys = [('source_ip_prefix', source_ip_prefix),
+ ('security_group_id', security_group_id),
+ ('direction', direction),
+ ('protocol', protocol),
+ ('port_range_min', port_range_min),
+ ('port_range_max', port_range_max)]
+ with self.security_group_rule(security_group_id, direction,
+ protocol, port_range_min,
+ port_range_max,
+ source_ip_prefix):
+
+ group = self.deserialize(
+ 'json', res.get_response(self.ext_api))
+ sg_rule = group['security_group']['security_group_rules']
+ self.assertEqual(group['security_group']['id'],
+ source_group_id)
+ self.assertEqual(len(sg_rule), 1)
+ for k, v, in keys:
+ self.assertEqual(sg_rule[0][k], v)
def test_delete_security_group(self):
name = 'webservers'