tenant_id=tenant_id,
name=s['name'])
context.session.add(security_group_db)
- if s.get('name') == 'default':
- for ethertype in ext_sg.sg_supported_ethertypes:
+ for ethertype in ext_sg.sg_supported_ethertypes:
+ if s.get('name') == 'default':
# Allow intercommunication
- db = SecurityGroupRule(
+ ingress_rule = SecurityGroupRule(
id=uuidutils.generate_uuid(), tenant_id=tenant_id,
security_group=security_group_db,
direction='ingress',
ethertype=ethertype,
source_group=security_group_db)
- context.session.add(db)
+ context.session.add(ingress_rule)
+
+ egress_rule = SecurityGroupRule(
+ id=uuidutils.generate_uuid(), tenant_id=tenant_id,
+ security_group=security_group_db,
+ direction='egress',
+ ethertype=ethertype)
+ context.session.add(egress_rule)
return self._make_security_group_dict(security_group_db)
port['security_group_rules'] = updated_rule
return ports
- def _add_default_egress_rule(self, port, ethertype, ips):
- """ Adding default egress rule which allows all egress traffic. """
- egress_rule = [r for r in port['security_group_rules']
- if (r['direction'] == 'egress' and
- r['ethertype'] == ethertype)]
- if len(egress_rule) > 0:
- return
- for ip in port['fixed_ips']:
- version = netaddr.IPAddress(ip).version
- if "IPv%s" % version == ethertype:
- default_egress_rule = {'direction': 'egress',
- 'ethertype': ethertype}
- port['security_group_rules'].append(default_egress_rule)
- return
-
def _add_ingress_dhcp_rule(self, port, ips):
dhcp_ips = ips.get(port['network_id'])
for dhcp_ip in dhcp_ips:
network_ids = self._select_network_ids(ports)
ips = self._select_dhcp_ips_for_network_ids(context, network_ids)
for port in ports.values():
- self._add_default_egress_rule(port, q_const.IPv4, ips)
- self._add_default_egress_rule(port, q_const.IPv6, ips)
self._add_ingress_ra_rule(port, ips)
self._add_ingress_dhcp_rule(port, ips)
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
+ def _delete_default_security_group_egress_rules(self, security_group_id):
+ """Deletes default egress rules given a security group ID"""
+ res = self._list(
+ 'security-group-rules',
+ query_params='security_group_id=%s' % security_group_id)
+
+ for r in res['security_group_rules']:
+ if (r['direction'] == 'egress' and not r['port_range_max'] and
+ not r['port_range_min'] and not r['protocol']
+ and not r['remote_ip_prefix']):
+ self._delete('security-group-rules', r['id'])
+
+ def _assert_sg_rule_has_kvs(self, security_group_rule, expected_kvs):
+ """Asserts that the sg rule has expected key/value pairs passed
+ in as expected_kvs dictionary
+ """
+ for k, v in expected_kvs.iteritems():
+ self.assertEquals(security_group_rule[k], v)
+
class SecurityGroupsTestCaseXML(SecurityGroupsTestCase):
fmt = 'xml'
for k, v, in keys:
self.assertEqual(security_group['security_group'][k], v)
+ # Verify that default egress rules have been created
+
+ sg_rules = security_group['security_group']['security_group_rules']
+ self.assertEquals(len(sg_rules), 2)
+
+ v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules)
+ self.assertEquals(len(v4_rules), 1)
+ v4_rule = v4_rules[0]
+ expected = {'direction': 'egress',
+ 'ethertype': 'IPv4',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v4_rule, expected)
+
+ v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules)
+ self.assertEquals(len(v6_rules), 1)
+ v6_rule = v6_rules[0]
+ expected = {'direction': 'egress',
+ 'ethertype': 'IPv6',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v6_rule, expected)
+
def test_default_security_group(self):
with self.network():
res = self.new_list_request('security-groups')
sg_rule = group['security_group']['security_group_rules']
self.assertEqual(group['security_group']['id'],
remote_group_id)
- self.assertEqual(len(sg_rule), 1)
+ self.assertEqual(len(sg_rule), 3)
+ sg_rule = filter(lambda x: x['direction'] == 'ingress',
+ sg_rule)
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 1)
+ security_group_id = groups['security_groups'][0]['id']
res = self.new_list_request('security-group-rules')
rules = self.deserialize(self.fmt, res.get_response(self.ext_api))
- self.assertEqual(len(rules['security_group_rules']), 2)
- # just generic rules to allow default egress and
- # intergroup communicartion
- for rule in rules['security_group_rules']:
- self.assertEqual(rule['port_range_max'], None)
- self.assertEqual(rule['port_range_min'], None)
- self.assertEqual(rule['protocol'], None)
+ self.assertEqual(len(rules['security_group_rules']), 4)
+
+ # Verify default rule for v4 egress
+ sg_rules = rules['security_group_rules']
+ rules = filter(
+ lambda x: (
+ x['direction'] == 'egress' and x['ethertype'] == 'IPv4'),
+ sg_rules)
+ self.assertEqual(len(rules), 1)
+ v4_egress = rules[0]
+
+ expected = {'direction': 'egress',
+ 'ethertype': 'IPv4',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v4_egress, expected)
+
+ # Verify default rule for v6 egress
+ rules = filter(
+ lambda x: (
+ x['direction'] == 'egress' and x['ethertype'] == 'IPv6'),
+ sg_rules)
+ self.assertEqual(len(rules), 1)
+ v6_egress = rules[0]
+
+ expected = {'direction': 'egress',
+ 'ethertype': 'IPv6',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v6_egress, expected)
+
+ # Verify default rule for v4 ingress
+ rules = filter(
+ lambda x: (
+ x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'),
+ sg_rules)
+ self.assertEqual(len(rules), 1)
+ v4_ingress = rules[0]
+
+ expected = {'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'remote_group_id': security_group_id,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v4_ingress, expected)
+
+ # Verify default rule for v6 ingress
+ rules = filter(
+ lambda x: (
+ x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'),
+ sg_rules)
+ self.assertEqual(len(rules), 1)
+ v6_ingress = rules[0]
+
+ expected = {'direction': 'ingress',
+ 'ethertype': 'IPv6',
+ 'remote_group_id': security_group_id,
+ 'remote_ip_prefix': None,
+ 'protocol': None,
+ 'port_range_max': None,
+ 'port_range_min': None}
+ self._assert_sg_rule_has_kvs(v6_ingress, expected)
def test_create_security_group_rule_remote_ip_prefix(self):
name = 'webservers'
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
+
+ # Delete default rules as they would fail the following
+ # assertion at the end.
+ self._delete_default_security_group_egress_rules(
+ security_group_id)
+
+ q = 'direction=egress&security_group_id=' + security_group_id
self._test_list_resources('security-group-rule',
[sgr1, sgr2, sgr3],
- query_params="direction=egress")
+ query_params=q)
def test_list_security_group_rules_with_sort(self):
with self.security_group(name='sg') as sg:
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
+
+ # Delete default rules as they would fail the following
+ # assertion at the end.
+ self._delete_default_security_group_egress_rules(
+ security_group_id)
+
+ q = 'direction=egress&security_group_id=' + security_group_id
self._test_list_with_sort('security-group-rule',
(sgr3, sgr2, sgr1),
[('port_range_max', 'desc')],
- query_params='direction=egress')
+ query_params=q)
def test_list_security_group_rules_with_pagination(self):
with self.security_group(name='sg') as sg:
port_range_min=24,
port_range_max=24)
) as (sgr1, sgr2, sgr3):
+
+ # Delete default rules as they would fail the following
+ # assertion at the end.
+ self._delete_default_security_group_egress_rules(
+ security_group_id)
+
+ q = 'direction=egress&security_group_id=' + security_group_id
self._test_list_with_pagination(
'security-group-rule', (sgr3, sgr2, sgr1),
('port_range_max', 'desc'), 2, 2,
- query_params='direction=egress')
+ query_params=q)
def test_list_security_group_rules_with_pagination_reverse(self):
with self.security_group(name='sg') as sg:
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'ingress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv4',
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
- {'ethertype': 'IPv4', 'direction': 'egress'},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv4',
'port_range_max': 22,
'security_group_id': sg1_id,
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': u'ingress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg2_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg2_id},
+ {'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32',
'protocol': u'tcp', 'ethertype': u'IPv4',
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
- {'ethertype': 'IPv4', 'direction': 'egress'},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'ingress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv6',
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
- {'ethertype': 'IPv6', 'direction': 'egress'},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv6',
'port_range_max': 22,
'security_group_id': sg1_id,
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'ingress',
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg1_id},
+ {'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg2_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg2_id},
+ {'direction': 'ingress',
'source_ip_prefix': 'fe80::3/128',
'protocol': 'tcp', 'ethertype': 'IPv6',
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
- {'ethertype': 'IPv6', 'direction': 'egress'},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)