import datetime
+import functools
import hashlib
import os
import time
def test_convert_version_to_tuple(self):
self.assertEqual(utils.convert_version_to_tuple('6.7.0'), (6, 7, 0))
+
+
+class LogTracingTestCase(test.TestCase):
+
+ def test_utils_setup_tracing(self):
+ self.mock_object(utils, 'LOG')
+
+ utils.setup_tracing(None)
+ self.assertFalse(utils.TRACE_API)
+ self.assertFalse(utils.TRACE_METHOD)
+ self.assertEqual(0, utils.LOG.warning.call_count)
+
+ utils.setup_tracing(['method'])
+ self.assertFalse(utils.TRACE_API)
+ self.assertTrue(utils.TRACE_METHOD)
+ self.assertEqual(0, utils.LOG.warning.call_count)
+
+ utils.setup_tracing(['method', 'api'])
+ self.assertTrue(utils.TRACE_API)
+ self.assertTrue(utils.TRACE_METHOD)
+ self.assertEqual(0, utils.LOG.warning.call_count)
+
+ def test_utils_setup_tracing_invalid_key(self):
+ self.mock_object(utils, 'LOG')
+
+ utils.setup_tracing(['fake'])
+
+ self.assertFalse(utils.TRACE_API)
+ self.assertFalse(utils.TRACE_METHOD)
+ self.assertEqual(1, utils.LOG.warning.call_count)
+
+ def test_utils_setup_tracing_valid_and_invalid_key(self):
+ self.mock_object(utils, 'LOG')
+
+ utils.setup_tracing(['method', 'fake'])
+
+ self.assertFalse(utils.TRACE_API)
+ self.assertTrue(utils.TRACE_METHOD)
+ self.assertEqual(1, utils.LOG.warning.call_count)
+
+ def test_trace_no_tracing(self):
+ self.mock_object(utils, 'LOG')
+
+ @utils.trace_method
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(None)
+
+ result = _trace_test_method()
+
+ self.assertEqual('OK', result)
+ self.assertEqual(0, utils.LOG.debug.call_count)
+
+ def test_utils_trace_method(self):
+ self.mock_object(utils, 'LOG')
+
+ @utils.trace_method
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method()
+ self.assertEqual('OK', result)
+ self.assertEqual(2, utils.LOG.debug.call_count)
+
+ def test_utils_trace_api(self):
+ self.mock_object(utils, 'LOG')
+
+ @utils.trace_api
+ def _trace_test_api(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['api'])
+
+ result = _trace_test_api()
+ self.assertEqual('OK', result)
+ self.assertEqual(2, utils.LOG.debug.call_count)
+
+ def test_utils_trace_method_default_logger(self):
+ mock_log = self.mock_object(utils, 'LOG')
+
+ @utils.trace_method
+ def _trace_test_method_custom_logger(*args, **kwargs):
+ return 'OK'
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method_custom_logger()
+
+ self.assertEqual('OK', result)
+ self.assertEqual(2, mock_log.debug.call_count)
+
+ def test_utils_trace_method_inner_decorator(self):
+ mock_logging = self.mock_object(utils, 'logging')
+ mock_log = mock.Mock()
+ mock_log.isEnabledFor = lambda x: True
+ mock_logging.getLogger = mock.Mock(return_value=mock_log)
+
+ def _test_decorator(f):
+ def blah(*args, **kwargs):
+ return f(*args, **kwargs)
+ return blah
+
+ @_test_decorator
+ @utils.trace_method
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method(self)
+
+ self.assertEqual('OK', result)
+ self.assertEqual(2, mock_log.debug.call_count)
+ # Ensure the correct function name was logged
+ for call in mock_log.debug.call_args_list:
+ self.assertTrue('_trace_test_method' in str(call))
+ self.assertFalse('blah' in str(call))
+
+ def test_utils_trace_method_outer_decorator(self):
+ mock_logging = self.mock_object(utils, 'logging')
+ mock_log = mock.Mock()
+ mock_log.isEnabledFor = lambda x: True
+ mock_logging.getLogger = mock.Mock(return_value=mock_log)
+
+ def _test_decorator(f):
+ def blah(*args, **kwargs):
+ return f(*args, **kwargs)
+ return blah
+
+ @utils.trace_method
+ @_test_decorator
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method(self)
+
+ self.assertEqual('OK', result)
+ self.assertEqual(2, mock_log.debug.call_count)
+ # Ensure the incorrect function name was logged
+ for call in mock_log.debug.call_args_list:
+ self.assertFalse('_trace_test_method' in str(call))
+ self.assertTrue('blah' in str(call))
+
+ def test_utils_trace_method_outer_decorator_with_functools(self):
+ mock_log = mock.Mock()
+ mock_log.isEnabledFor = lambda x: True
+ self.mock_object(utils.logging, 'getLogger', mock_log)
+ mock_log = self.mock_object(utils, 'LOG')
+
+ def _test_decorator(f):
+ @functools.wraps(f)
+ def wraps(*args, **kwargs):
+ return f(*args, **kwargs)
+ return wraps
+
+ @utils.trace_method
+ @_test_decorator
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method()
+
+ self.assertEqual('OK', result)
+ self.assertEqual(2, mock_log.debug.call_count)
+ # Ensure the incorrect function name was logged
+ for call in mock_log.debug.call_args_list:
+ self.assertTrue('_trace_test_method' in str(call))
+ self.assertFalse('wraps' in str(call))
+
+ def test_utils_trace_method_with_exception(self):
+ self.LOG = self.mock_object(utils, 'LOG')
+
+ @utils.trace_method
+ def _trace_test_method(*args, **kwargs):
+ raise exception.APITimeout('test message')
+
+ utils.setup_tracing(['method'])
+
+ self.assertRaises(exception.APITimeout, _trace_test_method)
+
+ exception_log = self.LOG.debug.call_args_list[1]
+ self.assertTrue('exception' in str(exception_log))
+ self.assertTrue('test message' in str(exception_log))
+
+ def test_utils_trace_method_with_time(self):
+ mock_logging = self.mock_object(utils, 'logging')
+ mock_log = mock.Mock()
+ mock_log.isEnabledFor = lambda x: True
+ mock_logging.getLogger = mock.Mock(return_value=mock_log)
+
+ mock_time = mock.Mock(side_effect=[3.1, 6])
+ self.mock_object(time, 'time', mock_time)
+
+ @utils.trace_method
+ def _trace_test_method(*args, **kwargs):
+ return 'OK'
+
+ utils.setup_tracing(['method'])
+
+ result = _trace_test_method(self)
+
+ self.assertEqual('OK', result)
+ return_log = mock_log.debug.call_args_list[1]
+ self.assertTrue('2900' in str(return_log))
+
+ def test_utils_trace_wrapper_class(self):
+ mock_logging = self.mock_object(utils, 'logging')
+ mock_log = mock.Mock()
+ mock_log.isEnabledFor = lambda x: True
+ mock_logging.getLogger = mock.Mock(return_value=mock_log)
+
+ utils.setup_tracing(['method'])
+
+ @six.add_metaclass(utils.TraceWrapperMetaclass)
+ class MyClass(object):
+ def trace_test_method(self):
+ return 'OK'
+
+ test_class = MyClass()
+ result = test_class.trace_test_method()
+
+ self.assertEqual('OK', result)
+ self.assertEqual(2, mock_log.debug.call_count)
"""Utilities and helper functions."""
+import abc
import contextlib
import datetime
+import functools
import hashlib
import inspect
+import logging as py_logging
import os
import pyclbr
import re
import stat
import sys
import tempfile
+import time
+import types
from xml.dom import minidom
from xml.parsers import expat
from xml import sax
import six
from cinder import exception
-from cinder.i18n import _, _LE
+from cinder.i18n import _, _LE, _LW
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
+VALID_TRACE_FLAGS = {'method', 'api'}
+TRACE_METHOD = False
+TRACE_API = False
synchronized = lockutils.synchronized_with_prefix('cinder-')
return text.decode('utf-8')
else:
return text
+
+
+def trace_method(f):
+ """Decorates a function if TRACE_METHOD is true."""
+ @functools.wraps(f)
+ def trace_method_logging_wrapper(*args, **kwargs):
+ if TRACE_METHOD:
+ return trace(f)(*args, **kwargs)
+ return f(*args, **kwargs)
+ return trace_method_logging_wrapper
+
+
+def trace_api(f):
+ """Decorates a function if TRACE_API is true."""
+ @functools.wraps(f)
+ def trace_api_logging_wrapper(*args, **kwargs):
+ if TRACE_API:
+ return trace(f)(*args, **kwargs)
+ return f(*args, **kwargs)
+ return trace_api_logging_wrapper
+
+
+def trace(f):
+ """Trace calls to the decorated function.
+
+ This decorator should always be defined as the outermost decorator so it
+ is defined last. This is important so it does not interfere
+ with other decorators.
+
+ Using this decorator on a function will cause its execution to be logged at
+ `DEBUG` level with arguments, return values, and exceptions.
+
+ :returns a function decorator
+ """
+
+ func_name = f.__name__
+
+ @functools.wraps(f)
+ def trace_logging_wrapper(*args, **kwargs):
+ if len(args) > 0:
+ maybe_self = args[0]
+ else:
+ maybe_self = kwargs.get('self', None)
+
+ if maybe_self and hasattr(maybe_self, '__module__'):
+ logger = logging.getLogger(maybe_self.__module__)
+ else:
+ logger = LOG
+
+ # NOTE(ameade): Don't bother going any further if DEBUG log level
+ # is not enabled for the logger.
+ if not logger.isEnabledFor(py_logging.DEBUG):
+ return f(*args, **kwargs)
+
+ all_args = inspect.getcallargs(f, *args, **kwargs)
+ logger.debug('==> %(func)s: call %(all_args)r',
+ {'func': func_name, 'all_args': all_args})
+
+ start_time = time.time() * 1000
+ try:
+ result = f(*args, **kwargs)
+ except Exception as exc:
+ total_time = int(round(time.time() * 1000)) - start_time
+ logger.debug('<== %(func)s: exception (%(time)dms) %(exc)r',
+ {'func': func_name,
+ 'time': total_time,
+ 'exc': exc})
+ raise
+ total_time = int(round(time.time() * 1000)) - start_time
+
+ logger.debug('<== %(func)s: return (%(time)dms) %(result)r',
+ {'func': func_name,
+ 'time': total_time,
+ 'result': result})
+ return result
+ return trace_logging_wrapper
+
+
+class TraceWrapperMetaclass(type):
+ """Metaclass that wraps all methods of a class with trace_method.
+
+ This metaclass will cause every function inside of the class to be
+ decorated with the trace_method decorator.
+
+ To use the metaclass you define a class like so:
+ @six.add_metaclass(utils.TraceWrapperMetaclass)
+ class MyClass(object):
+ """
+ def __new__(meta, classname, bases, classDict):
+ newClassDict = {}
+ for attributeName, attribute in classDict.items():
+ if isinstance(attribute, types.FunctionType):
+ # replace it with a wrapped version
+ attribute = functools.update_wrapper(trace_method(attribute),
+ attribute)
+ newClassDict[attributeName] = attribute
+
+ return type.__new__(meta, classname, bases, newClassDict)
+
+
+class TraceWrapperWithABCMetaclass(abc.ABCMeta, TraceWrapperMetaclass):
+ """Metaclass that wraps all methods of a class with trace."""
+ pass
+
+
+def setup_tracing(trace_flags):
+ """Set global variables for each trace flag.
+
+ Sets variables TRACE_METHOD and TRACE_API, which represent
+ whether to log method and api traces.
+
+ :param trace_flags: a list of strings
+ """
+ global TRACE_METHOD
+ global TRACE_API
+ try:
+ trace_flags = [flag.strip() for flag in trace_flags]
+ except TypeError: # Handle when trace_flags is None or a test mock
+ trace_flags = []
+ for invalid_flag in (set(trace_flags) - VALID_TRACE_FLAGS):
+ LOG.warning(_LW('Invalid trace flag: %s'), invalid_flag)
+ TRACE_METHOD = 'method' in trace_flags
+ TRACE_API = 'api' in trace_flags