]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add task/flow listener support around the volume api flow
authorJoshua Harlow <harlowja@yahoo-inc.com>
Thu, 12 Jun 2014 21:35:35 +0000 (14:35 -0700)
committerJoshua Harlow <harlowja@gmail.com>
Sat, 14 Jun 2014 06:33:17 +0000 (23:33 -0700)
This will add in logging of what is occurring around the
actions taken by taskflow when taskflow executes the initial
create_volume workflow.

Since the oslo logging module will include the request_id of
the currently active request (per greenthread) this will make
it easier to understand and debug the actions occurring inside
taskflow's engine concept.

Part of blueprint task-logging

Change-Id: I57a6f85ecac37fc4f97033eb3b7a0dc8bd35a886

cinder/volume/api.py
cinder/volume/flows/common.py
requirements.txt

index 1ce4ecbb0f05c6fdf727417d42fe0faf7a1cd6d8..e5aff02c3902d951ac0f0ac9e54a66ebe844890c 100644 (file)
@@ -39,6 +39,7 @@ from cinder import quota_utils
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import utils
 from cinder.volume.flows.api import create_volume
+from cinder.volume.flows import common as flow_common
 from cinder.volume import qos_specs
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
@@ -186,9 +187,12 @@ class API(base.Base):
             raise exception.CinderException(
                 _("Failed to create api volume flow"))
 
-        flow_engine.run()
-        volume = flow_engine.storage.fetch('volume')
-        return volume
+        # Attaching this listener will capture all of the notifications that
+        # taskflow sends out and redirect them to a more useful log for
+        # cinders debugging (or error reporting) usage.
+        with flow_common.DynamicLogListener(flow_engine, logger=LOG):
+            flow_engine.run()
+            return flow_engine.storage.fetch('volume')
 
     @wrap_check_policy
     def delete(self, context, volume, force=False, unmanage_only=False):
index d3dffaef615a4be1b4516d3f361d20a01871cbac..fd3ba5abe42fa853b661ca0b34981aa7b0f64300 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import logging as base_logging
+
 import six
+from taskflow.listeners import base as base_listener
+from taskflow import states
+from taskflow.utils import misc
 
 from cinder import exception
 from cinder.openstack.common import log as logging
@@ -29,6 +34,91 @@ LOG = logging.getLogger(__name__)
 REASON_LENGTH = 128
 
 
+class DynamicLogListener(base_listener.ListenerBase):
+    """This is used to attach to taskflow engines while they are running.
+
+    It provides a bunch of useful features that expose the actions happening
+    inside a taskflow engine, which can be useful for developers for debugging,
+    for operations folks for monitoring and tracking of the resource actions
+    and more...
+    """
+
+    def __init__(self, engine,
+                 task_listen_for=(misc.Notifier.ANY,),
+                 flow_listen_for=(misc.Notifier.ANY,),
+                 logger=None):
+        super(DynamicLogListener, self).__init__(
+            engine,
+            task_listen_for=task_listen_for,
+            flow_listen_for=flow_listen_for)
+        if logger is None:
+            self._logger = LOG
+        else:
+            self._logger = logger
+
+    def _flow_receiver(self, state, details):
+        # Gets called on flow state changes.
+        level = base_logging.DEBUG
+        if state in (states.FAILURE, states.REVERTED):
+            level = base_logging.WARNING
+        self._logger.log(level,
+                         _("Flow '%(flow_name)s' (%(flow_uuid)s) transitioned"
+                           " into state '%(state)s' from state"
+                           " '%(old_state)s'") %
+                         {'flow_name': details['flow_name'],
+                          'flow_uuid': details['flow_uuid'],
+                          'state': state,
+                          'old_state': details.get('old_state')})
+
+    def _task_receiver(self, state, details):
+        # Gets called on task state changes.
+        if 'result' in details and state in base_listener.FINISH_STATES:
+            # If the task failed, it's useful to show the exception traceback
+            # and any other available exception information.
+            result = details.get('result')
+            if isinstance(result, misc.Failure):
+                self._logger.warn(_("Task '%(task_name)s' (%(task_uuid)s)"
+                                    " transitioned into state '%(state)s'") %
+                                  {'task_name': details['task_name'],
+                                   'task_uuid': details['task_uuid'],
+                                   'state': state},
+                                  exc_info=tuple(result.exc_info))
+            else:
+                # Otherwise, depending on the enabled logging level/state we
+                # will show or hide results that the task may have produced
+                # during execution.
+                level = base_logging.DEBUG
+                if state == states.FAILURE:
+                    level = base_logging.WARNING
+                if (self._logger.isEnabledFor(base_logging.DEBUG) or
+                        state == states.FAILURE):
+                    self._logger.log(level,
+                                     _("Task '%(task_name)s' (%(task_uuid)s)"
+                                       " transitioned into state '%(state)s'"
+                                       " with result %(result)s") %
+                                     {'task_name': details['task_name'],
+                                      'task_uuid': details['task_uuid'],
+                                      'state': state, 'result': result})
+                else:
+                    self._logger.log(level,
+                                     _("Task '%(task_name)s' (%(task_uuid)s)"
+                                       " transitioned into state"
+                                       " '%(state)s'") %
+                                     {'task_name': details['task_name'],
+                                      'task_uuid': details['task_uuid'],
+                                      'state': state})
+        else:
+            level = base_logging.DEBUG
+            if state in (states.REVERTING, states.RETRYING):
+                level = base_logging.WARNING
+            self._logger.log(level,
+                             _("Task '%(task_name)s' (%(task_uuid)s)"
+                               " transitioned into state '%(state)s'") %
+                             {'task_name': details['task_name'],
+                              'task_uuid': details['task_uuid'],
+                              'state': state})
+
+
 def make_pretty_name(method):
     """Makes a pretty name for a function/method."""
     meth_pieces = [method.__name__]
index f7ccf57b7c7bf318e385fbcd4a4cd9c41e9b26b1..88817a11b8afeed022fcbe0d3dd66cce059e3cf8 100644 (file)
@@ -20,7 +20,7 @@ python-novaclient>=2.17.0
 python-swiftclient>=2.0.2
 requests>=1.1
 Routes>=1.12.3
-taskflow>=0.1.3,<0.3
+taskflow>=0.3,<0.4
 rtslib-fb>=2.1.39
 six>=1.6.0
 SQLAlchemy>=0.7.8,<=0.9.99