]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Use a task subclass instead of a functor + task wrapper
authorJoshua Harlow <harlowja@yahoo-inc.com>
Wed, 25 Jun 2014 18:24:02 +0000 (11:24 -0700)
committerJoshua Harlow <harlowja@yahoo-inc.com>
Wed, 25 Jun 2014 18:41:14 +0000 (11:41 -0700)
Instead of using a functor that activates the scheduler driver
instead incorporate and use a task subclass, this avoids the less
meaningful information that is presented when a functor is used
as well as allows for a more straightforward path for future
subclassing and derivations of work.

Change-Id: Ice66e109968c28f36156edf3dd47a89f94494bb8

cinder/scheduler/flows/create_volume.py

index 4f981981ea2949e683f578604225e525254e9ccf..e6da1c5deb8cba906c96edef9e3b6a6ed5d9fe27 100644 (file)
@@ -12,7 +12,6 @@
 
 import taskflow.engines
 from taskflow.patterns import linear_flow
-from taskflow import task
 
 from cinder import exception
 from cinder import flow_utils
@@ -35,10 +34,10 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
 
     default_provides = set(['request_spec'])
 
-    def __init__(self, db, **kwargs):
+    def __init__(self, db_api, **kwargs):
         super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
                                                        **kwargs)
-        self.db = db
+        self.db_api = db_api
 
     def _populate_request_spec(self, context, volume_id, snapshot_id,
                                image_id):
@@ -52,9 +51,9 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
         if not volume_id:
             msg = _("No volume_id provided to populate a request_spec from")
             raise exception.InvalidInput(reason=msg)
-        volume_ref = self.db.volume_get(context, volume_id)
+        volume_ref = self.db_api.volume_get(context, volume_id)
         volume_type_id = volume_ref.get('volume_type_id')
-        vol_type = self.db.volume_type_get(context, volume_type_id)
+        vol_type = self.db_api.volume_type_get(context, volume_type_id)
         return {
             'volume_id': volume_id,
             'snapshot_id': snapshot_id,
@@ -78,7 +77,76 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
         }
 
 
-def get_flow(context, db, driver, request_spec=None,
+class ScheduleCreateVolumeTask(flow_utils.CinderTask):
+    """Activates a scheduler driver and handles any subsequent failures.
+
+    Notification strategy: on failure the scheduler rpc notifier will be
+    activated and a notification will be emitted indicating what errored,
+    the reason, and the request (and misc. other data) that caused the error
+    to be triggered.
+
+    Reversion strategy: N/A
+    """
+    FAILURE_TOPIC = "scheduler.create_volume"
+
+    def __init__(self, db_api, driver_api, **kwargs):
+        super(ScheduleCreateVolumeTask, self).__init__(addons=[ACTION],
+                                                       **kwargs)
+        self.db_api = db_api
+        self.driver_api = driver_api
+
+    def _handle_failure(self, context, request_spec, cause):
+        try:
+            self._notify_failure(context, request_spec, cause)
+        finally:
+            LOG.error(_("Failed to run task %(name)s: %(cause)s") %
+                      {'cause': cause, 'name': self.name})
+
+    def _notify_failure(self, context, request_spec, cause):
+        """When scheduling fails send out a event that it failed."""
+        payload = {
+            'request_spec': request_spec,
+            'volume_properties': request_spec.get('volume_properties', {}),
+            'volume_id': request_spec['volume_id'],
+            'state': 'error',
+            'method': 'create_volume',
+            'reason': cause,
+        }
+        try:
+            rpc.get_notifier('scheduler').error(context, self.FAILURE_TOPIC,
+                                                payload)
+        except exception.CinderException:
+            LOG.exception(_("Failed notifying on %(topic)s "
+                            "payload %(payload)s") %
+                          {'topic': self.FAILURE_TOPIC, 'payload': payload})
+
+    def execute(self, context, request_spec, filter_properties):
+        try:
+            self.driver_api.schedule_create_volume(context, request_spec,
+                                                   filter_properties)
+        except exception.NoValidHost as e:
+            # No host found happened, notify on the scheduler queue and log
+            # that this happened and set the volume to errored out and
+            # *do not* reraise the error (since whats the point).
+            try:
+                self._handle_failure(context, request_spec, e)
+            finally:
+                common.error_out_volume(context, self.db_api,
+                                        request_spec['volume_id'], reason=e)
+        except Exception as e:
+            # Some other error happened, notify on the scheduler queue and log
+            # that this happened and set the volume to errored out and
+            # *do* reraise the error.
+            with excutils.save_and_reraise_exception():
+                try:
+                    self._handle_failure(context, request_spec, e)
+                finally:
+                    common.error_out_volume(context, self.db_api,
+                                            request_spec['volume_id'],
+                                            reason=e)
+
+
+def get_flow(context, db_api, driver_api, request_spec=None,
              filter_properties=None,
              volume_id=None, snapshot_id=None, image_id=None):
 
@@ -107,53 +175,12 @@ def get_flow(context, db, driver, request_spec=None,
 
     # This will extract and clean the spec from the starting values.
     scheduler_flow.add(ExtractSchedulerSpecTask(
-        db,
+        db_api,
         rebind={'request_spec': 'raw_request_spec'}))
 
-    def schedule_create_volume(context, request_spec, filter_properties):
-
-        def _log_failure(cause):
-            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
-                      {'cause': cause})
-
-        def _notify_failure(cause):
-            """When scheduling fails send out a event that it failed."""
-            topic = "scheduler.create_volume"
-            payload = {
-                'request_spec': request_spec,
-                'volume_properties': request_spec.get('volume_properties', {}),
-                'volume_id': volume_id,
-                'state': 'error',
-                'method': 'create_volume',
-                'reason': cause,
-            }
-            try:
-                rpc.get_notifier('scheduler').error(context, topic, payload)
-            except exception.CinderException:
-                LOG.exception(_("Failed notifying on %(topic)s "
-                                "payload %(payload)s") % {'topic': topic,
-                                                          'payload': payload})
-
-        try:
-            driver.schedule_create_volume(context, request_spec,
-                                          filter_properties)
-        except exception.NoValidHost as e:
-            # Not host found happened, notify on the scheduler queue and log
-            # that this happened and set the volume to errored out and
-            # *do not* reraise the error (since whats the point).
-            _notify_failure(e)
-            _log_failure(e)
-            common.error_out_volume(context, db, volume_id, reason=e)
-        except Exception as e:
-            # Some other error happened, notify on the scheduler queue and log
-            # that this happened and set the volume to errored out and
-            # *do* reraise the error.
-            with excutils.save_and_reraise_exception():
-                _notify_failure(e)
-                _log_failure(e)
-                common.error_out_volume(context, db, volume_id, reason=e)
-
-    scheduler_flow.add(task.FunctorTask(schedule_create_volume))
+    # This will activate the desired scheduler driver (and handle any
+    # driver related failures appropriately).
+    scheduler_flow.add(ScheduleCreateVolumeTask(db_api, driver_api))
 
     # Now load (but do not run) the flow using the provided initial data.
     return taskflow.engines.load(scheduler_flow, store=create_what)