import taskflow.engines
from taskflow.patterns import linear_flow
-from taskflow import task
from cinder import exception
from cinder import flow_utils
default_provides = set(['request_spec'])
- def __init__(self, db, **kwargs):
+ def __init__(self, db_api, **kwargs):
super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
**kwargs)
- self.db = db
+ self.db_api = db_api
def _populate_request_spec(self, context, volume_id, snapshot_id,
image_id):
if not volume_id:
msg = _("No volume_id provided to populate a request_spec from")
raise exception.InvalidInput(reason=msg)
- volume_ref = self.db.volume_get(context, volume_id)
+ volume_ref = self.db_api.volume_get(context, volume_id)
volume_type_id = volume_ref.get('volume_type_id')
- vol_type = self.db.volume_type_get(context, volume_type_id)
+ vol_type = self.db_api.volume_type_get(context, volume_type_id)
return {
'volume_id': volume_id,
'snapshot_id': snapshot_id,
}
-def get_flow(context, db, driver, request_spec=None,
+class ScheduleCreateVolumeTask(flow_utils.CinderTask):
+ """Activates a scheduler driver and handles any subsequent failures.
+
+ Notification strategy: on failure the scheduler rpc notifier will be
+ activated and a notification will be emitted indicating what errored,
+ the reason, and the request (and misc. other data) that caused the error
+ to be triggered.
+
+ Reversion strategy: N/A
+ """
+ FAILURE_TOPIC = "scheduler.create_volume"
+
+ def __init__(self, db_api, driver_api, **kwargs):
+ super(ScheduleCreateVolumeTask, self).__init__(addons=[ACTION],
+ **kwargs)
+ self.db_api = db_api
+ self.driver_api = driver_api
+
+ def _handle_failure(self, context, request_spec, cause):
+ try:
+ self._notify_failure(context, request_spec, cause)
+ finally:
+ LOG.error(_("Failed to run task %(name)s: %(cause)s") %
+ {'cause': cause, 'name': self.name})
+
+ def _notify_failure(self, context, request_spec, cause):
+ """When scheduling fails send out a event that it failed."""
+ payload = {
+ 'request_spec': request_spec,
+ 'volume_properties': request_spec.get('volume_properties', {}),
+ 'volume_id': request_spec['volume_id'],
+ 'state': 'error',
+ 'method': 'create_volume',
+ 'reason': cause,
+ }
+ try:
+ rpc.get_notifier('scheduler').error(context, self.FAILURE_TOPIC,
+ payload)
+ except exception.CinderException:
+ LOG.exception(_("Failed notifying on %(topic)s "
+ "payload %(payload)s") %
+ {'topic': self.FAILURE_TOPIC, 'payload': payload})
+
+ def execute(self, context, request_spec, filter_properties):
+ try:
+ self.driver_api.schedule_create_volume(context, request_spec,
+ filter_properties)
+ except exception.NoValidHost as e:
+ # No host found happened, notify on the scheduler queue and log
+ # that this happened and set the volume to errored out and
+ # *do not* reraise the error (since whats the point).
+ try:
+ self._handle_failure(context, request_spec, e)
+ finally:
+ common.error_out_volume(context, self.db_api,
+ request_spec['volume_id'], reason=e)
+ except Exception as e:
+ # Some other error happened, notify on the scheduler queue and log
+ # that this happened and set the volume to errored out and
+ # *do* reraise the error.
+ with excutils.save_and_reraise_exception():
+ try:
+ self._handle_failure(context, request_spec, e)
+ finally:
+ common.error_out_volume(context, self.db_api,
+ request_spec['volume_id'],
+ reason=e)
+
+
+def get_flow(context, db_api, driver_api, request_spec=None,
filter_properties=None,
volume_id=None, snapshot_id=None, image_id=None):
# This will extract and clean the spec from the starting values.
scheduler_flow.add(ExtractSchedulerSpecTask(
- db,
+ db_api,
rebind={'request_spec': 'raw_request_spec'}))
- def schedule_create_volume(context, request_spec, filter_properties):
-
- def _log_failure(cause):
- LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
- {'cause': cause})
-
- def _notify_failure(cause):
- """When scheduling fails send out a event that it failed."""
- topic = "scheduler.create_volume"
- payload = {
- 'request_spec': request_spec,
- 'volume_properties': request_spec.get('volume_properties', {}),
- 'volume_id': volume_id,
- 'state': 'error',
- 'method': 'create_volume',
- 'reason': cause,
- }
- try:
- rpc.get_notifier('scheduler').error(context, topic, payload)
- except exception.CinderException:
- LOG.exception(_("Failed notifying on %(topic)s "
- "payload %(payload)s") % {'topic': topic,
- 'payload': payload})
-
- try:
- driver.schedule_create_volume(context, request_spec,
- filter_properties)
- except exception.NoValidHost as e:
- # Not host found happened, notify on the scheduler queue and log
- # that this happened and set the volume to errored out and
- # *do not* reraise the error (since whats the point).
- _notify_failure(e)
- _log_failure(e)
- common.error_out_volume(context, db, volume_id, reason=e)
- except Exception as e:
- # Some other error happened, notify on the scheduler queue and log
- # that this happened and set the volume to errored out and
- # *do* reraise the error.
- with excutils.save_and_reraise_exception():
- _notify_failure(e)
- _log_failure(e)
- common.error_out_volume(context, db, volume_id, reason=e)
-
- scheduler_flow.add(task.FunctorTask(schedule_create_volume))
+ # This will activate the desired scheduler driver (and handle any
+ # driver related failures appropriately).
+ scheduler_flow.add(ScheduleCreateVolumeTask(db_api, driver_api))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(scheduler_flow, store=create_what)