]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Disallow regular user to update firewall's shared attribute
authorEugene Nikanorov <enikanorov@mirantis.com>
Tue, 27 May 2014 22:08:17 +0000 (02:08 +0400)
committerEugene Nikanorov <enikanorov@mirantis.com>
Wed, 28 May 2014 13:37:17 +0000 (17:37 +0400)
Shared firewalls should only be operable by  admins.
Currently only admin can provide shared attribute at firewall creation,
so update_firewall should be consistent with that as well.

Change-Id: I093743514637824207b375d724404d51f778d012
Closes-Bug: #1323322

etc/policy.json
neutron/tests/unit/services/firewall/test_fwaas_plugin.py
neutron/tests/unit/test_db_plugin.py

index 922657b2d087313b689d1cd7146910857887c245..369e0a80d2d6e7be2a79a5224c2f6adea4e7a1bc 100644 (file)
@@ -70,6 +70,7 @@
     "create_firewall:shared": "rule:admin_only",
     "get_firewall:shared": "rule:admin_only",
     "update_firewall": "rule:admin_or_owner",
+    "update_firewall:shared": "rule:admin_only",
     "delete_firewall": "rule:admin_or_owner",
 
     "create_firewall_policy": "",
index 9fc0aa47ffdcd5d3004bf74a71feed36869dca9e..40c91de9698511d8560de3421a541bb3c56baddc 100644 (file)
@@ -248,6 +248,25 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
                 res = req.get_response(self.ext_api)
                 self.assertEqual(res.status_int, exc.HTTPConflict.code)
 
+    def test_update_firewall_shared_fails_for_non_admin(self):
+        ctx = context.get_admin_context()
+        with self.firewall_policy() as fwp:
+            fwp_id = fwp['firewall_policy']['id']
+            with self.firewall(firewall_policy_id=fwp_id,
+                               admin_state_up=
+                               test_db_firewall.ADMIN_STATE_UP,
+                               tenant_id='noadmin') as firewall:
+                fw_id = firewall['firewall']['id']
+                self.callbacks.set_firewall_status(ctx, fw_id,
+                                                   const.ACTIVE)
+                data = {'firewall': {'shared': True}}
+                req = self.new_update_request(
+                    'firewalls', data, fw_id,
+                    context=context.Context('', 'noadmin'))
+                res = req.get_response(self.ext_api)
+                # returns 404 due to security reasons
+                self.assertEqual(res.status_int, exc.HTTPNotFound.code)
+
     def test_update_firewall_policy_fails_when_firewall_pending(self):
         name = "new_firewall1"
         attrs = self._get_test_firewall_attrs(name)
index 1b7059bc9c561a78a8ee53d281e6ae4cc4d678bd..d1ff9a90ce1d904593a737ee928163c774eb9daa 100644 (file)
@@ -158,7 +158,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
         super(NeutronDbPluginV2TestCase, self).tearDown()
 
     def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
-             action=None, subresource=None, sub_id=None):
+             action=None, subresource=None, sub_id=None, context=None):
         fmt = fmt or self.fmt
 
         path = '/%s.%s' % (
@@ -176,7 +176,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
         if data is not None:  # empty dict is valid
             body = self.serialize(data)
         return testlib_api.create_request(path, body, content_type, method,
-                                          query_string=params)
+                                          query_string=params, context=context)
 
     def new_create_request(self, resource, data, fmt=None, id=None,
                            subresource=None):
@@ -211,9 +211,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
         )
 
     def new_update_request(self, resource, data, id, fmt=None,
-                           subresource=None):
+                           subresource=None, context=None):
         return self._req(
-            'PUT', resource, data, fmt, id=id, subresource=subresource
+            'PUT', resource, data, fmt, id=id, subresource=subresource,
+            context=context
         )
 
     def new_action_request(self, resource, data, id, action, fmt=None,