]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Ensures policy file is reloaded only if updated.
authorSalvatore Orlando <salv.orlando@gmail.com>
Thu, 19 Jul 2012 00:59:42 +0000 (17:59 -0700)
committerSalvatore Orlando <salv.orlando@gmail.com>
Thu, 19 Jul 2012 00:59:42 +0000 (17:59 -0700)
Fixes bug 1023649

The fix is 'inspired' (ie: copied) from nova. It is cached in memory unless a
change in policy file is detected. In that case, a reload is triggered.

This patch also adds a set of unit tests for quantum/policy.py, which was
previously not covered.

Change-Id: I337042ae418b518268acd30c26ef02559887a8be

quantum/common/utils.py
quantum/policy.py
quantum/tests/unit/test_policy.py [new file with mode: 0644]

index 0d30d327935950cb3fa1770f98c492b3edf3ab55..bc2f920253b9f7a7b0d86f6d5093588cc75e79a9 100644 (file)
@@ -77,6 +77,27 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
     return result
 
 
+def read_cached_file(filename, cache_info, reload_func=None):
+    """Read from a file if it has been modified.
+
+    :param cache_info: dictionary to hold opaque cache.
+    :param reload_func: optional function to be called with data when
+                        file is reloaded due to a modification.
+
+    :returns: data from file
+
+    """
+    mtime = os.path.getmtime(filename)
+    if not cache_info or mtime != cache_info.get('mtime'):
+        logging.debug(_("Reloading cached file %s") % filename)
+        with open(filename) as fap:
+            cache_info['data'] = fap.read()
+        cache_info['mtime'] = mtime
+        if reload_func:
+            reload_func(cache_info['data'])
+    return cache_info['data']
+
+
 class LazyPluggable(object):
     """A pluggable backend loaded lazily based on some value."""
 
index 16c09853f75f4511408356f762228b19d665bd26..bf0c6da25d4dcae43b3988f66814a3abc084bfae 100644 (file)
@@ -22,28 +22,33 @@ Policy engine for quantum.  Largely copied from nova.
 import os.path
 
 from quantum.common import exceptions
-from quantum.common.utils import find_config_file
 from quantum.openstack.common import cfg
+import quantum.common.utils as utils
 from quantum.openstack.common import policy
 
 
 _POLICY_PATH = None
+_POLICY_CACHE = {}
 
 
 def reset():
     global _POLICY_PATH
     _POLICY_PATH = None
+    _POLICY_CACHE = {}
     policy.reset()
 
 
 def init():
     global _POLICY_PATH
+    global _POLICY_CACHE
     if not _POLICY_PATH:
-        _POLICY_PATH = find_config_file({}, cfg.CONF.policy_file)
+        _POLICY_PATH = utils.find_config_file({}, cfg.CONF.policy_file)
         if not _POLICY_PATH:
             raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
-    with open(_POLICY_PATH) as f:
-        _set_brain(f.read())
+    # pass _set_brain to read_cached_file so that the policy brain
+    # is reset only if the file has changed
+    utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
+                           reload_func=_set_brain)
 
 
 def _set_brain(data):
