From 0144e71b5cd1bb7c7e4ab82652e94dfe0dbfad40 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 25 Jun 2014 11:24:02 -0700 Subject: [PATCH] Use a task subclass instead of a functor + task wrapper Instead of using a functor that activates the scheduler driver instead incorporate and use a task subclass, this avoids the less meaningful information that is presented when a functor is used as well as allows for a more straightforward path for future subclassing and derivations of work. Change-Id: Ice66e109968c28f36156edf3dd47a89f94494bb8 --- cinder/scheduler/flows/create_volume.py | 129 ++++++++++++++---------- 1 file changed, 78 insertions(+), 51 deletions(-) diff --git a/cinder/scheduler/flows/create_volume.py b/cinder/scheduler/flows/create_volume.py index 4f981981e..e6da1c5de 100644 --- a/cinder/scheduler/flows/create_volume.py +++ b/cinder/scheduler/flows/create_volume.py @@ -12,7 +12,6 @@ import taskflow.engines from taskflow.patterns import linear_flow -from taskflow import task from cinder import exception from cinder import flow_utils @@ -35,10 +34,10 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask): default_provides = set(['request_spec']) - def __init__(self, db, **kwargs): + def __init__(self, db_api, **kwargs): super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION], **kwargs) - self.db = db + self.db_api = db_api def _populate_request_spec(self, context, volume_id, snapshot_id, image_id): @@ -52,9 +51,9 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask): if not volume_id: msg = _("No volume_id provided to populate a request_spec from") raise exception.InvalidInput(reason=msg) - volume_ref = self.db.volume_get(context, volume_id) + volume_ref = self.db_api.volume_get(context, volume_id) volume_type_id = volume_ref.get('volume_type_id') - vol_type = self.db.volume_type_get(context, volume_type_id) + vol_type = self.db_api.volume_type_get(context, volume_type_id) return { 'volume_id': volume_id, 'snapshot_id': snapshot_id, @@ -78,7 +77,76 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask): } -def get_flow(context, db, driver, request_spec=None, +class ScheduleCreateVolumeTask(flow_utils.CinderTask): + """Activates a scheduler driver and handles any subsequent failures. + + Notification strategy: on failure the scheduler rpc notifier will be + activated and a notification will be emitted indicating what errored, + the reason, and the request (and misc. other data) that caused the error + to be triggered. + + Reversion strategy: N/A + """ + FAILURE_TOPIC = "scheduler.create_volume" + + def __init__(self, db_api, driver_api, **kwargs): + super(ScheduleCreateVolumeTask, self).__init__(addons=[ACTION], + **kwargs) + self.db_api = db_api + self.driver_api = driver_api + + def _handle_failure(self, context, request_spec, cause): + try: + self._notify_failure(context, request_spec, cause) + finally: + LOG.error(_("Failed to run task %(name)s: %(cause)s") % + {'cause': cause, 'name': self.name}) + + def _notify_failure(self, context, request_spec, cause): + """When scheduling fails send out a event that it failed.""" + payload = { + 'request_spec': request_spec, + 'volume_properties': request_spec.get('volume_properties', {}), + 'volume_id': request_spec['volume_id'], + 'state': 'error', + 'method': 'create_volume', + 'reason': cause, + } + try: + rpc.get_notifier('scheduler').error(context, self.FAILURE_TOPIC, + payload) + except exception.CinderException: + LOG.exception(_("Failed notifying on %(topic)s " + "payload %(payload)s") % + {'topic': self.FAILURE_TOPIC, 'payload': payload}) + + def execute(self, context, request_spec, filter_properties): + try: + self.driver_api.schedule_create_volume(context, request_spec, + filter_properties) + except exception.NoValidHost as e: + # No host found happened, notify on the scheduler queue and log + # that this happened and set the volume to errored out and + # *do not* reraise the error (since whats the point). + try: + self._handle_failure(context, request_spec, e) + finally: + common.error_out_volume(context, self.db_api, + request_spec['volume_id'], reason=e) + except Exception as e: + # Some other error happened, notify on the scheduler queue and log + # that this happened and set the volume to errored out and + # *do* reraise the error. + with excutils.save_and_reraise_exception(): + try: + self._handle_failure(context, request_spec, e) + finally: + common.error_out_volume(context, self.db_api, + request_spec['volume_id'], + reason=e) + + +def get_flow(context, db_api, driver_api, request_spec=None, filter_properties=None, volume_id=None, snapshot_id=None, image_id=None): @@ -107,53 +175,12 @@ def get_flow(context, db, driver, request_spec=None, # This will extract and clean the spec from the starting values. scheduler_flow.add(ExtractSchedulerSpecTask( - db, + db_api, rebind={'request_spec': 'raw_request_spec'})) - def schedule_create_volume(context, request_spec, filter_properties): - - def _log_failure(cause): - LOG.error(_("Failed to schedule_create_volume: %(cause)s") % - {'cause': cause}) - - def _notify_failure(cause): - """When scheduling fails send out a event that it failed.""" - topic = "scheduler.create_volume" - payload = { - 'request_spec': request_spec, - 'volume_properties': request_spec.get('volume_properties', {}), - 'volume_id': volume_id, - 'state': 'error', - 'method': 'create_volume', - 'reason': cause, - } - try: - rpc.get_notifier('scheduler').error(context, topic, payload) - except exception.CinderException: - LOG.exception(_("Failed notifying on %(topic)s " - "payload %(payload)s") % {'topic': topic, - 'payload': payload}) - - try: - driver.schedule_create_volume(context, request_spec, - filter_properties) - except exception.NoValidHost as e: - # Not host found happened, notify on the scheduler queue and log - # that this happened and set the volume to errored out and - # *do not* reraise the error (since whats the point). - _notify_failure(e) - _log_failure(e) - common.error_out_volume(context, db, volume_id, reason=e) - except Exception as e: - # Some other error happened, notify on the scheduler queue and log - # that this happened and set the volume to errored out and - # *do* reraise the error. - with excutils.save_and_reraise_exception(): - _notify_failure(e) - _log_failure(e) - common.error_out_volume(context, db, volume_id, reason=e) - - scheduler_flow.add(task.FunctorTask(schedule_create_volume)) + # This will activate the desired scheduler driver (and handle any + # driver related failures appropriately). + scheduler_flow.add(ScheduleCreateVolumeTask(db_api, driver_api)) # Now load (but do not run) the flow using the provided initial data. return taskflow.engines.load(scheduler_flow, store=create_what) -- 2.45.2