Attach log listeners to other engines
author Joshua Harlow <harlowja@yahoo-inc.com>
Sat, 21 Jun 2014 01:54:55 +0000 (18:54 -0700)
committer Joshua Harlow <harlowja@yahoo-inc.com>
Sat, 21 Jun 2014 02:09:48 +0000 (19:09 -0700)
Attach the created task/flow/engine listener to
the other taskflow usages in cinder so that
those locations also benefit from the same
logging of state and activity.

Part of blueprint task-logging

Change-Id: I4ba7fe625a88967607adaa18d329bec56825201c

cinder/flow_utils.py
cinder/scheduler/manager.py
cinder/volume/api.py
cinder/volume/flows/common.py
cinder/volume/manager.py

cinder/flow_utils.py
index 05a89c32e1071edddf4ac150433c205e19c11dd2..f30a1f54bc424a7adf6ca5445112083ec0c3fd0e 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import logging as base_logging
+
 # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
+from taskflow.listeners import base as base_listener
+from taskflow import states
 from taskflow import task
+from taskflow.utils import misc
+
+from cinder.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
 
 
 def _make_task_name(cls, addons=None):
@@ -34,3 +43,88 @@ class CinderTask(task.Task):
         super(CinderTask, self).__init__(_make_task_name(self.__class__,
                                                          addons),
                                          **kwargs)
+
+
+class DynamicLogListener(base_listener.ListenerBase):
+    """This is used to attach to taskflow engines while they are running.
+
+    It provides a bunch of useful features that expose the actions happening
+    inside a taskflow engine, which can be useful for developers for debugging,
+    for operations folks for monitoring and tracking of the resource actions
+    and more...
+    """
+
+    def __init__(self, engine,
+                 task_listen_for=(misc.Notifier.ANY,),
+                 flow_listen_for=(misc.Notifier.ANY,),
+                 logger=None):
+        super(DynamicLogListener, self).__init__(
+            engine,
+            task_listen_for=task_listen_for,
+            flow_listen_for=flow_listen_for)
+        if logger is None:
+            self._logger = LOG
+        else:
+            self._logger = logger
+
+    def _flow_receiver(self, state, details):
+        # Gets called on flow state changes.
+        level = base_logging.DEBUG
+        if state in (states.FAILURE, states.REVERTED):
+            level = base_logging.WARNING
+        self._logger.log(level,
+                         _("Flow '%(flow_name)s' (%(flow_uuid)s) transitioned"
+                           " into state '%(state)s' from state"
+                           " '%(old_state)s'") %
+                         {'flow_name': details['flow_name'],
+                          'flow_uuid': details['flow_uuid'],
+                          'state': state,
+                          'old_state': details.get('old_state')})
+
+    def _task_receiver(self, state, details):
+        # Gets called on task state changes.
+        if 'result' in details and state in base_listener.FINISH_STATES:
+            # If the task failed, it's useful to show the exception traceback
+            # and any other available exception information.
+            result = details.get('result')
+            if isinstance(result, misc.Failure):
+                self._logger.warn(_("Task '%(task_name)s' (%(task_uuid)s)"
+                                    " transitioned into state '%(state)s'") %
+                                  {'task_name': details['task_name'],
+                                   'task_uuid': details['task_uuid'],
+                                   'state': state},
+                                  exc_info=tuple(result.exc_info))
+            else:
+                # Otherwise, depending on the enabled logging level/state we
+                # will show or hide results that the task may have produced
+                # during execution.
+                level = base_logging.DEBUG
+                if state == states.FAILURE:
+                    level = base_logging.WARNING
+                if (self._logger.isEnabledFor(base_logging.DEBUG) or
+                        state == states.FAILURE):
+                    self._logger.log(level,
+                                     _("Task '%(task_name)s' (%(task_uuid)s)"
+                                       " transitioned into state '%(state)s'"
+                                       " with result %(result)s") %
+                                     {'task_name': details['task_name'],
+                                      'task_uuid': details['task_uuid'],
+                                      'state': state, 'result': result})
+                else:
+                    self._logger.log(level,
+                                     _("Task '%(task_name)s' (%(task_uuid)s)"
+                                       " transitioned into state"
+                                       " '%(state)s'") %
+                                     {'task_name': details['task_name'],
+                                      'task_uuid': details['task_uuid'],
+                                      'state': state})
+        else:
+            level = base_logging.DEBUG
+            if state in (states.REVERTING, states.RETRYING):
+                level = base_logging.WARNING
+            self._logger.log(level,
+                             _("Task '%(task_name)s' (%(task_uuid)s)"
+                               " transitioned into state '%(state)s'") %
+                             {'task_name': details['task_name'],
+                              'task_uuid': details['task_uuid'],
+                              'state': state})
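
For context, a rough end-to-end sketch of how the new listener could be
exercised against a toy flow. NoopTask and the flow name are hypothetical,
the taskflow calls assume the engines.load()/linear_flow API available to
cinder at the time, and CinderTask is assumed to be constructible with no
arguments:

    from taskflow import engines
    from taskflow.patterns import linear_flow

    from cinder import flow_utils


    class NoopTask(flow_utils.CinderTask):
        """Hypothetical task that does nothing, only drives state changes."""

        def execute(self):
            pass


    flow = linear_flow.Flow('example-flow')
    flow.add(NoopTask())
    engine = engines.load(flow)

    # With no explicit logger, the listener falls back to the module-level
    # LOG defined in cinder/flow_utils.py; transitions are logged at DEBUG
    # and failure-related states at WARNING.
    with flow_utils.DynamicLogListener(engine):
        engine.run()
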
cinder/scheduler/manager.py
index fd63761d1cd83895d874c54275240a568633b505..663ef89777329ab6b49005aae3f97f039c860ecb 100644 (file)
@@ -25,6 +25,7 @@ from oslo import messaging
 from cinder import context
 from cinder import db
 from cinder import exception
+from cinder import flow_utils
 from cinder import manager
 from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
@@ -101,7 +102,9 @@ class SchedulerManager(manager.Manager):
             LOG.exception(_("Failed to create scheduler manager volume flow"))
             raise exception.CinderException(
                 _("Failed to create scheduler manager volume flow"))
-        flow_engine.run()
+
+        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+            flow_engine.run()
 
     def request_service_capabilities(self, context):
         volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
cinder/volume/api.py
index 02295c5f22a7bcad7d81370dcd1abad704b47b91..1466c0913ccbbe473f7dee15bc6244b7c502bf6f 100644 (file)
@@ -28,6 +28,7 @@ from oslo.config import cfg
 from cinder import context
 from cinder.db import base
 from cinder import exception
+from cinder import flow_utils
 from cinder.image import glance
 from cinder import keymgr
 from cinder.openstack.common import excutils
@@ -40,7 +41,6 @@ from cinder import quota_utils
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import utils
 from cinder.volume.flows.api import create_volume
-from cinder.volume.flows import common as flow_common
 from cinder.volume import qos_specs
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
@@ -214,7 +214,7 @@ class API(base.Base):
         # Attaching this listener will capture all of the notifications that
         # taskflow sends out and redirect them to a more useful log for
         # cinder's debugging (or error reporting) usage.
-        with flow_common.DynamicLogListener(flow_engine, logger=LOG):
+        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
             flow_engine.run()
             return flow_engine.storage.fetch('volume')
 
cinder/volume/flows/common.py
index ef5fd5649faae0a477143baad1f6395824ac846c..ba35d7d47e0d96c1d3023a08a2460a4defd93364 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import logging as base_logging
-
 import six
-from taskflow.listeners import base as base_listener
-from taskflow import states
-from taskflow.utils import misc
 
 from cinder import exception
 from cinder.openstack.common import log as logging
@@ -34,91 +29,6 @@ LOG = logging.getLogger(__name__)
 REASON_LENGTH = 128
 
 
-class DynamicLogListener(base_listener.ListenerBase):
-    """This is used to attach to taskflow engines while they are running.
-
-    It provides a bunch of useful features that expose the actions happening
-    inside a taskflow engine, which can be useful for developers for debugging,
-    for operations folks for monitoring and tracking of the resource actions
-    and more...
-    """
-
-    def __init__(self, engine,
-                 task_listen_for=(misc.Notifier.ANY,),
-                 flow_listen_for=(misc.Notifier.ANY,),
-                 logger=None):
-        super(DynamicLogListener, self).__init__(
-            engine,
-            task_listen_for=task_listen_for,
-            flow_listen_for=flow_listen_for)
-        if logger is None:
-            self._logger = LOG
-        else:
-            self._logger = logger
-
-    def _flow_receiver(self, state, details):
-        # Gets called on flow state changes.
-        level = base_logging.DEBUG
-        if state in (states.FAILURE, states.REVERTED):
-            level = base_logging.WARNING
-        self._logger.log(level,
-                         _("Flow '%(flow_name)s' (%(flow_uuid)s) transitioned"
-                           " into state '%(state)s' from state"
-                           " '%(old_state)s'") %
-                         {'flow_name': details['flow_name'],
-                          'flow_uuid': details['flow_uuid'],
-                          'state': state,
-                          'old_state': details.get('old_state')})
-
-    def _task_receiver(self, state, details):
-        # Gets called on task state changes.
-        if 'result' in details and state in base_listener.FINISH_STATES:
-            # If the task failed, it's useful to show the exception traceback
-            # and any other available exception information.
-            result = details.get('result')
-            if isinstance(result, misc.Failure):
-                self._logger.warn(_("Task '%(task_name)s' (%(task_uuid)s)"
-                                    " transitioned into state '%(state)s'") %
-                                  {'task_name': details['task_name'],
-                                   'task_uuid': details['task_uuid'],
-                                   'state': state},
-                                  exc_info=tuple(result.exc_info))
-            else:
-                # Otherwise, depending on the enabled logging level/state we
-                # will show or hide results that the task may have produced
-                # during execution.
-                level = base_logging.DEBUG
-                if state == states.FAILURE:
-                    level = base_logging.WARNING
-                if (self._logger.isEnabledFor(base_logging.DEBUG) or
-                        state == states.FAILURE):
-                    self._logger.log(level,
-                                     _("Task '%(task_name)s' (%(task_uuid)s)"
-                                       " transitioned into state '%(state)s'"
-                                       " with result %(result)s") %
-                                     {'task_name': details['task_name'],
-                                      'task_uuid': details['task_uuid'],
-                                      'state': state, 'result': result})
-                else:
-                    self._logger.log(level,
-                                     _("Task '%(task_name)s' (%(task_uuid)s)"
-                                       " transitioned into state"
-                                       " '%(state)s'") %
-                                     {'task_name': details['task_name'],
-                                      'task_uuid': details['task_uuid'],
-                                      'state': state})
-        else:
-            level = base_logging.DEBUG
-            if state in (states.REVERTING, states.RETRYING):
-                level = base_logging.WARNING
-            self._logger.log(level,
-                             _("Task '%(task_name)s' (%(task_uuid)s)"
-                               " transitioned into state '%(state)s'") %
-                             {'task_name': details['task_name'],
-                              'task_uuid': details['task_uuid'],
-                              'state': state})
-
-
 def make_pretty_name(method):
     """Makes a pretty name for a function/method."""
     meth_pieces = [method.__name__]
cinder/volume/manager.py
index 41b650cd61b7b695a915720dec759f8b4b1e80db..30617a376de51071690b3e5fd13860dd5af33b10 100644 (file)
@@ -44,6 +44,7 @@ from oslo import messaging
 from cinder import compute
 from cinder import context
 from cinder import exception
+from cinder import flow_utils
 from cinder.image import glance
 from cinder import manager
 from cinder.openstack.common import excutils
@@ -325,7 +326,8 @@ class VolumeManager(manager.SchedulerDependentManager):
             # flow reverts all jobs that were done and reraises an exception.
             # Otherwise, all data that was generated by flow becomes available
             # in flow engine's storage.
-            flow_engine.run()
+            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+                flow_engine.run()
 
         @utils.synchronized(locked_action, external=True)
         def _run_flow_locked():
@@ -1303,7 +1305,9 @@ class VolumeManager(manager.SchedulerDependentManager):
             LOG.exception(_("Failed to create manage_existing flow."))
             raise exception.CinderException(
                 _("Failed to create manage existing flow."))
-        flow_engine.run()
+
+        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+            flow_engine.run()
 
         # Fetch created volume from storage
         volume_ref = flow_engine.storage.fetch('volume')
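
The failure path is the main reason the listener logs at WARNING: when a task
ends in FAILURE, taskflow hands the listener a misc.Failure as the result and
the transition is logged with the captured traceback (exc_info) before the
engine re-raises. A hedged illustration (FailingTask and the flow name are
hypothetical; the same taskflow API assumptions as the sketch above apply):

    from taskflow import engines
    from taskflow.patterns import linear_flow

    from cinder import flow_utils
    from cinder.openstack.common import log as logging

    LOG = logging.getLogger(__name__)


    class FailingTask(flow_utils.CinderTask):
        """Hypothetical task used only to show the failure logging path."""

        def execute(self):
            raise RuntimeError("simulated task failure")


    flow = linear_flow.Flow('failing-flow')
    flow.add(FailingTask())
    engine = engines.load(flow)

    try:
        with flow_utils.DynamicLogListener(engine, logger=LOG):
            engine.run()
    except Exception:
        # By the time the exception propagates here, the FAILURE transition
        # has already been logged at WARNING with the exception's exc_info.
        pass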