diff --git a/quantum/tests/unit/test_policy.py b/quantum/tests/unit/test_policy.py
new file mode 100644 (file)
index 0000000..cc1410b
--- /dev/null
@@ -0,0 +1,208 @@
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test of Policy Engine For Quantum"""
+
+import contextlib
+import os.path
+import shutil
+import StringIO
+import tempfile
+import unittest2 as unittest
+import urllib2
+
+import mock
+
+import quantum
+from quantum.common import exceptions
+from quantum.common import utils
+from quantum import context
+from quantum.openstack.common import policy as common_policy
+from quantum import policy
+
+
+class PolicyFileTestCase(unittest.TestCase):
+    def setUp(self):
+        super(PolicyFileTestCase, self).setUp()
+        policy.reset()
+        self.context = context.Context('fake', 'fake')
+        self.target = {}
+
+    def tearDown(self):
+        super(PolicyFileTestCase, self).tearDown()
+        policy.reset()
+
+    @contextlib.contextmanager
+    def _tempdir(self, **kwargs):
+        tmpdir = tempfile.mkdtemp(**kwargs)
+        try:
+            yield tmpdir
+        finally:
+            try:
+                shutil.rmtree(tmpdir)
+            except OSError, e:
+                #TODO: fail test on raise
+                pass
+
+    def test_modified_policy_reloads(self):
+        with self._tempdir() as tmpdir:
+            def fake_find_config_file(_1, _2):
+                return os.path.join(tmpdir, 'policy')
+
+            with mock.patch.object(quantum.common.utils,
+                                   'find_config_file',
+                                   new=fake_find_config_file):
+                tmpfilename = os.path.join(tmpdir, 'policy')
+                action = "example:test"
+                with open(tmpfilename, "w") as policyfile:
+                    policyfile.write("""{"example:test": []}""")
+                policy.enforce(self.context, action, self.target)
+                with open(tmpfilename, "w") as policyfile:
+                    policyfile.write("""{"example:test": ["false:false"]}""")
+                # NOTE(vish): reset stored policy cache so we don't have to
+                # sleep(1)
+                policy._POLICY_CACHE = {}
+                self.assertRaises(exceptions.PolicyNotAuthorized,
+                                  policy.enforce,
+                                  self.context,
+                                  action,
+                                  self.target)
+
+
+class PolicyTestCase(unittest.TestCase):
+    def setUp(self):
+        super(PolicyTestCase, self).setUp()
+        policy.reset()
+        # NOTE(vish): preload rules to circumvent reloading from file
+        policy.init()
+        rules = {
+            "true": [],
+            "example:allowed": [],
+            "example:denied": [["false:false"]],
+            "example:get_http": [["http:http://www.example.com"]],
+            "example:my_file": [["role:compute_admin"],
+                                ["tenant_id:%(tenant_id)s"]],
+            "example:early_and_fail": [["false:false", "rule:true"]],
+            "example:early_or_success": [["rule:true"], ["false:false"]],
+            "example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
+            "example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
+        }
+        # NOTE(vish): then overload underlying brain
+        common_policy.set_brain(common_policy.HttpBrain(rules))
+        self.context = context.Context('fake', 'fake', roles=['member'])
+        self.target = {}
+
+    def tearDown(self):
+        policy.reset()
+        super(PolicyTestCase, self).tearDown()
+
+    def test_enforce_nonexistent_action_throws(self):
+        action = "example:noexist"
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, self.target)
+
+    def test_enforce_bad_action_throws(self):
+        action = "example:denied"
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, self.target)
+
+    def test_enforce_good_action(self):
+        action = "example:allowed"
+        policy.enforce(self.context, action, self.target)
+
+    def test_enforce_http_true(self):
+
+        def fakeurlopen(url, post_data):
+            return StringIO.StringIO("True")
+
+        with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
+            action = "example:get_http"
+            target = {}
+            result = policy.enforce(self.context, action, target)
+            self.assertEqual(result, None)
+
+    def test_enforce_http_false(self):
+
+        def fakeurlopen(url, post_data):
+            return StringIO.StringIO("False")
+
+        with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
+            action = "example:get_http"
+            target = {}
+            self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                              self.context, action, target)
+
+    def test_templatized_enforcement(self):
+        target_mine = {'tenant_id': 'fake'}
+        target_not_mine = {'tenant_id': 'another'}
+        action = "example:my_file"
+        policy.enforce(self.context, action, target_mine)
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, target_not_mine)
+
+    def test_early_AND_enforcement(self):
+        action = "example:early_and_fail"
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, action, self.target)
+
+    def test_early_OR_enforcement(self):
+        action = "example:early_or_success"
+        policy.enforce(self.context, action, self.target)
+
+    def test_ignore_case_role_check(self):
+        lowercase_action = "example:lowercase_admin"
+        uppercase_action = "example:uppercase_admin"
+        # NOTE(dprince) we mix case in the Admin role here to ensure
+        # case is ignored
+        admin_context = context.Context('admin', 'fake', roles=['AdMiN'])
+        policy.enforce(admin_context, lowercase_action, self.target)
+        policy.enforce(admin_context, uppercase_action, self.target)
+
+
+class DefaultPolicyTestCase(unittest.TestCase):
+
+    def setUp(self):
+        super(DefaultPolicyTestCase, self).setUp()
+        policy.reset()
+        policy.init()
+
+        self.rules = {
+            "default": [],
+            "example:exist": [["false:false"]]
+        }
+
+        self._set_brain('default')
+
+        self.context = context.Context('fake', 'fake')
+
+    def _set_brain(self, default_rule):
+        brain = common_policy.HttpBrain(self.rules, default_rule)
+        common_policy.set_brain(brain)
+
+    def tearDown(self):
+        super(DefaultPolicyTestCase, self).tearDown()
+        policy.reset()
+
+    def test_policy_called(self):
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, "example:exist", {})
+
+    def test_not_found_policy_calls_default(self):
+        policy.enforce(self.context, "example:noexist", {})
+
+    def test_default_not_found(self):
+        self._set_brain("default_noexist")
+        self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
+                          self.context, "example:noexist", {})