"""Utilities and helper functions."""
import datetime
+import functools
import hashlib
import logging as std_logging
import os
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
+class cache_method_results(object):
+ """This decorator is intended for object methods only."""
+
+ def __init__(self, func):
+ self.func = func
+ functools.update_wrapper(self, func)
+ self._first_call = True
+ self._not_cached = object()
+
+ def _get_from_cache(self, target_self, *args, **kwargs):
+ func_name = "%(module)s.%(class)s.%(func_name)s" % {
+ 'module': target_self.__module__,
+ 'class': target_self.__class__.__name__,
+ 'func_name': self.func.__name__,
+ }
+ key = (func_name,) + args
+ if kwargs:
+ key += dict2tuple(kwargs)
+ try:
+ item = target_self._cache.get(key, self._not_cached)
+ except TypeError:
+ LOG.debug(_("Method %(func_name)s cannot be cached due to "
+ "unhashable parameters: args: %(args)s, kwargs: "
+ "%(kwargs)s"),
+ {'func_name': func_name,
+ 'args': args,
+ 'kwargs': kwargs})
+ return self.func(target_self, *args, **kwargs)
+
+ if item is self._not_cached:
+ item = self.func(target_self, *args, **kwargs)
+ target_self._cache.set(key, item, None)
+
+ return item
+
+ def __call__(self, target_self, *args, **kwargs):
+ if not hasattr(target_self, '_cache'):
+ raise NotImplementedError(
+ "Instance of class %(module)s.%(class)s must contain _cache "
+ "attribute" % {
+ 'module': target_self.__module__,
+ 'class': target_self.__class__.__name__})
+ if not target_self._cache:
+ if self._first_call:
+ LOG.debug(_("Instance of class %(module)s.%(class)s doesn't "
+ "contain attribute _cache therefore results "
+ "cannot be cached for %(func_name)s."),
+ {'module': target_self.__module__,
+ 'class': target_self.__class__.__name__,
+ 'func_name': self.func.__name__})
+ self._first_call = False
+ return self.func(target_self, *args, **kwargs)
+ return self._get_from_cache(target_self, *args, **kwargs)
+
+ def __get__(self, obj, objtype):
+ return functools.partial(self.__call__, obj)
+
+
def read_cached_file(filename, cache_info, reload_func=None):
"""Read from a file if it has been modified.
return res_dict
+def dict2tuple(d):
+ items = d.items()
+ items.sort()
+ return tuple(items)
+
+
def diff_list_of_dict(old_list, new_list):
new_set = set([dict2str(l) for l in new_list])
old_set = set([dict2str(l) for l in old_list])
# License for the specific language governing permissions and limitations
# under the License.
+import mock
import testtools
from neutron.common import exceptions as n_exc
added, removed = utils.diff_list_of_dict(old_list, new_list)
self.assertEqual(added, [dict(key4="value4")])
self.assertEqual(removed, [dict(key3="value3")])
+
+
+class _CachingDecorator(object):
+ def __init__(self):
+ self.func_retval = 'bar'
+ self._cache = mock.Mock()
+
+ @utils.cache_method_results
+ def func(self, *args, **kwargs):
+ return self.func_retval
+
+
+class TestCachingDecorator(base.BaseTestCase):
+ def setUp(self):
+ super(TestCachingDecorator, self).setUp()
+ self.decor = _CachingDecorator()
+ self.func_name = '%(module)s._CachingDecorator.func' % {
+ 'module': self.__module__
+ }
+ self.not_cached = self.decor.func.func.im_self._not_cached
+
+ def test_cache_miss(self):
+ expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
+ args = (1, 2)
+ kwargs = {'foo': 'bar'}
+ self.decor._cache.get.return_value = self.not_cached
+ retval = self.decor.func(*args, **kwargs)
+ self.decor._cache.set.assert_called_once_with(
+ expected_key, self.decor.func_retval, None)
+ self.assertEqual(self.decor.func_retval, retval)
+
+ def test_cache_hit(self):
+ expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
+ args = (1, 2)
+ kwargs = {'foo': 'bar'}
+ retval = self.decor.func(*args, **kwargs)
+ self.assertFalse(self.decor._cache.set.called)
+ self.assertEqual(self.decor._cache.get.return_value, retval)
+ self.decor._cache.get.assert_called_once_with(expected_key,
+ self.not_cached)
+
+ def test_get_unhashable(self):
+ expected_key = (self.func_name, [1], 2)
+ self.decor._cache.get.side_effect = TypeError
+ retval = self.decor.func([1], 2)
+ self.assertFalse(self.decor._cache.set.called)
+ self.assertEqual(self.decor.func_retval, retval)
+ self.decor._cache.get.assert_called_once_with(expected_key,
+ self.not_cached)
+
+ def test_missing_cache(self):
+ delattr(self.decor, '_cache')
+ self.assertRaises(NotImplementedError, self.decor.func, (1, 2))
+
+ def test_no_cache(self):
+ self.decor._cache = False
+ retval = self.decor.func((1, 2))
+ self.assertEqual(self.decor.func_retval, retval)
+
+
+class TestDict2Tuples(base.BaseTestCase):
+ def test_dict(self):
+ input_dict = {'foo': 'bar', 42: 'baz', 'aaa': 'zzz'}
+ expected = ((42, 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
+ output_tuple = utils.dict2tuple(input_dict)
+ self.assertEqual(expected, output_tuple)