message = _("Default security group already exists.")
-class SecurityGroupRuleInvalidProtocol(qexception.InUse):
- message = _("Security group rule protocol %(protocol)s not supported "
- "only protocol values %(values)s supported.")
+class SecurityGroupRuleInvalidProtocol(qexception.InvalidInput):
+ message = _("Security group rule protocol %(protocol)s not supported. "
+ "Only protocol values %(values)s supported.")
class SecurityGroupRulesNotSingleTenant(qexception.InvalidInput):
message = _("external_id wrong type %(data)s")
+def convert_protocol_to_case_insensitive(value):
+ if value is None:
+ return value
+ try:
+ return value.lower()
+ except AttributeError:
+ raise SecurityGroupRuleInvalidProtocol(
+ protocol=value, values=sg_supported_protocols)
+
+
+def convert_ethertype_to_case_insensitive(value):
+ if isinstance(value, basestring):
+ for ethertype in sg_supported_ethertypes:
+ if ethertype.lower() == value.lower():
+ return ethertype
+
+
def convert_validate_port_value(port):
if port is None:
return port
'validate': {'type:values': ['ingress', 'egress']}},
'protocol': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
+ 'convert_to': convert_protocol_to_case_insensitive,
'validate': {'type:values': sg_supported_protocols}},
'port_range_min': {'allow_post': True, 'allow_put': False,
'convert_to': convert_validate_port_value,
'default': None, 'is_visible': True},
'ethertype': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': 'IPv4',
+ 'convert_to': convert_ethertype_to_case_insensitive,
'validate': {'type:values': sg_supported_ethertypes}},
'source_ip_prefix': {'allow_post': True, 'allow_put': False,
'default': None, 'is_visible': True},
else:
self.assertEquals(len(group['security_group_rules']), 0)
+ def test_create_security_group_rule_ethertype_invalid_as_number(self):
+ name = 'webservers'
+ description = 'my webservers'
+ with self.security_group(name, description) as sg:
+ security_group_id = sg['security_group']['id']
+ ethertype = 2
+ rule = self._build_security_group_rule(
+ security_group_id, 'ingress', 'tcp', '22', '22', None, None,
+ ethertype=ethertype)
+ res = self._create_security_group_rule('json', rule)
+ self.deserialize('json', res)
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_security_group_rule_protocol_invalid_as_number(self):
+ name = 'webservers'
+ description = 'my webservers'
+ with self.security_group(name, description) as sg:
+ security_group_id = sg['security_group']['id']
+ protocol = 2
+ rule = self._build_security_group_rule(
+ security_group_id, 'ingress', protocol, '22', '22',
+ None, None)
+ res = self._create_security_group_rule('json', rule)
+ self.deserialize('json', res)
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_security_group_rule_case_insensitive(self):
+ name = 'webservers'
+ description = 'my webservers'
+ with self.security_group(name, description) as sg:
+ security_group_id = sg['security_group']['id']
+ direction = "ingress"
+ source_ip_prefix = "10.0.0.0/24"
+ protocol = 'TCP'
+ port_range_min = 22
+ port_range_max = 22
+ ethertype = 'ipV4'
+ with self.security_group_rule(security_group_id, direction,
+ protocol, port_range_min,
+ port_range_max,
+ source_ip_prefix,
+ ethertype=ethertype) as rule:
+
+ # the lower case value will be return
+ self.assertEquals(rule['security_group_rule']['protocol'],
+ protocol.lower())
+ self.assertEquals(rule['security_group_rule']['ethertype'],
+ 'IPv4')
+
def test_get_security_group(self):
name = 'webservers'
description = 'my webservers'