If a firewall rule was created with setting protocol as icmp
and using source/destination port no error was thrown, even though
source/destination port parameters are not used by icmp.
This patch adds a validation method that checks passed parameters
and throws an exception if they are incompatible.
Change-Id: I90a765856896395fcb6e9ddbd888b7bd80480674
Closes-Bug:
1327955
else:
return '%d:%d' % (min_port, max_port)
+ def _validate_fwr_protocol_parameters(self, fwr):
+ protocol = fwr['protocol']
+ if protocol not in (const.TCP, const.UDP):
+ if fwr['source_port'] or fwr['destination_port']:
+ raise firewall.FirewallRuleInvalidICMPParameter(
+ param="Source, destination port")
+
def create_firewall(self, context, firewall):
LOG.debug(_("create_firewall() called"))
fw = firewall['firewall']
def create_firewall_rule(self, context, firewall_rule):
LOG.debug(_("create_firewall_rule() called"))
fwr = firewall_rule['firewall_rule']
+ self._validate_fwr_protocol_parameters(fwr)
tenant_id = self._get_tenant_id_for_create(context, fwr)
src_port_min, src_port_max = self._get_min_max_ports_from_range(
fwr['source_port'])
"Only action values %(values)s are supported.")
+class FirewallRuleInvalidICMPParameter(qexception.InvalidInput):
+ message = _("%(param)s are not allowed when protocol "
+ "is set to ICMP.")
+
+
class FirewallInvalidPortValue(qexception.InvalidInput):
message = _("Invalid value for port %(port)s.")
for k, v in attrs.iteritems():
self.assertEqual(firewall_rule['firewall_rule'][k], v)
+ def test_create_firewall_rule_icmp_with_port(self):
+ attrs = self._get_test_firewall_rule_attrs()
+ attrs['protocol'] = 'icmp'
+ res = self._create_firewall_rule(self.fmt, **attrs)
+ self.assertEqual(400, res.status_int)
+
+ def test_create_firewall_rule_icmp_without_port(self):
+ attrs = self._get_test_firewall_rule_attrs()
+
+ attrs['protocol'] = 'icmp'
+ attrs['source_port'] = None
+ attrs['destination_port'] = None
+ with self.firewall_rule(source_port=None,
+ destination_port=None,
+ protocol='icmp') as firewall_rule:
+ for k, v in attrs.iteritems():
+ self.assertEqual(firewall_rule['firewall_rule'][k], v)
+
def test_show_firewall_rule_with_fw_policy_not_associated(self):
attrs = self._get_test_firewall_rule_attrs()
with self.firewall_rule() as fw_rule: