return super(NvpPluginV2, self).delete_security_group(
context, security_group_id)
+ def _validate_security_group_rules(self, context, rules):
+ for rule in rules['security_group_rules']:
+ r = rule.get('security_group_rule')
+ port_based_proto = (self._get_ip_proto_number(r['protocol'])
+ in securitygroups_db.IP_PROTOCOL_MAP.values())
+ if (not port_based_proto and
+ (r['port_range_min'] is not None or
+ r['port_range_max'] is not None)):
+ msg = (_("Port values not valid for "
+ "protocol: %s") % r['protocol'])
+ raise q_exc.BadRequest(resource='security_group_rule',
+ msg=msg)
+ return super(NvpPluginV2, self)._validate_security_group_rules(context,
+ rules)
+
def create_security_group_rule(self, context, security_group_rule):
"""Create a single security group rule."""
bulk_rule = {'security_group_rules': [security_group_rule]}
# Assert Neutron name is not truncated
self.assertEqual(sg['security_group']['name'], name)
+ def test_create_security_group_rule_bad_input(self):
+ name = 'foo security group'
+ description = 'foo description'
+ with self.security_group(name, description) as sg:
+ security_group_id = sg['security_group']['id']
+ protocol = 200
+ min_range = 32
+ max_range = 4343
+ rule = self._build_security_group_rule(
+ security_group_id, 'ingress', protocol,
+ min_range, max_range)
+ res = self._create_security_group_rule(self.fmt, rule)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, 400)
+
class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
NiciraPluginV2TestCase):
context.Context('', kwargs['tenant_id']))
return security_group_req.get_response(self.ext_api)
- def _build_security_group_rule(self, security_group_id, direction,
- protocol, port_range_min, port_range_max,
+ def _build_security_group_rule(self, security_group_id, direction, proto,
+ port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
tenant_id='test_tenant',
ethertype='IPv4'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
- 'protocol': protocol,
+ 'protocol': proto,
'ethertype': ethertype,
- 'port_range_min': port_range_min,
- 'port_range_max': port_range_max,
'tenant_id': tenant_id,
'ethertype': ethertype}}
+ if port_range_min:
+ data['security_group_rule']['port_range_min'] = port_range_min
+
+ if port_range_max:
+ data['security_group_rule']['port_range_max'] = port_range_max
+
if remote_ip_prefix:
data['security_group_rule']['remote_ip_prefix'] = remote_ip_prefix
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
+ def test_create_security_group_rule_tcp_protocol_as_number(self):
+ name = 'webservers'
+ description = 'my webservers'
+ with self.security_group(name, description) as sg:
+ security_group_id = sg['security_group']['id']
+ protocol = 6 # TCP
+ rule = self._build_security_group_rule(
+ security_group_id, 'ingress', protocol, '22', '22')
+ res = self._create_security_group_rule(self.fmt, rule)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, 201)
+
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
description = 'my webservers'
security_group_id = sg['security_group']['id']
protocol = 2
rule = self._build_security_group_rule(
- security_group_id, 'ingress', protocol, '22', '22',
- None, None)
+ security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)