"""Mixin class to add security group to db_plugin_base_v2."""
__native_bulk_support = True
- sg_supported_protocols = ['tcp', 'udp', 'icmp']
- sg_supported_ethertypes = ['IPv4', 'IPv6']
def create_security_group_bulk(self, context, security_group_rule):
return self._create_bulk('security_group', context,
external_id=s.get('external_id'))
context.session.add(security_group_db)
if s.get('name') == 'default':
- for ethertype in self.sg_supported_ethertypes:
+ for ethertype in ext_sg.sg_supported_ethertypes:
# Allow intercommunication
db = SecurityGroupRule(
id=uuidutils.generate_uuid(), tenant_id=tenant_id,
rule.get('external_id')):
raise ext_sg.SecurityGroupNotProxyMode()
- # Check that protocol/ethertype are valid
protocol = rule.get('protocol')
- if protocol and protocol not in self.sg_supported_protocols:
- raise ext_sg.SecurityGroupInvalidProtocolType(value=protocol)
ethertype = rule.get('ethertype')
- if ethertype and ethertype not in self.sg_supported_ethertypes:
- raise ext_sg.SecurityGroupInvalidEtherType(value=ethertype)
# Check that port_range's are valid
if (rule['port_range_min'] is None and
message = _("Security group %(name)s id %(external_id)s already exists")
-class SecurityGroupInvalidProtocolType(qexception.InvalidInput):
- message = _("Invalid protocol type %(value)s")
-
-
-class SecurityGroupInvalidEtherType(qexception.InvalidInput):
- message = _("Invalid/Unsupported ethertype %(value)s")
-
-
class SecurityGroupInvalidPortRange(qexception.InvalidInput):
message = _("For TCP/UDP protocols, port_range_min must be "
"<= port_range_max")
attr.validators['type:name_not_default'] = _validate_name_not_default
attr.validators['type:external_id_and_mode'] = _validate_external_id_and_mode
+sg_supported_protocols = [None, 'tcp', 'udp', 'icmp']
+sg_supported_ethertypes = ['IPv4', 'IPv6']
+
# Attribute Map
RESOURCE_ATTRIBUTE_MAP = {
'security_groups': {
'is_visible': True,
'validate': {'type:values': ['ingress', 'egress']}},
'protocol': {'allow_post': True, 'allow_put': False,
- 'is_visible': True, 'default': None},
+ 'is_visible': True, 'default': None,
+ 'validate': {'type:values': sg_supported_protocols}},
'port_range_min': {'allow_post': True, 'allow_put': False,
'convert_to': convert_validate_port_value,
'default': None, 'is_visible': True},
'convert_to': convert_validate_port_value,
'default': None, 'is_visible': True},
'ethertype': {'allow_post': True, 'allow_put': False,
- 'is_visible': True, 'default': 'IPv4'},
+ 'is_visible': True, 'default': 'IPv4',
+ 'validate': {'type:values': sg_supported_ethertypes}},
'source_ip_prefix': {'allow_post': True, 'allow_put': False,
'default': None, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
def _build_security_group_rule(self, security_group_id, direction,
protocol, port_range_min, port_range_max,
source_ip_prefix=None, source_group_id=None,
- external_id=None, tenant_id='test_tenant'):
+ external_id=None, tenant_id='test_tenant',
+ ethertype='IPv4'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': protocol,
'port_range_min': port_range_min,
'port_range_max': port_range_max,
- 'tenant_id': tenant_id}}
+ 'tenant_id': tenant_id,
+ 'ethertype': ethertype}}
if external_id:
data['security_group_rule']['external_id'] = external_id
direction='ingress', protocol='tcp',
port_range_min='22', port_range_max='22',
source_ip_prefix=None, source_group_id=None,
- external_id=None, fmt='json', no_delete=False):
+ external_id=None, fmt='json', no_delete=False,
+ ethertype='IPv4'):
rule = self._build_security_group_rule(security_group_id,
direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id,
- external_id)
+ external_id,
+ ethertype=ethertype)
security_group_rule = self._make_security_group_rule('json', rule)
try:
yield security_group_rule
res = self._create_security_group_rule('json', rules)
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)
+
+ def test_create_security_group_rule_with_invalid_ethertype(self):
+ security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
+ direction = "ingress"
+ source_ip_prefix = "10.0.0.0/24"
+ protocol = 'tcp'
+ port_range_min = 22
+ port_range_max = 22
+ source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
+ rule = self._build_security_group_rule(security_group_id, direction,
+ protocol, port_range_min,
+ port_range_max,
+ source_ip_prefix,
+ source_group_id,
+ ethertype='IPv5')
+ res = self._create_security_group_rule('json', rule)
+ self.deserialize('json', res)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_security_group_rule_with_invalid_protocol(self):
+ security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
+ direction = "ingress"
+ source_ip_prefix = "10.0.0.0/24"
+ protocol = 'tcp/ip'
+ port_range_min = 22
+ port_range_max = 22
+ source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
+ rule = self._build_security_group_rule(security_group_id, direction,
+ protocol, port_range_min,
+ port_range_max,
+ source_ip_prefix,
+ source_group_id)
+ res = self._create_security_group_rule('json', rule)
+ self.deserialize('json', res)
+ self.assertEquals(res.status_int, 400)