]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Don't allow user to set firewall rule with port and no protocol
authorBertrand Lallau <bertrand.lallau@thalesgroup.com>
Tue, 9 Sep 2014 12:56:59 +0000 (14:56 +0200)
committerBertrand Lallau <bertrand.lallau@thalesgroup.com>
Wed, 10 Sep 2014 09:47:54 +0000 (11:47 +0200)
Creating firewall rules specifying a destination port and/or a source
port without a protocol, generates rules without src or dest port
restriction. This was a real security issue for cloud users.

This patch generates a 400 Bad request "Source/destination port
requires a protocol" in case of creation/update of firewall rules
specifying a destination port and/or a source port and without protocol.

DocImpact
Closes-Bug: #1365961

Change-Id: I4a3a1d9ae7ec4b2a864b3edc83d65ef7f80cbba5

neutron/db/firewall/firewall_db.py
neutron/extensions/firewall.py
neutron/tests/unit/db/firewall/test_db_firewall.py

index 275e1d4f92df936932d4205bb9c82919f7b27d31..40ccf5d3d6fa736c6b119945d78a70ba92640cd2 100644 (file)
@@ -355,6 +355,9 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
         fwr = firewall_rule['firewall_rule']
         self._validate_fwr_protocol_parameters(fwr)
         tenant_id = self._get_tenant_id_for_create(context, fwr)
+        if not fwr['protocol'] and (fwr['source_port'] or
+                fwr['destination_port']):
+            raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
         src_port_min, src_port_max = self._get_min_max_ports_from_range(
             fwr['source_port'])
         dst_port_min, dst_port_max = self._get_min_max_ports_from_range(
@@ -396,6 +399,14 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
             del fwr['destination_port']
         with context.session.begin(subtransactions=True):
             fwr_db = self._get_firewall_rule(context, id)
+            protocol = fwr.get('protocol', fwr_db['protocol'])
+            if not protocol:
+                sport = fwr.get('source_port_range_min',
+                                fwr_db['source_port_range_min'])
+                dport = fwr.get('destination_port_range_min',
+                                fwr_db['destination_port_range_min'])
+                if sport or dport:
+                    raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
             fwr_db.update(fwr)
             if fwr_db.firewall_policy_id:
                 fwp_db = self._get_firewall_policy(context,
index ff0fd39fb1fbc9e3e034903a04bba9b3e6114846..fb66aaa4a4f67580cbaeb77445669d104a82e8ba 100644 (file)
@@ -83,6 +83,10 @@ class FirewallRuleInvalidICMPParameter(qexception.InvalidInput):
                 "is set to ICMP.")
 
 
+class FirewallRuleWithPortWithoutProtocolInvalid(qexception.InvalidInput):
+    message = _("Source/destination port requires a protocol")
+
+
 class FirewallInvalidPortValue(qexception.InvalidInput):
     message = _("Invalid value for port %(port)s.")
 
index 5e27f5fc46c938e0815580b6c70db8250b04e214..f070caf1a2d56f1d4a5113febf06b495a65487a2 100644 (file)
@@ -614,6 +614,20 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
             for k, v in attrs.iteritems():
                 self.assertEqual(firewall_rule['firewall_rule'][k], v)
 
+    def test_create_firewall_rule_without_protocol_with_dport(self):
+        attrs = self._get_test_firewall_rule_attrs()
+        attrs['protocol'] = None
+        attrs['source_port'] = None
+        res = self._create_firewall_rule(self.fmt, **attrs)
+        self.assertEqual(400, res.status_int)
+
+    def test_create_firewall_rule_without_protocol_with_sport(self):
+        attrs = self._get_test_firewall_rule_attrs()
+        attrs['protocol'] = None
+        attrs['destination_port'] = None
+        res = self._create_firewall_rule(self.fmt, **attrs)
+        self.assertEqual(400, res.status_int)
+
     def test_show_firewall_rule_with_fw_policy_not_associated(self):
         attrs = self._get_test_firewall_rule_attrs()
         with self.firewall_rule() as fw_rule:
@@ -709,6 +723,46 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
             for k, v in attrs.iteritems():
                 self.assertEqual(res['firewall_rule'][k], v)
 
+    def test_update_firewall_rule_with_port_and_no_proto(self):
+        with self.firewall_rule() as fwr:
+            data = {'firewall_rule': {'protocol': None,
+                                      'destination_port': 80}}
+            req = self.new_update_request('firewall_rules', data,
+                                          fwr['firewall_rule']['id'])
+            res = req.get_response(self.ext_api)
+            self.assertEqual(400, res.status_int)
+
+    def test_update_firewall_rule_without_ports_and_no_proto(self):
+        with self.firewall_rule() as fwr:
+            data = {'firewall_rule': {'protocol': None,
+                                      'destination_port': None,
+                                      'source_port': None}}
+            req = self.new_update_request('firewall_rules', data,
+                                          fwr['firewall_rule']['id'])
+            res = req.get_response(self.ext_api)
+            self.assertEqual(200, res.status_int)
+
+    def test_update_firewall_rule_with_port(self):
+        with self.firewall_rule(source_port=None,
+                                destination_port=None,
+                                protocol=None) as fwr:
+            data = {'firewall_rule': {'destination_port': 80}}
+            req = self.new_update_request('firewall_rules', data,
+                                          fwr['firewall_rule']['id'])
+            res = req.get_response(self.ext_api)
+            self.assertEqual(400, res.status_int)
+
+    def test_update_firewall_rule_with_port_and_protocol(self):
+        with self.firewall_rule(source_port=None,
+                                destination_port=None,
+                                protocol=None) as fwr:
+            data = {'firewall_rule': {'destination_port': 80,
+                                      'protocol': 'tcp'}}
+            req = self.new_update_request('firewall_rules', data,
+                                          fwr['firewall_rule']['id'])
+            res = req.get_response(self.ext_api)
+            self.assertEqual(200, res.status_int)
+
     def test_update_firewall_rule_with_policy_associated(self):
         name = "new_firewall_rule1"
         attrs = self._get_test_firewall_rule_attrs(name)