]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Refactoring of create_volume to use taskflow.
authorJoshua Harlow <harlowja@yahoo-inc.com>
Thu, 1 Aug 2013 19:08:04 +0000 (12:08 -0700)
committerJoshua Harlow <harlowja@yahoo-inc.com>
Thu, 8 Aug 2013 02:08:21 +0000 (19:08 -0700)
Move the create_volume workflow to using taskflow and
split that workflow into three major pieces (each with
there own workflow) and create tasks that perform the
individual required actions to accomplish the pieces
desired outcome.

1. An api workflow composed of the following tasks:
  - Extracting volume request (which checks types, values) and creates a
    standard output for other tasks to work on (allowing further tasks to be
    plugged in the chain without having to worry about other tasks output
    formats).
  - Quota reservation (rolled back on failure).
  - Database entry creation.
  - Quota committing.
  - Volume RPC casting to volume scheduler or to targeted volume manager.
2. A scheduler workflow composed of the following tasks:
  - Extracting scheduler request specification for further tasks to use.
  - Change status & notify (activated only on failure).
  - Create volume scheduler driver call (which will itself RPC cast to a
    targeted volume manager).
3. A manager workflow composed of the following tasks:
  - Extract volume request specification from incoming request for
    further tasks to use. This also breaks up the incoming request into the 4
    volume types that can be created later.
  - Change status & notify on failure or reschedule on failure, this is
    dependent on if rescheduling is enabled *and* which exception types are
    thrown from the volume creation code.
  - Create volume from specification
    - This contains the code to create from image, create raw volume, create
      from source volume, create from snapshot using the extracted volume
      specification.
  - Change status & notify success.

Key benefits:
  - Handled exceptions in a easier to understand, easier to review and more
    reliable way than they are currently being handled.
  - Rescheduling is now easier to understand.
  - Easier to understand structure with tasks that consume inputs, take some
    action on them and produce outputs and revert on subsequent failure using
    whatever they produced to know how to revert.
  - Ability to add new unit tests that can test individual task actions by
    providing mock task inputs and validating expected task outputs.

Future additions:
  - Eventual addition of resumption logic to recover from operations stopped
    halfway through.
  - Ability to centrally orchestrate the tasks and pick and choice how
    reconciliation of failures based on code or policies.

Part of bp: cinder-state-machine

Change-Id: I96b688511b35014a8c006e4d30b875dcaf409d93

19 files changed:
cinder/exception.py
cinder/policy.py
cinder/scheduler/manager.py
cinder/taskflow/__init__.py [new file with mode: 0644]
cinder/taskflow/decorators.py [new file with mode: 0644]
cinder/taskflow/exceptions.py [new file with mode: 0644]
cinder/taskflow/patterns/__init__.py [new file with mode: 0644]
cinder/taskflow/patterns/base.py [new file with mode: 0644]
cinder/taskflow/patterns/linear_flow.py [new file with mode: 0644]
cinder/taskflow/states.py [new file with mode: 0644]
cinder/taskflow/task.py [new file with mode: 0644]
cinder/taskflow/utils.py [new file with mode: 0644]
cinder/tests/test_volume.py
cinder/utils.py
cinder/volume/api.py
cinder/volume/flows/__init__.py [new file with mode: 0644]
cinder/volume/flows/create_volume.py [new file with mode: 0644]
cinder/volume/manager.py
taskflow.conf [new file with mode: 0644]

index bb53b1bd79d7966540a4080731fb0c7e51c20ba7..50b995e46f728ee8cdf2c119c3cd4238a5d3d63c 100644 (file)
@@ -580,6 +580,22 @@ class GlanceMetadataExists(Invalid):
                 " exists for volume id %(volume_id)s")
 
 
+class ExportFailure(Invalid):
+    message = _("Failed to export for volume: %(reason)s")
+
+
+class MetadataCreateFailure(Invalid):
+    message = _("Failed to create metadata for volume: %(reason)s")
+
+
+class MetadataUpdateFailure(Invalid):
+    message = _("Failed to update metadata for volume: %(reason)s")
+
+
+class MetadataCopyFailure(Invalid):
+    message = _("Failed to copy metadata to volume: %(reason)s")
+
+
 class ImageCopyFailure(Invalid):
     message = _("Failed to copy image to volume: %(reason)s")
 
index 390fc6ac09446dce2f881ed4cbb2bb5e82f4b60c..72a08ae315fe87bf919453198fb761af52645e60 100644 (file)
@@ -62,6 +62,20 @@ def _set_brain(data):
     policy.set_brain(policy.Brain.load_json(data, default_rule))
 
 
+def enforce_action(context, action):
+    """Checks that the action can be done by the given context.
+
+    Applies a check to ensure the context's project_id and user_id can be
+    applied to the given action using the policy enforcement api.
+    """
+
+    target = {
+        'project_id': context.project_id,
+        'user_id': context.user_id,
+    }
+    enforce(context, action, target)
+
+
 def enforce(context, action, target):
     """Verifies that the action is valid on the target in this context.
 
index 8d4c3c8337c63bcace71e888a974372b6083f7e2..e70c4ff885707f24f8ec3fabdc78b3e4817778e0 100644 (file)
@@ -31,8 +31,10 @@ from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common.notifier import api as notifier
+from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 
+from cinder.taskflow import states
 
 scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
                                   default='cinder.scheduler.filter_scheduler.'
@@ -81,61 +83,18 @@ class SchedulerManager(manager.Manager):
     def create_volume(self, context, topic, volume_id, snapshot_id=None,
                       image_id=None, request_spec=None,
                       filter_properties=None):
-        try:
-            if request_spec is None:
-                # For RPC version < 1.2 backward compatibility
-                request_spec = {}
-                volume_ref = db.volume_get(context, volume_id)
-                size = volume_ref.get('size')
-                availability_zone = volume_ref.get('availability_zone')
-                volume_type_id = volume_ref.get('volume_type_id')
-                vol_type = db.volume_type_get(context, volume_type_id)
-                volume_properties = {'size': size,
-                                     'availability_zone': availability_zone,
-                                     'volume_type_id': volume_type_id}
-                request_spec.update(
-                    {'volume_id': volume_id,
-                     'snapshot_id': snapshot_id,
-                     'image_id': image_id,
-                     'volume_properties': volume_properties,
-                     'volume_type': dict(vol_type).iteritems()})
-
-            self.driver.schedule_create_volume(context, request_spec,
-                                               filter_properties)
-        except exception.NoValidHost as ex:
-            volume_state = {'volume_state': {'status': 'error'}}
-            self._set_volume_state_and_notify('create_volume',
-                                              volume_state,
-                                              context, ex, request_spec)
-        except Exception as ex:
-            with excutils.save_and_reraise_exception():
-                volume_state = {'volume_state': {'status': 'error'}}
-                self._set_volume_state_and_notify('create_volume',
-                                                  volume_state,
-                                                  context, ex, request_spec)
 
-    def _set_volume_state_and_notify(self, method, updates, context, ex,
-                                     request_spec):
-        LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
-                  {'method': method, 'ex': ex})
-
-        volume_state = updates['volume_state']
-        properties = request_spec.get('volume_properties', {})
-
-        volume_id = request_spec.get('volume_id', None)
+        flow = create_volume.get_scheduler_flow(db, self.driver,
+                                                request_spec,
+                                                filter_properties,
+                                                volume_id, snapshot_id,
+                                                image_id)
+        assert flow, _('Schedule volume flow not retrieved')
 
-        if volume_id:
-            db.volume_update(context, volume_id, volume_state)
-
-        payload = dict(request_spec=request_spec,
-                       volume_properties=properties,
-                       volume_id=volume_id,
-                       state=volume_state,
-                       method=method,
-                       reason=ex)
-
-        notifier.notify(context, notifier.publisher_id("scheduler"),
-                        'scheduler.' + method, notifier.ERROR, payload)
+        flow.run(context)
+        if flow.state != states.SUCCESS:
+            LOG.warn(_("Failed to successfully complete"
+                       " schedule volume using flow: %s"), flow)
 
     def request_service_capabilities(self, context):
         volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
@@ -164,3 +123,28 @@ class SchedulerManager(manager.Manager):
             volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
                                                      tgt_host,
                                                      force_host_copy)
+
+    def _set_volume_state_and_notify(self, method, updates, context, ex,
+                                     request_spec):
+        # TODO(harlowja): move into a task that just does this later.
+
+        LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
+                  {'method': method, 'ex': ex})
+
+        volume_state = updates['volume_state']
+        properties = request_spec.get('volume_properties', {})
+
+        volume_id = request_spec.get('volume_id', None)
+
+        if volume_id:
+            db.volume_update(context, volume_id, volume_state)
+
+        payload = dict(request_spec=request_spec,
+                       volume_properties=properties,
+                       volume_id=volume_id,
+                       state=volume_state,
+                       method=method,
+                       reason=ex)
+
+        notifier.notify(context, notifier.publisher_id("scheduler"),
+                        'scheduler.' + method, notifier.ERROR, payload)
diff --git a/cinder/taskflow/__init__.py b/cinder/taskflow/__init__.py
new file mode 100644 (file)
index 0000000..1f19be5
--- /dev/null
@@ -0,0 +1 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
diff --git a/cinder/taskflow/decorators.py b/cinder/taskflow/decorators.py
new file mode 100644 (file)
index 0000000..ea99d13
--- /dev/null
@@ -0,0 +1,276 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import functools
+import inspect
+import types
+
+# These arguments are ones that we will skip when parsing for requirements
+# for a function to operate (when used as a task).
+AUTO_ARGS = ('self', 'context', 'cls')
+
+
+def is_decorated(functor):
+    if not isinstance(functor, (types.MethodType, types.FunctionType)):
+        return False
+    return getattr(extract(functor), '__task__', False)
+
+
+def extract(functor):
+    # Extract the underlying functor if its a method since we can not set
+    # attributes on instance methods, this is supposedly fixed in python 3
+    # and later.
+    #
+    # TODO(harlowja): add link to this fix.
+    assert isinstance(functor, (types.MethodType, types.FunctionType))
+    if isinstance(functor, types.MethodType):
+        return functor.__func__
+    else:
+        return functor
+
+
+def _mark_as_task(functor):
+    setattr(functor, '__task__', True)
+
+
+def _get_wrapped(function):
+    """Get the method at the bottom of a stack of decorators."""
+
+    if hasattr(function, '__wrapped__'):
+        return getattr(function, '__wrapped__')
+
+    if not hasattr(function, 'func_closure') or not function.func_closure:
+        return function
+
+    def _get_wrapped_function(function):
+        if not hasattr(function, 'func_closure') or not function.func_closure:
+            return None
+
+        for closure in function.func_closure:
+            func = closure.cell_contents
+
+            deeper_func = _get_wrapped_function(func)
+            if deeper_func:
+                return deeper_func
+            elif hasattr(closure.cell_contents, '__call__'):
+                return closure.cell_contents
+
+    return _get_wrapped_function(function)
+
+
+def _take_arg(a):
+    if a in AUTO_ARGS:
+        return False
+    # In certain decorator cases it seems like we get the function to be
+    # decorated as an argument, we don't want to take that as a real argument.
+    if isinstance(a, collections.Callable):
+        return False
+    return True
+
+
+def wraps(fn):
+    """This will not be needed in python 3.2 or greater which already has this
+    built-in to its functools.wraps method.
+    """
+
+    def wrapper(f):
+        f = functools.wraps(fn)(f)
+        f.__wrapped__ = getattr(fn, '__wrapped__', fn)
+        return f
+
+    return wrapper
+
+
+def locked(f):
+
+    @wraps(f)
+    def wrapper(self, *args, **kwargs):
+        with self._lock:
+            return f(self, *args, **kwargs)
+
+    return wrapper
+
+
+def task(*args, **kwargs):
+    """Decorates a given function and ensures that all needed attributes of
+    that function are set so that the function can be used as a task.
+    """
+
+    def decorator(f):
+        w_f = extract(f)
+
+        def noop(*args, **kwargs):
+            pass
+
+        # Mark as being a task
+        _mark_as_task(w_f)
+
+        # By default don't revert this.
+        w_f.revert = kwargs.pop('revert_with', noop)
+
+        # Associate a name of this task that is the module + function name.
+        w_f.name = "%s.%s" % (f.__module__, f.__name__)
+
+        # Sets the version of the task.
+        version = kwargs.pop('version', (1, 0))
+        f = _versionize(*version)(f)
+
+        # Attach any requirements this function needs for running.
+        requires_what = kwargs.pop('requires', [])
+        f = _requires(*requires_what, **kwargs)(f)
+
+        # Attach any optional requirements this function needs for running.
+        optional_what = kwargs.pop('optional', [])
+        f = _optional(*optional_what, **kwargs)(f)
+
+        # Attach any items this function provides as output
+        provides_what = kwargs.pop('provides', [])
+        f = _provides(*provides_what, **kwargs)(f)
+
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            return f(*args, **kwargs)
+
+        return wrapper
+
+    # This is needed to handle when the decorator has args or the decorator
+    # doesn't have args, python is rather weird here...
+    if kwargs or not args:
+        return decorator
+    else:
+        if isinstance(args[0], collections.Callable):
+            return decorator(args[0])
+        else:
+            return decorator
+
+
+def _versionize(major, minor=None):
+    """A decorator that marks the wrapped function with a major & minor version
+    number.
+    """
+
+    if minor is None:
+        minor = 0
+
+    def decorator(f):
+        w_f = extract(f)
+        w_f.version = (major, minor)
+
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            return f(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
+def _optional(*args, **kwargs):
+    """Attaches a set of items that the decorated function would like as input
+    to the functions underlying dictionary.
+    """
+
+    def decorator(f):
+        w_f = extract(f)
+
+        if not hasattr(w_f, 'optional'):
+            w_f.optional = set()
+
+        w_f.optional.update([a for a in args if _take_arg(a)])
+
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            return f(*args, **kwargs)
+
+        return wrapper
+
+    # This is needed to handle when the decorator has args or the decorator
+    # doesn't have args, python is rather weird here...
+    if kwargs or not args:
+        return decorator
+    else:
+        if isinstance(args[0], collections.Callable):
+            return decorator(args[0])
+        else:
+            return decorator
+
+
+def _requires(*args, **kwargs):
+    """Attaches a set of items that the decorated function requires as input
+    to the functions underlying dictionary.
+    """
+
+    def decorator(f):
+        w_f = extract(f)
+
+        if not hasattr(w_f, 'requires'):
+            w_f.requires = set()
+
+        if kwargs.pop('auto_extract', True):
+            inspect_what = _get_wrapped(f)
+            f_args = inspect.getargspec(inspect_what).args
+            w_f.requires.update([a for a in f_args if _take_arg(a)])
+
+        w_f.requires.update([a for a in args if _take_arg(a)])
+
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            return f(*args, **kwargs)
+
+        return wrapper
+
+    # This is needed to handle when the decorator has args or the decorator
+    # doesn't have args, python is rather weird here...
+    if kwargs or not args:
+        return decorator
+    else:
+        if isinstance(args[0], collections.Callable):
+            return decorator(args[0])
+        else:
+            return decorator
+
+
+def _provides(*args, **kwargs):
+    """Attaches a set of items that the decorated function provides as output
+    to the functions underlying dictionary.
+    """
+
+    def decorator(f):
+        w_f = extract(f)
+
+        if not hasattr(f, 'provides'):
+            w_f.provides = set()
+
+        w_f.provides.update([a for a in args if _take_arg(a)])
+
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            return f(*args, **kwargs)
+
+        return wrapper
+
+    # This is needed to handle when the decorator has args or the decorator
+    # doesn't have args, python is rather weird here...
+    if kwargs or not args:
+        return decorator
+    else:
+        if isinstance(args[0], collections.Callable):
+            return decorator(args[0])
+        else:
+            return decorator
diff --git a/cinder/taskflow/exceptions.py b/cinder/taskflow/exceptions.py
new file mode 100644 (file)
index 0000000..d50e30d
--- /dev/null
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+class TaskFlowException(Exception):
+    """Base class for exceptions emitted from this library."""
+    pass
+
+
+class Duplicate(TaskFlowException):
+    """Raised when a duplicate entry is found."""
+    pass
+
+
+class NotFound(TaskFlowException):
+    """Raised when some entry in some object doesn't exist."""
+    pass
+
+
+class AlreadyExists(TaskFlowException):
+    """Raised when some entry in some object already exists."""
+    pass
+
+
+class ClosedException(TaskFlowException):
+    """Raised when an access on a closed object occurs."""
+    pass
+
+
+class InvalidStateException(TaskFlowException):
+    """Raised when a task/job/workflow is in an invalid state when an
+    operation is attempting to apply to said task/job/workflow.
+    """
+    pass
+
+
+class UnclaimableJobException(TaskFlowException):
+    """Raised when a job can not be claimed."""
+    pass
+
+
+class JobNotFound(TaskFlowException):
+    """Raised when a job entry can not be found."""
+    pass
+
+
+class MissingDependencies(InvalidStateException):
+    """Raised when a task has dependencies that can not be satisified."""
+    message = ("%(task)s requires %(requirements)s but no other task produces"
+               " said requirements")
+
+    def __init__(self, task, requirements):
+        message = self.message % {'task': task, 'requirements': requirements}
+        super(MissingDependencies, self).__init__(message)
diff --git a/cinder/taskflow/patterns/__init__.py b/cinder/taskflow/patterns/__init__.py
new file mode 100644 (file)
index 0000000..1f19be5
--- /dev/null
@@ -0,0 +1 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
diff --git a/cinder/taskflow/patterns/base.py b/cinder/taskflow/patterns/base.py
new file mode 100644 (file)
index 0000000..edb5380
--- /dev/null
@@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import threading
+
+from cinder.openstack.common import uuidutils
+
+from cinder.taskflow import decorators
+from cinder.taskflow import exceptions as exc
+from cinder.taskflow import states
+from cinder.taskflow import utils
+
+
+class Flow(object):
+    """The base abstract class of all flow implementations.
+
+    It provides a set of parents to flows that have a concept of parent flows
+    as well as a state and state utility functions to the deriving classes. It
+    also provides a name and an identifier (uuid or other) to the flow so that
+    it can be uniquely identifed among many flows.
+
+    Flows are expected to provide (if desired) the following methods:
+    - add
+    - add_many
+    - interrupt
+    - reset
+    - rollback
+    - run
+    - soft_reset
+    """
+
+    __metaclass__ = abc.ABCMeta
+
+    # Common states that certain actions can be performed in. If the flow
+    # is not in these sets of states then it is likely that the flow operation
+    # can not succeed.
+    RESETTABLE_STATES = set([
+        states.INTERRUPTED,
+        states.SUCCESS,
+        states.PENDING,
+        states.FAILURE,
+    ])
+    SOFT_RESETTABLE_STATES = set([
+        states.INTERRUPTED,
+    ])
+    UNINTERRUPTIBLE_STATES = set([
+        states.FAILURE,
+        states.SUCCESS,
+        states.PENDING,
+    ])
+    RUNNABLE_STATES = set([
+        states.PENDING,
+    ])
+
+    def __init__(self, name, parents=None, uuid=None):
+        self._name = str(name)
+        # The state of this flow.
+        self._state = states.PENDING
+        # If this flow has a parent flow/s which need to be reverted if
+        # this flow fails then please include them here to allow this child
+        # to call the parents...
+        if parents:
+            self.parents = tuple(parents)
+        else:
+            self.parents = ()
+        # Any objects that want to listen when a wf/task starts/stops/completes
+        # or errors should be registered here. This can be used to monitor
+        # progress and record tasks finishing (so that it becomes possible to
+        # store the result of a task in some persistent or semi-persistent
+        # storage backend).
+        self.notifier = utils.TransitionNotifier()
+        self.task_notifier = utils.TransitionNotifier()
+        # Ensure that modifications and/or multiple runs aren't happening
+        # at the same time in the same flow at the same time.
+        self._lock = threading.RLock()
+        # Assign this flow a unique identifer.
+        if uuid:
+            self._id = str(uuid)
+        else:
+            self._id = uuidutils.generate_uuid()
+
+    @property
+    def name(self):
+        """A non-unique name for this flow (human readable)"""
+        return self._name
+
+    @property
+    def uuid(self):
+        """Uniquely identifies this flow"""
+        return "f-%s" % (self._id)
+
+    @property
+    def state(self):
+        """Provides a read-only view of the flow state."""
+        return self._state
+
+    def _change_state(self, context, new_state):
+        was_changed = False
+        old_state = self.state
+        with self._lock:
+            if self.state != new_state:
+                old_state = self.state
+                self._state = new_state
+                was_changed = True
+        if was_changed:
+            # Don't notify while holding the lock.
+            self.notifier.notify(self.state, details={
+                'context': context,
+                'flow': self,
+                'old_state': old_state,
+            })
+
+    def __str__(self):
+        lines = ["Flow: %s" % (self.name)]
+        lines.append("%s" % (self.uuid))
+        lines.append("%s" % (len(self.parents)))
+        lines.append("%s" % (self.state))
+        return "; ".join(lines)
+
+    @abc.abstractmethod
+    def add(self, task):
+        """Adds a given task to this flow.
+
+        Returns the uuid that is associated with the task for later operations
+        before and after it is ran.
+        """
+        raise NotImplementedError()
+
+    @decorators.locked
+    def add_many(self, tasks):
+        """Adds many tasks to this flow.
+
+        Returns a list of uuids (one for each task added).
+        """
+        uuids = []
+        for t in tasks:
+            uuids.append(self.add(t))
+        return uuids
+
+    def interrupt(self):
+        """Attempts to interrupt the current flow and any tasks that are
+        currently not running in the flow.
+
+        Returns how many tasks were interrupted (if any).
+        """
+        if self.state in self.UNINTERRUPTIBLE_STATES:
+            raise exc.InvalidStateException(("Can not interrupt when"
+                                             " in state %s") % (self.state))
+        # Note(harlowja): Do *not* acquire the lock here so that the flow may
+        # be interrupted while running. This does mean the the above check may
+        # not be valid but we can worry about that if it becomes an issue.
+        old_state = self.state
+        if old_state != states.INTERRUPTED:
+            self._state = states.INTERRUPTED
+            self.notifier.notify(self.state, details={
+                'context': None,
+                'flow': self,
+                'old_state': old_state,
+            })
+        return 0
+
+    @decorators.locked
+    def reset(self):
+        """Fully resets the internal state of this flow, allowing for the flow
+        to be ran again.
+
+        Note: Listeners are also reset.
+        """
+        if self.state not in self.RESETTABLE_STATES:
+            raise exc.InvalidStateException(("Can not reset when"
+                                             " in state %s") % (self.state))
+        self.notifier.reset()
+        self.task_notifier.reset()
+        self._change_state(None, states.PENDING)
+
+    @decorators.locked
+    def soft_reset(self):
+        """Partially resets the internal state of this flow, allowing for the
+        flow to be ran again from an interrupted state only.
+        """
+        if self.state not in self.SOFT_RESETTABLE_STATES:
+            raise exc.InvalidStateException(("Can not soft reset when"
+                                             " in state %s") % (self.state))
+        self._change_state(None, states.PENDING)
+
+    @decorators.locked
+    def run(self, context, *args, **kwargs):
+        """Executes the workflow."""
+        if self.state not in self.RUNNABLE_STATES:
+            raise exc.InvalidStateException("Unable to run flow when "
+                                            "in state %s" % (self.state))
+
+    @decorators.locked
+    def rollback(self, context, cause):
+        """Performs rollback of this workflow and any attached parent workflows
+        if present.
+        """
+        pass
diff --git a/cinder/taskflow/patterns/linear_flow.py b/cinder/taskflow/patterns/linear_flow.py
new file mode 100644 (file)
index 0000000..16f220a
--- /dev/null
@@ -0,0 +1,271 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import logging
+
+from cinder.openstack.common import excutils
+
+from cinder.taskflow import decorators
+from cinder.taskflow import exceptions as exc
+from cinder.taskflow import states
+from cinder.taskflow import utils
+
+from cinder.taskflow.patterns import base
+
+LOG = logging.getLogger(__name__)
+
+
+class Flow(base.Flow):
+    """"A linear chain of tasks that can be applied in order as one unit and
+    rolled back as one unit using the reverse order that the tasks have
+    been applied in.
+
+    Note(harlowja): Each task in the chain must have requirements
+    which are satisfied by the previous task/s in the chain.
+    """
+
+    def __init__(self, name, parents=None, uuid=None):
+        super(Flow, self).__init__(name, parents, uuid)
+        # The tasks which have been applied will be collected here so that they
+        # can be reverted in the correct order on failure.
+        self._accumulator = utils.RollbackAccumulator()
+        # Tasks results are stored here. Lookup is by the uuid that was
+        # returned from the add function.
+        self.results = {}
+        # The previously left off iterator that can be used to resume from
+        # the last task (if interrupted and soft-reset).
+        self._leftoff_at = None
+        # All runners to run are collected here.
+        self._runners = []
+        self._connected = False
+        # The resumption strategy to use.
+        self.resumer = None
+
+    @decorators.locked
+    def add(self, task):
+        """Adds a given task to this flow."""
+        assert isinstance(task, collections.Callable)
+        r = utils.Runner(task)
+        r.runs_before = list(reversed(self._runners))
+        self._runners.append(r)
+        self._reset_internals()
+        return r.uuid
+
+    def _reset_internals(self):
+        self._connected = False
+        self._leftoff_at = None
+
+    def _associate_providers(self, runner):
+        # Ensure that some previous task provides this input.
+        who_provides = {}
+        task_requires = runner.requires
+        for r in task_requires:
+            provider = None
+            for before_me in runner.runs_before:
+                if r in before_me.provides:
+                    provider = before_me
+                    break
+            if provider:
+                who_provides[r] = provider
+        # Ensure that the last task provides all the needed input for this
+        # task to run correctly.
+        missing_requires = task_requires - set(who_provides.keys())
+        if missing_requires:
+            raise exc.MissingDependencies(runner, sorted(missing_requires))
+        runner.providers.update(who_provides)
+
+    def __str__(self):
+        lines = ["LinearFlow: %s" % (self.name)]
+        lines.append("%s" % (self.uuid))
+        lines.append("%s" % (len(self._runners)))
+        lines.append("%s" % (len(self.parents)))
+        lines.append("%s" % (self.state))
+        return "; ".join(lines)
+
+    @decorators.locked
+    def remove(self, uuid):
+        index_removed = -1
+        for (i, r) in enumerate(self._runners):
+            if r.uuid == uuid:
+                index_removed = i
+                break
+        if index_removed == -1:
+            raise ValueError("No runner found with uuid %s" % (uuid))
+        else:
+            removed = self._runners.pop(index_removed)
+            self._reset_internals()
+            # Go and remove it from any runner after the removed runner since
+            # those runners may have had an attachment to it.
+            for r in self._runners[index_removed:]:
+                try:
+                    r.runs_before.remove(removed)
+                except (IndexError, ValueError):
+                    pass
+
+    def __len__(self):
+        return len(self._runners)
+
+    def _connect(self):
+        if self._connected:
+            return self._runners
+        for r in self._runners:
+            r.providers = {}
+        for r in reversed(self._runners):
+            self._associate_providers(r)
+        self._connected = True
+        return self._runners
+
+    def _ordering(self):
+        return iter(self._connect())
+
+    @decorators.locked
+    def run(self, context, *args, **kwargs):
+        super(Flow, self).run(context, *args, **kwargs)
+
+        def resume_it():
+            if self._leftoff_at is not None:
+                return ([], self._leftoff_at)
+            if self.resumer:
+                (finished, leftover) = self.resumer.resume(self,
+                                                           self._ordering())
+            else:
+                finished = []
+                leftover = self._ordering()
+            return (finished, leftover)
+
+        self._change_state(context, states.STARTED)
+        try:
+            those_finished, leftover = resume_it()
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                self._change_state(context, states.FAILURE)
+
+        def run_it(runner, failed=False, result=None, simulate_run=False):
+            try:
+                # Add the task to be rolled back *immediately* so that even if
+                # the task fails while producing results it will be given a
+                # chance to rollback.
+                rb = utils.RollbackTask(context, runner.task, result=None)
+                self._accumulator.add(rb)
+                self.task_notifier.notify(states.STARTED, details={
+                    'context': context,
+                    'flow': self,
+                    'runner': runner,
+                })
+                if not simulate_run:
+                    result = runner(context, *args, **kwargs)
+                else:
+                    if failed:
+                        # TODO(harlowja): make this configurable??
+                        # If we previously failed, we want to fail again at
+                        # the same place.
+                        if not result:
+                            # If no exception or exception message was provided
+                            # or captured from the previous run then we need to
+                            # form one for this task.
+                            result = "%s failed running." % (runner.task)
+                        if isinstance(result, basestring):
+                            result = exc.InvalidStateException(result)
+                        if not isinstance(result, Exception):
+                            LOG.warn("Can not raise a non-exception"
+                                     " object: %s", result)
+                            result = exc.InvalidStateException()
+                        raise result
+                # Adjust the task result in the accumulator before
+                # notifying others that the task has finished to
+                # avoid the case where a listener might throw an
+                # exception.
+                rb.result = result
+                runner.result = result
+                self.results[runner.uuid] = result
+                self.task_notifier.notify(states.SUCCESS, details={
+                    'context': context,
+                    'flow': self,
+                    'runner': runner,
+                })
+            except Exception as e:
+                runner.result = e
+                cause = utils.FlowFailure(runner, self, e)
+                with excutils.save_and_reraise_exception():
+                    # Notify any listeners that the task has errored.
+                    self.task_notifier.notify(states.FAILURE, details={
+                        'context': context,
+                        'flow': self,
+                        'runner': runner,
+                    })
+                    self.rollback(context, cause)
+
+        if len(those_finished):
+            self._change_state(context, states.RESUMING)
+            for (r, details) in those_finished:
+                # Fake running the task so that we trigger the same
+                # notifications and state changes (and rollback that
+                # would have happened in a normal flow).
+                failed = states.FAILURE in details.get('states', [])
+                result = details.get('result')
+                run_it(r, failed=failed, result=result, simulate_run=True)
+
+        self._leftoff_at = leftover
+        self._change_state(context, states.RUNNING)
+        if self.state == states.INTERRUPTED:
+            return
+
+        was_interrupted = False
+        for r in leftover:
+            r.reset()
+            run_it(r)
+            if self.state == states.INTERRUPTED:
+                was_interrupted = True
+                break
+
+        if not was_interrupted:
+            # Only gets here if everything went successfully.
+            self._change_state(context, states.SUCCESS)
+            self._leftoff_at = None
+
+    @decorators.locked
+    def reset(self):
+        super(Flow, self).reset()
+        self.results = {}
+        self.resumer = None
+        self._accumulator.reset()
+        self._reset_internals()
+
+    @decorators.locked
+    def rollback(self, context, cause):
+        # Performs basic task by task rollback by going through the reverse
+        # order that tasks have finished and asking said task to undo whatever
+        # it has done. If this flow has any parent flows then they will
+        # also be called to rollback any tasks said parents contain.
+        #
+        # Note(harlowja): if a flow can more simply revert a whole set of
+        # tasks via a simpler command then it can override this method to
+        # accomplish that.
+        #
+        # For example, if each task was creating a file in a directory, then
+        # it's easier to just remove the directory than to ask each task to
+        # delete its file individually.
+        self._change_state(context, states.REVERTING)
+        try:
+            self._accumulator.rollback(cause)
+        finally:
+            self._change_state(context, states.FAILURE)
+        # Rollback any parents flows if they exist...
+        for p in self.parents:
+            p.rollback(context, cause)
diff --git a/cinder/taskflow/states.py b/cinder/taskflow/states.py
new file mode 100644 (file)
index 0000000..48d320a
--- /dev/null
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+# Job states.
+CLAIMED = 'CLAIMED'
+FAILURE = 'FAILURE'
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+SUCCESS = 'SUCCESS'
+UNCLAIMED = 'UNCLAIMED'
+
+# Flow states.
+FAILURE = FAILURE
+INTERRUPTED = 'INTERRUPTED'
+PENDING = 'PENDING'
+RESUMING = 'RESUMING'
+REVERTING = 'REVERTING'
+RUNNING = RUNNING
+STARTED = 'STARTED'
+SUCCESS = SUCCESS
+
+# Task states.
+FAILURE = FAILURE
+STARTED = STARTED
+SUCCESS = SUCCESS
diff --git a/cinder/taskflow/task.py b/cinder/taskflow/task.py
new file mode 100644 (file)
index 0000000..57753d7
--- /dev/null
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+
+from cinder.taskflow import utils
+
+
+class Task(object):
+    """An abstraction that defines a potential piece of work that can be
+    applied and can be reverted to undo the work as a single unit.
+    """
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, name):
+        self.name = name
+        # An *immutable* input 'resource' name set this task depends
+        # on existing before this task can be applied.
+        self.requires = set()
+        # An *immutable* input 'resource' name set this task would like to
+        # depends on existing before this task can be applied (but does not
+        # strongly depend on existing).
+        self.optional = set()
+        # An *immutable* output 'resource' name set this task
+        # produces that other tasks may depend on this task providing.
+        self.provides = set()
+        # This identifies the version of the task to be ran which
+        # can be useful in resuming older versions of tasks. Standard
+        # major, minor version semantics apply.
+        self.version = (1, 0)
+
+    def __str__(self):
+        return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
+
+    @abc.abstractmethod
+    def __call__(self, context, *args, **kwargs):
+        """Activate a given task which will perform some operation and return.
+
+           This method can be used to apply some given context and given set
+           of args and kwargs to accomplish some goal. Note that the result
+           that is returned needs to be serializable so that it can be passed
+           back into this task if reverting is triggered.
+        """
+        raise NotImplementedError()
+
+    def revert(self, context, result, cause):
+        """Revert this task using the given context, result that the apply
+           provided as well as any information which may have caused
+           said reversion.
+        """
+        pass
diff --git a/cinder/taskflow/utils.py b/cinder/taskflow/utils.py
new file mode 100644 (file)
index 0000000..81d53d8
--- /dev/null
@@ -0,0 +1,464 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import contextlib
+import copy
+import logging
+import re
+import sys
+import threading
+import time
+import types
+
+from cinder.openstack.common import uuidutils
+
+from cinder.taskflow import decorators
+
+LOG = logging.getLogger(__name__)
+
+
+def get_attr(task, field, default=None):
+    if decorators.is_decorated(task):
+        # If its a decorated functor then the attributes will be either
+        # in the underlying function of the instancemethod or the function
+        # itself.
+        task = decorators.extract(task)
+    return getattr(task, field, default)
+
+
+def join(itr, with_what=","):
+    pieces = [str(i) for i in itr]
+    return with_what.join(pieces)
+
+
+def get_many_attr(obj, *attrs):
+    many = []
+    for a in attrs:
+        many.append(get_attr(obj, a, None))
+    return many
+
+
+def get_task_version(task):
+    """Gets a tasks *string* version, whether it is a task object/function."""
+    task_version = get_attr(task, 'version')
+    if isinstance(task_version, (list, tuple)):
+        task_version = join(task_version, with_what=".")
+    if task_version is not None and not isinstance(task_version, basestring):
+        task_version = str(task_version)
+    return task_version
+
+
+def get_task_name(task):
+    """Gets a tasks *string* name, whether it is a task object/function."""
+    task_name = ""
+    if isinstance(task, (types.MethodType, types.FunctionType)):
+        # If its a function look for the attributes that should have been
+        # set using the task() decorator provided in the decorators file. If
+        # those have not been set, then we should at least have enough basic
+        # information (not a version) to form a useful task name.
+        task_name = get_attr(task, 'name')
+        if not task_name:
+            name_pieces = [a for a in get_many_attr(task,
+                                                    '__module__',
+                                                    '__name__')
+                           if a is not None]
+            task_name = join(name_pieces, ".")
+    else:
+        task_name = str(task)
+    return task_name
+
+
+def is_version_compatible(version_1, version_2):
+    """Checks for major version compatibility of two *string" versions."""
+    if version_1 == version_2:
+        # Equivalent exactly, so skip the rest.
+        return True
+
+    def _convert_to_pieces(version):
+        try:
+            pieces = []
+            for p in version.split("."):
+                p = p.strip()
+                if not len(p):
+                    pieces.append(0)
+                    continue
+                # Clean off things like 1alpha, or 2b and just select the
+                # digit that starts that entry instead.
+                p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
+                if p_match:
+                    p = p_match.group(1)
+                pieces.append(int(p))
+        except (AttributeError, TypeError, ValueError):
+            pieces = []
+        return pieces
+
+    version_1_pieces = _convert_to_pieces(version_1)
+    version_2_pieces = _convert_to_pieces(version_2)
+    if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
+        return False
+
+    # Ensure major version compatibility to start.
+    major1 = version_1_pieces[0]
+    major2 = version_2_pieces[0]
+    if major1 != major2:
+        return False
+    return True
+
+
+def await(check_functor, timeout=None):
+    if timeout is not None:
+        end_time = time.time() + max(0, timeout)
+    else:
+        end_time = None
+    # Use the same/similar scheme that the python condition class uses.
+    delay = 0.0005
+    while not check_functor():
+        time.sleep(delay)
+        if end_time is not None:
+            remaining = end_time - time.time()
+            if remaining <= 0:
+                return False
+            delay = min(delay * 2, remaining, 0.05)
+        else:
+            delay = min(delay * 2, 0.05)
+    return True
+
+
+class LastFedIter(object):
+    """An iterator which yields back the first item and then yields back
+    results from the provided iterator.
+    """
+
+    def __init__(self, first, rest_itr):
+        self.first = first
+        self.rest_itr = rest_itr
+
+    def __iter__(self):
+        yield self.first
+        for i in self.rest_itr:
+            yield i
+
+
+class FlowFailure(object):
+    """When a task failure occurs the following object will be given to revert
+       and can be used to interrogate what caused the failure.
+    """
+
+    def __init__(self, runner, flow, exception):
+        self.runner = runner
+        self.flow = flow
+        self.exc = exception
+        self.exc_info = sys.exc_info()
+
+
+class RollbackTask(object):
+    """A helper task that on being called will call the underlying callable
+    tasks revert method (if said method exists).
+    """
+
+    def __init__(self, context, task, result):
+        self.task = task
+        self.result = result
+        self.context = context
+
+    def __str__(self):
+        return str(self.task)
+
+    def __call__(self, cause):
+        if ((hasattr(self.task, "revert") and
+             isinstance(self.task.revert, collections.Callable))):
+            self.task.revert(self.context, self.result, cause)
+
+
+class Runner(object):
+    """A helper class that wraps a task and can find the needed inputs for
+    the task to run, as well as providing a uuid and other useful functionality
+    for users of the task.
+
+    TODO(harlowja): replace with the task details object or a subclass of
+    that???
+    """
+
+    def __init__(self, task, uuid=None):
+        assert isinstance(task, collections.Callable)
+        self.task = task
+        self.providers = {}
+        self.runs_before = []
+        self.result = None
+        if not uuid:
+            self._id = uuidutils.generate_uuid()
+        else:
+            self._id = str(uuid)
+
+    @property
+    def uuid(self):
+        return "r-%s" % (self._id)
+
+    @property
+    def requires(self):
+        return set(get_attr(self.task, 'requires', []))
+
+    @property
+    def provides(self):
+        return set(get_attr(self.task, 'provides', []))
+
+    @property
+    def optional(self):
+        return set(get_attr(self.task, 'optional', []))
+
+    @property
+    def version(self):
+        return get_task_version(self.task)
+
+    @property
+    def name(self):
+        return get_task_name(self.task)
+
+    def reset(self):
+        self.result = None
+
+    def __str__(self):
+        lines = ["Runner: %s" % (self.name)]
+        lines.append("%s" % (self.uuid))
+        lines.append("%s" % (self.version))
+        return "; ".join(lines)
+
+    def __call__(self, *args, **kwargs):
+        # Find all of our inputs first.
+        kwargs = dict(kwargs)
+        for (k, who_made) in self.providers.iteritems():
+            if who_made.result and k in who_made.result:
+                kwargs[k] = who_made.result[k]
+            else:
+                kwargs[k] = None
+        optional_keys = self.optional
+        optional_missing_keys = optional_keys - set(kwargs.keys())
+        if optional_missing_keys:
+            for k in optional_missing_keys:
+                for r in self.runs_before:
+                    r_provides = r.provides
+                    if k in r_provides and r.result and k in r.result:
+                        kwargs[k] = r.result[k]
+                        break
+        # And now finally run.
+        self.result = self.task(*args, **kwargs)
+        return self.result
+
+
+class TransitionNotifier(object):
+    """A utility helper class that can be used to subscribe to
+    notifications of events occuring as well as allow a entity to post said
+    notifications to subscribers.
+    """
+
+    RESERVED_KEYS = ('details',)
+    ANY = '*'
+
+    def __init__(self):
+        self._listeners = collections.defaultdict(list)
+
+    def reset(self):
+        self._listeners = collections.defaultdict(list)
+
+    def notify(self, state, details):
+        listeners = list(self._listeners.get(self.ANY, []))
+        for i in self._listeners[state]:
+            if i not in listeners:
+                listeners.append(i)
+        if not listeners:
+            return
+        for (callback, args, kwargs) in listeners:
+            if args is None:
+                args = []
+            if kwargs is None:
+                kwargs = {}
+            kwargs['details'] = details
+            try:
+                callback(state, *args, **kwargs)
+            except Exception:
+                LOG.exception(("Failure calling callback %s to notify about"
+                               " state transition %s"), callback, state)
+
+    def register(self, state, callback, args=None, kwargs=None):
+        assert isinstance(callback, collections.Callable)
+        for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
+            if cb is callback:
+                raise ValueError("Callback %s already registered" % (callback))
+        if kwargs:
+            for k in self.RESERVED_KEYS:
+                if k in kwargs:
+                    raise KeyError(("Reserved key '%s' not allowed in "
+                                    "kwargs") % k)
+            kwargs = copy.copy(kwargs)
+        if args:
+            args = copy.copy(args)
+        self._listeners[state].append((callback, args, kwargs))
+
+    def deregister(self, state, callback):
+        if state not in self._listeners:
+            return
+        for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
+            if cb is callback:
+                self._listeners[state].pop(i)
+                break
+
+
+class RollbackAccumulator(object):
+    """A utility class that can help in organizing 'undo' like code
+    so that said code be rolled back on failure (automatically or manually)
+    by activating rollback callables that were inserted during said codes
+    progression.
+    """
+
+    def __init__(self):
+        self._rollbacks = []
+
+    def add(self, *callables):
+        self._rollbacks.extend(callables)
+
+    def reset(self):
+        self._rollbacks = []
+
+    def __len__(self):
+        return len(self._rollbacks)
+
+    def __iter__(self):
+        # Rollbacks happen in the reverse order that they were added.
+        return reversed(self._rollbacks)
+
+    def __enter__(self):
+        return self
+
+    def rollback(self, cause):
+        LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
+        for (i, f) in enumerate(self):
+            LOG.debug("Calling rollback %s: %s", i + 1, f)
+            try:
+                f(cause)
+            except Exception:
+                LOG.exception(("Failed rolling back %s: %s due "
+                               "to inner exception."), i + 1, f)
+
+    def __exit__(self, type, value, tb):
+        if any((value, type, tb)):
+            self.rollback(value)
+
+
+class ReaderWriterLock(object):
+    """A simple reader-writer lock.
+
+    Several readers can hold the lock simultaneously, and only one writer.
+    Write locks have priority over reads to prevent write starvation.
+
+    Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
+    """
+
+    def __init__(self):
+        self.rwlock = 0
+        self.writers_waiting = 0
+        self.monitor = threading.Lock()
+        self.readers_ok = threading.Condition(self.monitor)
+        self.writers_ok = threading.Condition(self.monitor)
+
+    @contextlib.contextmanager
+    def acquire(self, read=True):
+        """Acquire a read or write lock in a context manager."""
+        try:
+            if read:
+                self.acquire_read()
+            else:
+                self.acquire_write()
+            yield self
+        finally:
+            self.release()
+
+    def acquire_read(self):
+        """Acquire a read lock.
+
+        Several threads can hold this typeof lock.
+        It is exclusive with write locks.
+        """
+
+        self.monitor.acquire()
+        while self.rwlock < 0 or self.writers_waiting:
+            self.readers_ok.wait()
+        self.rwlock += 1
+        self.monitor.release()
+
+    def acquire_write(self):
+        """Acquire a write lock.
+
+        Only one thread can hold this lock, and only when no read locks
+        are also held.
+        """
+
+        self.monitor.acquire()
+        while self.rwlock != 0:
+            self.writers_waiting += 1
+            self.writers_ok.wait()
+            self.writers_waiting -= 1
+        self.rwlock = -1
+        self.monitor.release()
+
+    def release(self):
+        """Release a lock, whether read or write."""
+
+        self.monitor.acquire()
+        if self.rwlock < 0:
+            self.rwlock = 0
+        else:
+            self.rwlock -= 1
+        wake_writers = self.writers_waiting and self.rwlock == 0
+        wake_readers = self.writers_waiting == 0
+        self.monitor.release()
+        if wake_writers:
+            self.writers_ok.acquire()
+            self.writers_ok.notify()
+            self.writers_ok.release()
+        elif wake_readers:
+            self.readers_ok.acquire()
+            self.readers_ok.notifyAll()
+            self.readers_ok.release()
+
+
+class LazyPluggable(object):
+    """A pluggable backend loaded lazily based on some value."""
+
+    def __init__(self, pivot, **backends):
+        self.__backends = backends
+        self.__pivot = pivot
+        self.__backend = None
+
+    def __get_backend(self):
+        if not self.__backend:
+            backend_name = 'sqlalchemy'
+            backend = self.__backends[backend_name]
+            if isinstance(backend, tuple):
+                name = backend[0]
+                fromlist = backend[1]
+            else:
+                name = backend
+                fromlist = backend
+
+            self.__backend = __import__(name, None, None, fromlist)
+        return self.__backend
+
+    def __getattr__(self, key):
+        backend = self.__get_backend()
+        return getattr(backend, key)
index 512e17b1f6f59c2732b4e7336a60c45e1e5bf137..82d5c57bc5945d4a8f76b8c5cdcb2d1b7ee2fcda 100644 (file)
@@ -49,6 +49,7 @@ from cinder.tests.image import fake as fake_image
 from cinder.volume import configuration as conf
 from cinder.volume import driver
 from cinder.volume.drivers import lvm
+from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volutils
 
@@ -151,7 +152,7 @@ class VolumeTestCase(test.TestCase):
             'volume_type': None,
             'snapshot_id': None,
             'user_id': 'fake',
-            'launched_at': '',
+            'launched_at': 'DONTCARE',
             'size': 0,
         }
         self.assertDictMatch(msg['payload'], expected)
@@ -167,7 +168,7 @@ class VolumeTestCase(test.TestCase):
             'volume_type': None,
             'snapshot_id': None,
             'user_id': 'fake',
-            'launched_at': '',
+            'launched_at': 'DONTCARE',
             'size': 0,
         }
         self.assertDictMatch(msg['payload'], expected)
@@ -1376,14 +1377,10 @@ class VolumeTestCase(test.TestCase):
 
     def test_create_volume_from_unelevated_context(self):
         """Test context does't change after volume creation failure."""
-        def fake_create_volume(context, volume_ref, snapshot_ref,
-                               sourcevol_ref, image_service, image_id,
-                               image_location):
+        def fake_create_volume(*args, **kwargs):
             raise exception.CinderException('fake exception')
 
-        def fake_reschedule_or_error(context, volume_id, exc_info,
-                                     snapshot_id, image_id, request_spec,
-                                     filter_properties):
+        def fake_reschedule_or_error(self, context, *args, **kwargs):
             self.assertFalse(context.is_admin)
             self.assertFalse('admin' in context.roles)
             #compare context passed in with the context we saved
@@ -1398,10 +1395,9 @@ class VolumeTestCase(test.TestCase):
         #create one copy of context for future comparison
         self.saved_ctxt = ctxt.deepcopy()
 
-        self.stubs.Set(self.volume, '_reschedule_or_error',
+        self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
                        fake_reschedule_or_error)
-        self.stubs.Set(self.volume, '_create_volume',
-                       fake_create_volume)
+        self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
 
         volume_src = self._create_volume()
         self.assertRaises(exception.CinderException,
@@ -1490,13 +1486,6 @@ class VolumeTestCase(test.TestCase):
             db.volume_update(self.context, src_vref['id'], {'status': 'error'})
             raise exception.CinderException('fake exception')
 
-        def fake_reschedule_or_error(context, volume_id, exc_info,
-                                     snapshot_id, image_id, request_spec,
-                                     filter_properties):
-            pass
-
-        self.stubs.Set(self.volume, '_reschedule_or_error',
-                       fake_reschedule_or_error)
         self.stubs.Set(self.volume.driver, 'create_cloned_volume',
                        fake_error_create_cloned_volume)
         volume_src = self._create_volume()
index d26943837bd193b11fc21c1d5bd4bb2f6035bf4a..fa05e73e6904aa2dd054e512a4baab6c6f27b0e3 100644 (file)
@@ -86,6 +86,54 @@ def find_config(config_path):
     raise exception.ConfigNotFound(path=os.path.abspath(config_path))
 
 
+def as_int(obj, quiet=True):
+    # Try "2" -> 2
+    try:
+        return int(obj)
+    except (ValueError, TypeError):
+        pass
+    # Try "2.5" -> 2
+    try:
+        return int(float(obj))
+    except (ValueError, TypeError):
+        pass
+    # Eck, not sure what this is then.
+    if not quiet:
+        raise TypeError(_("Can not translate %s to integer.") % (obj))
+    return obj
+
+
+def check_exclusive_options(**kwargs):
+    """Checks that only one of the provided options is actually not-none.
+
+    Iterates over all the kwargs passed in and checks that only one of said
+    arguments is not-none, if more than one is not-none then an exception will
+    be raised with the names of those arguments who were not-none.
+    """
+
+    if not kwargs:
+        return
+
+    pretty_keys = kwargs.pop("pretty_keys", True)
+    exclusive_options = {}
+    for (k, v) in kwargs.iteritems():
+        if v is not None:
+            exclusive_options[k] = True
+
+    if len(exclusive_options) > 1:
+        # Change the format of the names from pythonic to
+        # something that is more readable.
+        #
+        # Ex: 'the_key' -> 'the key'
+        if pretty_keys:
+            names = [k.replace('_', ' ') for k in kwargs.keys()]
+        else:
+            names = kwargs.keys()
+        names = ", ".join(sorted(names))
+        msg = (_("May specify only one of %s") % (names))
+        raise exception.InvalidInput(reason=msg)
+
+
 def fetchfile(url, target):
     LOG.debug(_('Fetching %s') % url)
     execute('curl', '--fail', url, '-o', target)
index 25d3074bcb9bd0106e40300c8ae158fe375e546c..bf9d82db8862c0d687584ce888eed7e3a321cfa2 100644 (file)
@@ -37,9 +37,12 @@ from cinder import quota
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import units
 from cinder import utils
+from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import volume_types
 
+from cinder.taskflow import states
+
 
 volume_host_opt = cfg.BoolOpt('snapshot_same_host',
                               default=True,
@@ -56,7 +59,6 @@ CONF.register_opt(volume_same_az_opt)
 CONF.import_opt('storage_availability_zone', 'cinder.volume.manager')
 
 LOG = logging.getLogger(__name__)
-GB = units.GiB
 QUOTAS = quota.QUOTAS
 
 
@@ -95,248 +97,17 @@ class API(base.Base):
         self.availability_zone_names = ()
         super(API, self).__init__(db_driver)
 
-    def create(self, context, size, name, description, snapshot=None,
-               image_id=None, volume_type=None, metadata=None,
-               availability_zone=None, source_volume=None,
-               scheduler_hints=None):
-
-        exclusive_options = (snapshot, image_id, source_volume)
-        exclusive_options_set = sum(1 for option in
-                                    exclusive_options if option is not None)
-        if exclusive_options_set > 1:
-            msg = (_("May specify only one of snapshot, imageRef "
-                     "or source volume"))
-            raise exception.InvalidInput(reason=msg)
-
-        check_policy(context, 'create')
-        if snapshot is not None:
-            if snapshot['status'] != "available":
-                msg = _("status must be available")
-                raise exception.InvalidSnapshot(reason=msg)
-            if not size:
-                size = snapshot['volume_size']
-            elif size < snapshot['volume_size']:
-                msg = _("Volume size cannot be lesser than"
-                        " the Snapshot size")
-                raise exception.InvalidInput(reason=msg)
-            snapshot_id = snapshot['id']
-        else:
-            snapshot_id = None
-
-        if source_volume is not None:
-            if source_volume['status'] == "error":
-                msg = _("Unable to clone volumes that are in an error state")
-                raise exception.InvalidSourceVolume(reason=msg)
-            if not size:
-                size = source_volume['size']
-            else:
-                if size < source_volume['size']:
-                    msg = _("Clones currently must be "
-                            ">= original volume size.")
-                    raise exception.InvalidInput(reason=msg)
-            source_volid = source_volume['id']
-        else:
-            source_volid = None
-
-        def as_int(s):
-            try:
-                return int(s)
-            except (ValueError, TypeError):
-                return s
-
-        # tolerate size as stringified int
-        size = as_int(size)
-
-        if not isinstance(size, int) or size <= 0:
-            msg = (_("Volume size '%s' must be an integer and greater than 0")
-                   % size)
-            raise exception.InvalidInput(reason=msg)
-
-        if (image_id and not (source_volume or snapshot)):
-            # check image existence
-            image_meta = self.image_service.show(context, image_id)
-            image_size_in_gb = (int(image_meta['size']) + GB - 1) / GB
-            #check image size is not larger than volume size.
-            if image_size_in_gb > size:
-                msg = _('Size of specified image is larger than volume size.')
-                raise exception.InvalidInput(reason=msg)
-            # Check image minDisk requirement is met for the particular volume
-            if size < image_meta.get('min_disk', 0):
-                msg = _('Image minDisk size is larger than the volume size.')
-                raise exception.InvalidInput(reason=msg)
-
-        if availability_zone is None:
-            if snapshot is not None:
-                availability_zone = snapshot['volume']['availability_zone']
-            elif source_volume is not None:
-                availability_zone = source_volume['availability_zone']
-            else:
-                availability_zone = CONF.storage_availability_zone
-        else:
-            self._check_availabilty_zone(availability_zone)
-
-        if CONF.cloned_volume_same_az:
-            if (snapshot and
-                snapshot['volume']['availability_zone'] !=
-                    availability_zone):
-                msg = _("Volume must be in the same "
-                        "availability zone as the snapshot")
-                raise exception.InvalidInput(reason=msg)
-            elif source_volume and \
-                    source_volume['availability_zone'] != availability_zone:
-                msg = _("Volume must be in the same "
-                        "availability zone as the source volume")
-                raise exception.InvalidInput(reason=msg)
-
-        if not volume_type and not source_volume:
-            volume_type = volume_types.get_default_volume_type()
-
-        if not volume_type and source_volume:
-            volume_type_id = source_volume['volume_type_id']
-        else:
-            volume_type_id = volume_type.get('id')
-
-        try:
-            reserve_opts = {'volumes': 1, 'gigabytes': size}
-            QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
-            reservations = QUOTAS.reserve(context, **reserve_opts)
-        except exception.OverQuota as e:
-            overs = e.kwargs['overs']
-            usages = e.kwargs['usages']
-            quotas = e.kwargs['quotas']
-
-            def _consumed(name):
-                return (usages[name]['reserved'] + usages[name]['in_use'])
-
-            for over in overs:
-                if 'gigabytes' in over:
-                    msg = _("Quota exceeded for %(s_pid)s, tried to create "
-                            "%(s_size)sG volume (%(d_consumed)dG of "
-                            "%(d_quota)dG already consumed)")
-                    LOG.warn(msg % {'s_pid': context.project_id,
-                                    's_size': size,
-                                    'd_consumed': _consumed(over),
-                                    'd_quota': quotas[over]})
-                    raise exception.VolumeSizeExceedsAvailableQuota()
-                elif 'volumes' in over:
-                    msg = _("Quota exceeded for %(s_pid)s, tried to create "
-                            "volume (%(d_consumed)d volumes"
-                            "already consumed)")
-                    LOG.warn(msg % {'s_pid': context.project_id,
-                                    'd_consumed': _consumed(over)})
-                    raise exception.VolumeLimitExceeded(allowed=quotas[over])
-
-        self._check_metadata_properties(context, metadata)
-        options = {'size': size,
-                   'user_id': context.user_id,
-                   'project_id': context.project_id,
-                   'snapshot_id': snapshot_id,
-                   'availability_zone': availability_zone,
-                   'status': "creating",
-                   'attach_status': "detached",
-                   'display_name': name,
-                   'display_description': description,
-                   'volume_type_id': volume_type_id,
-                   'metadata': metadata,
-                   'source_volid': source_volid}
-
-        try:
-            volume = self.db.volume_create(context, options)
-            QUOTAS.commit(context, reservations)
-        except Exception:
-            with excutils.save_and_reraise_exception():
-                try:
-                    self.db.volume_destroy(context, volume['id'])
-                finally:
-                    QUOTAS.rollback(context, reservations)
-
-        request_spec = {'volume_properties': options,
-                        'volume_type': volume_type,
-                        'volume_id': volume['id'],
-                        'snapshot_id': volume['snapshot_id'],
-                        'image_id': image_id,
-                        'source_volid': volume['source_volid']}
-
-        if scheduler_hints:
-            filter_properties = {'scheduler_hints': scheduler_hints}
-        else:
-            filter_properties = {}
-
-        self._cast_create_volume(context, request_spec, filter_properties)
-
-        return volume
-
-    def _cast_create_volume(self, context, request_spec, filter_properties):
-
-        # NOTE(Rongze Zhu): It is a simple solution for bug 1008866
-        # If snapshot_id is set, make the call create volume directly to
-        # the volume host where the snapshot resides instead of passing it
-        # through the scheduler. So snapshot can be copy to new volume.
-
-        source_volid = request_spec['source_volid']
-        volume_id = request_spec['volume_id']
-        snapshot_id = request_spec['snapshot_id']
-        image_id = request_spec['image_id']
-
-        if snapshot_id and CONF.snapshot_same_host:
-            snapshot_ref = self.db.snapshot_get(context, snapshot_id)
-            source_volume_ref = self.db.volume_get(context,
-                                                   snapshot_ref['volume_id'])
-            now = timeutils.utcnow()
-            values = {'host': source_volume_ref['host'], 'scheduled_at': now}
-            volume_ref = self.db.volume_update(context, volume_id, values)
-
-            # bypass scheduler and send request directly to volume
-            self.volume_rpcapi.create_volume(
-                context,
-                volume_ref,
-                volume_ref['host'],
-                request_spec=request_spec,
-                filter_properties=filter_properties,
-                allow_reschedule=False,
-                snapshot_id=snapshot_id,
-                image_id=image_id)
-        elif source_volid:
-            source_volume_ref = self.db.volume_get(context,
-                                                   source_volid)
-            now = timeutils.utcnow()
-            values = {'host': source_volume_ref['host'], 'scheduled_at': now}
-            volume_ref = self.db.volume_update(context, volume_id, values)
-
-            # bypass scheduler and send request directly to volume
-            self.volume_rpcapi.create_volume(
-                context,
-                volume_ref,
-                volume_ref['host'],
-                request_spec=request_spec,
-                filter_properties=filter_properties,
-                allow_reschedule=False,
-                snapshot_id=snapshot_id,
-                image_id=image_id,
-                source_volid=source_volid)
-        else:
-            self.scheduler_rpcapi.create_volume(
-                context,
-                CONF.volume_topic,
-                volume_id,
-                snapshot_id,
-                image_id,
-                request_spec=request_spec,
-                filter_properties=filter_properties)
-
-    def _check_availabilty_zone(self, availability_zone):
+    def _valid_availabilty_zone(self, availability_zone):
         #NOTE(bcwaldon): This approach to caching fails to handle the case
         # that an availability zone is disabled/removed.
         if availability_zone in self.availability_zone_names:
-            return
+            return True
+        if CONF.storage_availability_zone == availability_zone:
+            return True
 
         azs = self.list_availability_zones()
         self.availability_zone_names = [az['name'] for az in azs]
-
-        if availability_zone not in self.availability_zone_names:
-            msg = _("Availability zone is invalid")
-            LOG.warn(msg)
-            raise exception.InvalidInput(reason=msg)
+        return availability_zone in self.availability_zone_names
 
     def list_availability_zones(self):
         """Describe the known availability zones
@@ -358,6 +129,56 @@ class API(base.Base):
 
         return tuple(azs)
 
+    def create(self, context, size, name, description, snapshot=None,
+               image_id=None, volume_type=None, metadata=None,
+               availability_zone=None, source_volume=None,
+               scheduler_hints=None):
+
+        def check_volume_az_zone(availability_zone):
+            try:
+                return self._valid_availabilty_zone(availability_zone)
+            except exception.CinderException:
+                LOG.exception(_("Unable to query if %s is in the "
+                                "availability zone set"), availability_zone)
+                return False
+
+        create_what = {
+            'size': size,
+            'name': name,
+            'description': description,
+            'snapshot': snapshot,
+            'image_id': image_id,
+            'volume_type': volume_type,
+            'metadata': metadata,
+            'availability_zone': availability_zone,
+            'source_volume': source_volume,
+            'scheduler_hints': scheduler_hints,
+        }
+        (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
+                                                  self.volume_rpcapi,
+                                                  self.db,
+                                                  self.image_service,
+                                                  check_volume_az_zone,
+                                                  create_what)
+
+        assert flow, _('Create volume flow not retrieved')
+        flow.run(context)
+        if flow.state != states.SUCCESS:
+            raise exception.CinderException(_("Failed to successfully complete"
+                                              " create volume workflow"))
+
+        # Extract the volume information from the task uuid that was specified
+        # to produce said information.
+        volume = None
+        try:
+            volume = flow.results[uuid]['volume']
+        except KeyError:
+            pass
+
+        # Raise an error, nobody provided it??
+        assert volume, _('Expected volume result not found')
+        return volume
+
     @wrap_check_policy
     def delete(self, context, volume, force=False):
         if context.is_admin and context.project_id != volume['project_id']:
diff --git a/cinder/volume/flows/__init__.py b/cinder/volume/flows/__init__.py
new file mode 100644 (file)
index 0000000..830dd2e
--- /dev/null
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/cinder/volume/flows/create_volume.py b/cinder/volume/flows/create_volume.py
new file mode 100644 (file)
index 0000000..7022545
--- /dev/null
@@ -0,0 +1,1691 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#    Copyright (c) 2013 OpenStack, LLC.
+#    Copyright 2010 United States Government as represented by the
+#    Administrator of the National Aeronautics and Space Administration.
+#    All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import traceback
+
+from oslo.config import cfg
+
+from cinder import exception
+from cinder.image import glance
+from cinder import policy
+from cinder import quota
+from cinder import units
+from cinder import utils
+
+from cinder.openstack.common import excutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.notifier import api as notifier
+from cinder.openstack.common import timeutils
+from cinder.volume import utils as volume_utils
+from cinder.volume import volume_types
+
+from cinder.taskflow import decorators
+from cinder.taskflow.patterns import linear_flow
+from cinder.taskflow import task
+
+LOG = logging.getLogger(__name__)
+
+ACTION = 'volume:create'
+CONF = cfg.CONF
+GB = units.GiB
+QUOTAS = quota.QUOTAS
+
+# Only in these 'sources' status can we attempt to create a volume from a
+# source volume or a source snapshot, other status states we can not create
+# from, 'error' being the common example.
+PROCEED_STATUS = ('available',)
+
+# When a volume errors out we have the ability to save a piece of the exception
+# that caused said failure, but we don't want to save the whole message since
+# that could be very large, just save up to this number of characters.
+REASON_LENGTH = 128
+
+# These attributes we will attempt to save for the volume if they exist
+# in the source image metadata.
+IMAGE_ATTRIBUTES = (
+    'checksum',
+    'container_format',
+    'disk_format',
+    'min_disk',
+    'min_ram',
+    'size',
+)
+
+
+def _make_pretty_name(method):
+    """Makes a pretty name for a function/method."""
+    meth_pieces = [method.__name__]
+    # If its an instance method attempt to tack on the class name
+    if hasattr(method, 'im_self') and method.im_self is not None:
+        try:
+            meth_pieces.insert(0, method.im_self.__class__.__name__)
+        except AttributeError:
+            pass
+    return ".".join(meth_pieces)
+
+
+def _find_result_spec(flow):
+    """Find the last task that produced a valid volume_spec and returns it."""
+    for there_result in flow.results.values():
+        if not there_result or not 'volume_spec' in there_result:
+            continue
+        if there_result['volume_spec']:
+            return there_result['volume_spec']
+    return None
+
+
+def _make_task_name(klass, addons=None):
+    """Makes a pretty name for a task class."""
+    components = [klass.__module__, klass.__name__]
+    if addons:
+        for a in addons:
+            components.append(str(a))
+    return "%s#%s" % (ACTION, ".".join(components))
+
+
+def _restore_source_status(context, db, volume_spec):
+    # NOTE(harlowja): Only if the type of the volume that was being created is
+    # the source volume type should we try to reset the source volume status
+    # back to its original value.
+    if not volume_spec or volume_spec.get('type') != 'source_vol':
+        return
+    source_volid = volume_spec['source_volid']
+    source_status = volume_spec['source_volstatus']
+    try:
+        LOG.debug(_('Restoring source %(source_volid)s status to %(status)s') %
+                  {'status': source_status, 'source_volid': source_volid})
+        db.volume_update(context, source_volid, {'status': source_status})
+    except exception.CinderException:
+        # NOTE(harlowja): Don't let this cause further exceptions since this is
+        # a non-critical failure.
+        LOG.exception(_("Failed setting source volume %(source_volid)s back to"
+                        " its initial %(source_status)s status") %
+                      {'source_status': source_status,
+                       'source_volid': source_volid})
+
+
+def _error_out_volume(context, db, volume_id, reason=None):
+
+    def _clean_reason(reason):
+        if reason is None:
+            return '???'
+        reason = str(reason)
+        if len(reason) <= REASON_LENGTH:
+            return reason
+        else:
+            return reason[0:REASON_LENGTH] + '...'
+
+    update = {
+        'status': 'error',
+    }
+    reason = _clean_reason(reason)
+    # TODO(harlowja): re-enable when we can support this in the database.
+    # if reason:
+    #     status['details'] = reason
+    try:
+        LOG.debug(_('Updating volume: %(volume_id)s with %(update)s'
+                    ' due to: %(reason)s') % {'volume_id': volume_id,
+                                              'reason': reason,
+                                              'update': update})
+        db.volume_update(context, volume_id, update)
+    except exception.CinderException:
+        # Don't let this cause further exceptions.
+        LOG.exception(_("Failed updating volume %(volume_id)s with"
+                        " %(update)s") % {'volume_id': volume_id,
+                                          'update': update})
+
+
+class CinderTask(task.Task):
+    """The root task class for all cinder tasks.
+
+    It automatically names the given task using the module and class that
+    implement the given task as the task name.
+
+    TODO(harlowja): this likely should be moved later to a common folder in the
+    future when more of cinders flows are implemented.
+    """
+
+    def __init__(self, addons=None):
+        super(CinderTask, self).__init__(_make_task_name(self.__class__,
+                                                         addons))
+
+
+class ValuesInjectTask(CinderTask):
+    """This injects a dict into the flow.
+
+    This injection is done so that the keys (and values) provided can be
+    dependended on by tasks further down the line. Since taskflow is dependency
+    based this can be considered the bootstrapping task that provides an
+    initial set of values for other tasks to get started with. If this did not
+    exist then tasks would fail locating there dependent tasks and the values
+    said dependent tasks produce.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, inject_what):
+        super(ValuesInjectTask, self).__init__()
+        self.provides.update(inject_what.keys())
+        self._inject = inject_what
+
+    def __call__(self, context):
+        return dict(self._inject)
+
+
+class ExtractVolumeRequestTask(CinderTask):
+    """Processes an api request values into a validated set of values.
+
+    This tasks responsibility is to take in a set of inputs that will form
+    a potential volume request and validates those values against a set of
+    conditions and/or translates those values into a valid set and then returns
+    the validated/translated values for use by other tasks.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, image_service, az_check_functor=None):
+        super(ExtractVolumeRequestTask, self).__init__()
+        # This task will produce the following outputs (said outputs can be
+        # saved to durable storage in the future so that the flow can be
+        # reconstructed elsewhere and continued).
+        self.provides.update(['availability_zone', 'size', 'snapshot_id',
+                              'source_volid', 'volume_type', 'volume_type_id'])
+        # This task requires the following inputs to operate (provided
+        # automatically to __call__(). This is done so that the flow can
+        # be reconstructed elsewhere and continue running (in the future).
+        #
+        # It also is used to be able to link tasks that produce items to tasks
+        # that consume items (thus allowing the linking of the flow to be
+        # mostly automatic).
+        self.requires.update(['availability_zone', 'image_id', 'metadata',
+                              'size', 'snapshot', 'source_volume',
+                              'volume_type'])
+        self.image_service = image_service
+        self.az_check_functor = az_check_functor
+        if not self.az_check_functor:
+            self.az_check_functor = lambda az: True
+
+    @staticmethod
+    def _extract_snapshot(snapshot):
+        """Extracts the snapshot id from the provided snapshot (if provided).
+
+        This function validates the input snapshot dict and checks that the
+        status of that snapshot is valid for creating a volume from.
+        """
+
+        snapshot_id = None
+        if snapshot is not None:
+            if snapshot['status'] not in PROCEED_STATUS:
+                msg = _("Originating snapshot status must be one"
+                        " of %s values")
+                msg = msg % (", ".join(PROCEED_STATUS))
+                # TODO(harlowja): what happens if the status changes after this
+                # initial snapshot status check occurs??? Seems like someone
+                # could delete the snapshot after this check passes but before
+                # the volume is offically created?
+                raise exception.InvalidSnapshot(reason=msg)
+            snapshot_id = snapshot['id']
+        return snapshot_id
+
+    @staticmethod
+    def _extract_source_volume(source_volume):
+        """Extracts the volume id from the provided volume (if provided).
+
+        This function validates the input source_volume dict and checks that
+        the status of that source_volume is valid for creating a volume from.
+        """
+
+        source_volid = None
+        if source_volume is not None:
+            if source_volume['status'] not in PROCEED_STATUS:
+                msg = _("Unable to create a volume from an originating source"
+                        " volume when its status is not one of %s"
+                        " values")
+                msg = msg % (", ".join(PROCEED_STATUS))
+                # TODO(harlowja): what happens if the status changes after this
+                # initial volume status check occurs??? Seems like someone
+                # could delete the volume after this check passes but before
+                # the volume is offically created?
+                raise exception.InvalidVolume(reason=msg)
+            source_volid = source_volume['id']
+        return source_volid
+
+    @staticmethod
+    def _extract_size(size, source_volume, snapshot):
+        """Extracts and validates the volume size.
+
+        This function will validate or when not provided fill in the provided
+        size variable from the source_volume or snapshot and then does
+        validation on the size that is found and returns said validated size.
+        """
+
+        def validate_snap_size(size):
+            if snapshot and size < snapshot['volume_size']:
+                msg = _("Volume size %(size)s cannot be lesser than"
+                        " the snapshot size %(snap_size)s. "
+                        "They must be >= original snapshot size.")
+                msg = msg % {'size': size,
+                             'snap_size': snapshot['volume_size']}
+                raise exception.InvalidInput(reason=msg)
+
+        def validate_source_size(size):
+            if source_volume and size < source_volume['size']:
+                msg = _("Clones currently disallowed when "
+                        "%(size)s < %(source_size)s. "
+                        "They must be >= original volume size.")
+                msg = msg % {'size': size,
+                             'source_size': source_volume['size']}
+                raise exception.InvalidInput(reason=msg)
+
+        def validate_int(size):
+            if not isinstance(size, int) or size <= 0:
+                msg = _("Volume size %(size)s must be an integer and"
+                        " greater than 0") % {'size': size}
+                raise exception.InvalidInput(reason=msg)
+
+        # Figure out which validation functions we should be applying
+        # on the size value that we extract.
+        validator_functors = [validate_int]
+        if source_volume:
+            validator_functors.append(validate_source_size)
+        elif snapshot:
+            validator_functors.append(validate_snap_size)
+
+        # If the size is not provided then try to provide it.
+        if not size and source_volume:
+            size = source_volume['size']
+        elif not size and snapshot:
+            size = snapshot['volume_size']
+
+        size = utils.as_int(size)
+        LOG.debug("Validating volume %(size)s using %(functors)s" %
+                  {'size': size,
+                   'functors': ", ".join([_make_pretty_name(func)
+                                          for func in validator_functors])})
+        for func in validator_functors:
+            func(size)
+        return size
+
+    def _check_image_metadata(self, context, image_id, size):
+        """Checks image existence and validates that the image metadata."""
+
+        # Check image existence
+        if not image_id:
+            return
+
+        # NOTE(harlowja): this should raise an error if the image does not
+        # exist, this is expected as it signals that the image_id is missing.
+        image_meta = self.image_service.show(context, image_id)
+
+        # Check image size is not larger than volume size.
+        image_size = utils.as_int(image_meta['size'], quiet=False)
+        image_size_in_gb = (image_size + GB - 1) / GB
+        if image_size_in_gb > size:
+            msg = _('Size of specified image %(image_size)s'
+                    ' is larger than volume size %(volume_size)s.')
+            msg = msg % {'image_size': image_size_in_gb, 'volume_size': size}
+            raise exception.InvalidInput(reason=msg)
+
+        # Check image min_disk requirement is met for the particular volume
+        min_disk = image_meta.get('min_disk', 0)
+        if size < min_disk:
+            msg = _('Image minDisk size %(min_disk)s is larger'
+                    ' than the volume size %(volume_size)s.')
+            msg = msg % {'min_disk': min_disk, 'volume_size': size}
+            raise exception.InvalidInput(reason=msg)
+
+    @staticmethod
+    def _check_metadata_properties(metadata=None):
+        """Checks that the volume metadata properties are valid."""
+
+        if not metadata:
+            metadata = {}
+
+        for (k, v) in metadata.iteritems():
+            if len(k) == 0:
+                msg = _("Metadata property key blank")
+                LOG.warn(msg)
+                raise exception.InvalidVolumeMetadata(reason=msg)
+            if len(k) > 255:
+                msg = _("Metadata property key %s greater than 255 "
+                        "characters") % k
+                LOG.warn(msg)
+                raise exception.InvalidVolumeMetadataSize(reason=msg)
+            if len(v) > 255:
+                msg = _("Metadata property key %s value greater than"
+                        " 255 characters") % k
+                LOG.warn(msg)
+                raise exception.InvalidVolumeMetadataSize(reason=msg)
+
+    def _extract_availability_zone(self, availability_zone, snapshot,
+                                   source_volume):
+        """Extracts and returns a validated availability zone.
+
+        This function will extract the availability zone (if not provided) from
+        the snapshot or source_volume and then performs a set of validation
+        checks on the provided or extracted availability zone and then returns
+        the validated availability zone.
+        """
+
+        # Try to extract the availability zone from the corresponding snapshot
+        # or source volume if either is valid so that we can be in the same
+        # availability zone as the source.
+        if availability_zone is None:
+            if snapshot:
+                try:
+                    availability_zone = snapshot['volume']['availability_zone']
+                except (TypeError, KeyError):
+                    pass
+            if source_volume and availability_zone is None:
+                try:
+                    availability_zone = source_volume['availability_zone']
+                except (TypeError, KeyError):
+                    pass
+
+        if availability_zone is None:
+            availability_zone = CONF.storage_availability_zone
+        if not self.az_check_functor(availability_zone):
+            msg = _("Availability zone '%s' is invalid") % (availability_zone)
+            LOG.warn(msg)
+            raise exception.InvalidInput(reason=msg)
+
+        # If the configuration only allows cloning to the same availability
+        # zone then we need to enforce that.
+        if CONF.cloned_volume_same_az:
+            snap_az = None
+            try:
+                snap_az = snapshot['volume']['availability_zone']
+            except (TypeError, KeyError):
+                pass
+            if snap_az and snap_az != availability_zone:
+                msg = _("Volume must be in the same "
+                        "availability zone as the snapshot")
+                raise exception.InvalidInput(reason=msg)
+            source_vol_az = None
+            try:
+                source_vol_az = source_volume['availability_zone']
+            except (TypeError, KeyError):
+                pass
+            if source_vol_az and source_vol_az != availability_zone:
+                msg = _("Volume must be in the same "
+                        "availability zone as the source volume")
+                raise exception.InvalidInput(reason=msg)
+
+        return availability_zone
+
+    def __call__(self, context, size, snapshot, image_id, source_volume,
+                 availability_zone, volume_type, metadata):
+
+        utils.check_exclusive_options(snapshot=snapshot,
+                                      imageRef=image_id,
+                                      source_volume=source_volume)
+        policy.enforce_action(context, ACTION)
+
+        # TODO(harlowja): what guarantee is there that the snapshot or source
+        # volume will remain available after we do this initial verification??
+        snapshot_id = self._extract_snapshot(snapshot)
+        source_volid = self._extract_source_volume(source_volume)
+        size = self._extract_size(size, source_volume, snapshot)
+
+        self._check_image_metadata(context, image_id, size)
+
+        availability_zone = self._extract_availability_zone(availability_zone,
+                                                            snapshot,
+                                                            source_volume)
+
+        if not volume_type and not source_volume:
+            volume_type = volume_types.get_default_volume_type()
+        if not volume_type and source_volume:
+            volume_type_id = source_volume['volume_type_id']
+        else:
+            volume_type_id = volume_type.get('id')
+
+        self._check_metadata_properties(metadata)
+
+        return {
+            'size': size,
+            'snapshot_id': snapshot_id,
+            'source_volid': source_volid,
+            'availability_zone': availability_zone,
+            'volume_type': volume_type,
+            'volume_type_id': volume_type_id,
+        }
+
+
+class EntryCreateTask(CinderTask):
+    """Creates an entry for the given volume creation in the database.
+
+    Reversion strategy: remove the volume_id created from the database.
+    """
+
+    def __init__(self, db):
+        super(EntryCreateTask, self).__init__()
+        self.db = db
+        self.requires.update(['availability_zone', 'description', 'metadata',
+                              'name', 'reservations', 'size', 'snapshot_id',
+                              'source_volid', 'volume_type_id'])
+        self.provides.update(['volume_properties', 'volume_id'])
+
+    def __call__(self, context, **kwargs):
+        """Creates a database entry for the given inputs and returns details.
+
+        Accesses the database and creates a new entry for the to be created
+        volume using the given volume properties which are extracted from the
+        input kwargs (and associated requirements this task needs). These
+        requirements should be previously satisifed and validated by a
+        pre-cursor task.
+        """
+
+        volume_properties = {
+            'size': kwargs.pop('size'),
+            'user_id': context.user_id,
+            'project_id': context.project_id,
+            'status': 'creating',
+            'attach_status': 'detached',
+            # Rename these to the internal name.
+            'display_description': kwargs.pop('description'),
+            'display_name': kwargs.pop('name'),
+        }
+
+        # Merge in the other required arguments which should provide the rest
+        # of the volume property fields (if applicable).
+        volume_properties.update(kwargs)
+        volume = self.db.volume_create(context, volume_properties)
+
+        return {
+            'volume_id': volume['id'],
+            'volume_properties': volume_properties,
+            # NOTE(harlowja): it appears like further usage of this volume
+            # result actually depend on it being a sqlalchemy object and not
+            # just a plain dictionary so thats why we are storing this here.
+            #
+            # In the future where this task results can be serialized and
+            # restored automatically for continued running we will need to
+            # resolve the serialization & recreation of this object since raw
+            # sqlalchemy objects can't be serialized.
+            'volume': volume,
+        }
+
+    def revert(self, context, result, cause):
+        # We never produced a result and therefore can't destroy anything.
+        if not result:
+            return
+        vol_id = result['volume_id']
+        try:
+            self.db.volume_destroy(context, vol_id)
+        except exception.CinderException:
+            # We are already reverting, therefore we should silence this
+            # exception since a second exception being active will be bad.
+            #
+            # NOTE(harlowja): Being unable to destroy a volume is pretty
+            # bad though!!
+            LOG.exception(_("Failed destroying volume entry %s"), vol_id)
+
+
+class QuotaReserveTask(CinderTask):
+    """Reserves a single volume with the given size & the given volume type.
+
+    Reversion strategy: rollback the quota reservation.
+
+    Warning Warning: if the process that is running this reserve and commit
+    process fails (or is killed before the quota is rolled back or commited
+    it does appear like the quota will never be rolled back). This makes
+    software upgrades hard (inflight operations will need to be stopped or
+    allowed to complete before the upgrade can occur). *In the future* when
+    taskflow has persistence built-in this should be easier to correct via
+    an automated or manual process.
+    """
+
+    def __init__(self):
+        super(QuotaReserveTask, self).__init__()
+        self.requires.update(['size', 'volume_type_id'])
+        self.provides.update(['reservations'])
+
+    def __call__(self, context, size, volume_type_id):
+        try:
+            reserve_opts = {'volumes': 1, 'gigabytes': size}
+            QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
+            reservations = QUOTAS.reserve(context, **reserve_opts)
+            return {
+                'reservations': reservations,
+            }
+        except exception.OverQuota as e:
+            overs = e.kwargs['overs']
+            quotas = e.kwargs['quotas']
+            usages = e.kwargs['usages']
+
+            def _consumed(name):
+                return (usages[name]['reserved'] + usages[name]['in_use'])
+
+            def _is_over(name):
+                for over in overs:
+                    if name in over:
+                        return True
+                return False
+
+            if _is_over('gigabytes'):
+                msg = _("Quota exceeded for %(s_pid)s, tried to create "
+                        "%(s_size)sG volume (%(d_consumed)dG "
+                        "of %(d_quota)dG already consumed)")
+                LOG.warn(msg % {'s_pid': context.project_id,
+                                's_size': size,
+                                'd_consumed': _consumed('gigabytes'),
+                                'd_quota': quotas['gigabytes']})
+                raise exception.VolumeSizeExceedsAvailableQuota()
+            elif _is_over('volumes'):
+                msg = _("Quota exceeded for %(s_pid)s, tried to create "
+                        "volume (%(d_consumed)d volumes "
+                        "already consumed)")
+                LOG.warn(msg % {'s_pid': context.project_id,
+                                'd_consumed': _consumed('volumes')})
+                allowed = quotas['volumes']
+                raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
+            else:
+                # If nothing was reraised, ensure we reraise the initial error
+                raise
+
+    def revert(self, context, result, cause):
+        # We never produced a result and therefore can't destroy anything.
+        if not result:
+            return
+        # We actually produced an output that we can revert so lets attempt
+        # to use said output to rollback the reservation.
+        reservations = result['reservations']
+        try:
+            QUOTAS.rollback(context, reservations)
+        except exception.CinderException:
+            # We are already reverting, therefore we should silence this
+            # exception since a second exception being active will be bad.
+            LOG.exception(_("Failed rolling back quota for"
+                            " %s reservations"), reservations)
+
+
+class QuotaCommitTask(CinderTask):
+    """Commits the reservation.
+
+    Reversion strategy: N/A (the rollback will be handled by the task that did
+    the initial reservation (see: QuotaReserveTask).
+
+    Warning Warning: if the process that is running this reserve and commit
+    process fails (or is killed before the quota is rolled back or commited
+    it does appear like the quota will never be rolled back). This makes
+    software upgrades hard (inflight operations will need to be stopped or
+    allowed to complete before the upgrade can occur). *In the future* when
+    taskflow has persistence built-in this should be easier to correct via
+    an automated or manual process.
+    """
+
+    def __init__(self):
+        super(QuotaCommitTask, self).__init__()
+        self.requires.update(['reservations'])
+
+    def __call__(self, context, reservations):
+        QUOTAS.commit(context, reservations)
+
+
+class VolumeCastTask(CinderTask):
+    """Performs a volume create cast to the scheduler or to the volume manager.
+
+    This which will signal a transition of the api workflow to another child
+    and/or related workflow on another component.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
+        super(VolumeCastTask, self).__init__()
+        self.volume_rpcapi = volume_rpcapi
+        self.scheduler_rpcapi = scheduler_rpcapi
+        self.db = db
+        self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
+                              'source_volid', 'volume_id', 'volume_type',
+                              'volume_properties'])
+
+    def _cast_create_volume(self, context, request_spec, filter_properties):
+        # NOTE(Rongze Zhu): A simple solution for bug 1008866.
+        #
+        # If snapshot_id is set, make the call create volume directly to
+        # the volume host where the snapshot resides instead of passing it
+        # through the scheduler. So snapshot can be copy to new volume.
+        source_volid = request_spec['source_volid']
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
+        host = None
+
+        if snapshot_id and CONF.snapshot_same_host:
+            snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+            source_volume_ref = self.db.volume_get(context,
+                                                   snapshot_ref['volume_id'])
+            host = source_volume_ref['host']
+        elif source_volid:
+            source_volume_ref = self.db.volume_get(context, source_volid)
+            host = source_volume_ref['host']
+
+        if not host:
+            # Cast to the scheduler and let it handle whatever is needed
+            # to select the target host for this volume.
+            self.scheduler_rpcapi.create_volume(
+                context,
+                CONF.volume_topic,
+                volume_id,
+                snapshot_id,
+                image_id,
+                request_spec=request_spec,
+                filter_properties=filter_properties)
+        else:
+            # Bypass the scheduler and send the request directly to the volume
+            # manager.
+            now = timeutils.utcnow()
+            values = {'host': host, 'scheduled_at': now}
+            volume_ref = self.db.volume_update(context, volume_id, values)
+            self.volume_rpcapi.create_volume(
+                context,
+                volume_ref,
+                volume_ref['host'],
+                request_spec=request_spec,
+                filter_properties=filter_properties,
+                allow_reschedule=False,
+                snapshot_id=snapshot_id,
+                image_id=image_id)
+
+    def __call__(self, context, **kwargs):
+        scheduler_hints = kwargs.pop('scheduler_hints', None)
+        request_spec = kwargs.copy()
+        filter_properties = {}
+        if scheduler_hints:
+            filter_properties['scheduler_hints'] = scheduler_hints
+        self._cast_create_volume(context, request_spec, filter_properties)
+
+
+class OnFailureChangeStatusTask(CinderTask):
+    """Helper task that sets a volume id to status error.
+
+    Reversion strategy: On failure of any flow that includes this task the
+    volume id that is associated with this task will be have its status set
+    to error. If a volume specification is provided and the type of that spec
+    is a source volume said source volume will have its status status updated
+    as well.
+    """
+
+    def __init__(self, db):
+        super(OnFailureChangeStatusTask, self).__init__()
+        self.db = db
+        self.requires.update(['volume_id'])
+        self.optional.update(['volume_spec'])
+
+    def __call__(self, context, volume_id, volume_spec=None):
+        # Save these items since we only use them if a reversion is triggered.
+        return {
+            'volume_id': volume_id,
+            'volume_spec': volume_spec,
+        }
+
+    def revert(self, context, result, cause):
+        volume_spec = result.get('volume_spec')
+        if not volume_spec:
+            # Attempt to use it from a later task that *should* have populated
+            # this from the database. It is not needed to be found since
+            # reverting will continue without it.
+            volume_spec = _find_result_spec(cause.flow)
+
+        # Restore the source volume status and set the volume to error status.
+        volume_id = result['volume_id']
+        _restore_source_status(context, self.db, volume_spec)
+        _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+        LOG.error(_("Volume %s: create failed"), volume_id)
+        exc_info = False
+        if all(cause.exc_info):
+            exc_info = cause.exc_info
+        LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
+
+class OnFailureRescheduleTask(CinderTask):
+    """Triggers a rescheduling request to be sent when reverting occurs.
+
+    Reversion strategy: Triggers the rescheduling mechanism whereby a cast gets
+    sent to the scheduler rpc api to allow for an attempt X of Y for scheduling
+    this volume elsewhere.
+    """
+
+    def __init__(self, reschedule_context, db, scheduler_rpcapi):
+        super(OnFailureRescheduleTask, self).__init__()
+        self.requires.update(['filter_properties', 'image_id', 'request_spec',
+                              'snapshot_id', 'volume_id'])
+        self.optional.update(['volume_spec'])
+        self.scheduler_rpcapi = scheduler_rpcapi
+        self.db = db
+        self.reschedule_context = reschedule_context
+        # These exception types will trigger the volume to be set into error
+        # status rather than being rescheduled.
+        self.no_reschedule_types = [
+            # The volume has already finished being created when the exports
+            # occur, rescheduling would be bad if it happened due to exports
+            # not succeeding.
+            exception.ExportFailure,
+            # Image copying happens after volume creation so rescheduling due
+            # to copy failure will mean the same volume will be created at
+            # another place when it still exists locally.
+            exception.ImageCopyFailure,
+            # Metadata updates happen after the volume has been created so if
+            # they fail, rescheduling will likely attempt to create the volume
+            # on another machine when it still exists locally.
+            exception.MetadataCopyFailure,
+            exception.MetadataCreateFailure,
+            exception.MetadataUpdateFailure,
+            # The volume/snapshot has been removed from the database, that
+            # can not be fixed by rescheduling.
+            exception.VolumeNotFound,
+            exception.SnapshotNotFound,
+            exception.VolumeTypeNotFound,
+        ]
+
+    def _is_reschedulable(self, cause):
+        (exc_type, value, traceback) = cause.exc_info
+        if not exc_type and cause.exc:
+            exc_type = type(cause.exc)
+        if not exc_type:
+            return True
+        if exc_type in self.no_reschedule_types:
+            return False
+        return True
+
+    def __call__(self, context, *args, **kwargs):
+        # Save these items since we only use them if a reversion is triggered.
+        return kwargs.copy()
+
+    def _reschedule(self, context, cause, request_spec, filter_properties,
+                    snapshot_id, image_id, volume_id, **kwargs):
+        """Actions that happen during the rescheduling attempt occur here."""
+
+        create_volume = self.scheduler_rpcapi.create_volume
+        if not filter_properties:
+            filter_properties = {}
+        if 'retry' not in filter_properties:
+            filter_properties['retry'] = {}
+
+        retry_info = filter_properties['retry']
+        num_attempts = retry_info.get('num_attempts', 0)
+        request_spec['volume_id'] = volume_id
+
+        LOG.debug(_("Volume %(volume_id)s: re-scheduling %(method)s "
+                    "attempt %(num)d due to %(reason)s") %
+                  {'volume_id': volume_id,
+                   'method': _make_pretty_name(create_volume),
+                   'num': num_attempts, 'reason': unicode(cause.exc)})
+
+        if all(cause.exc_info):
+            # Stringify to avoid circular ref problem in json serialization
+            retry_info['exc'] = traceback.format_exception(*cause.exc_info)
+
+        return create_volume(context, CONF.volume_topic, volume_id,
+                             snapshot_id, image_id, request_spec,
+                             filter_properties)
+
+    def _post_reschedule(self, context, volume_id):
+        """Actions that happen after the rescheduling attempt occur here."""
+
+        LOG.debug(_("Volume %s: re-scheduled"), volume_id)
+
+    def _pre_reschedule(self, context, volume_id):
+        """Actions that happen before the rescheduling attempt occur here."""
+
+        try:
+            # Reset the volume state.
+            #
+            # NOTE(harlowja): this is awkward to be done here, shouldn't
+            # this happen at the scheduler itself and not before it gets
+            # sent to the scheduler? (since what happens if it never gets
+            # there??). It's almost like we need a status of 'on-the-way-to
+            # scheduler' in the future.
+            update = {
+                'status': 'creating',
+                'scheduled_at': timeutils.utcnow(),
+            }
+            LOG.debug(_("Updating volume %(volume_id)s with %(update)s") %
+                      {'update': update, 'volume_id': volume_id})
+            self.db.volume_update(context, volume_id, update)
+        except exception.CinderException:
+            # Don't let resetting the status cause the rescheduling to fail.
+            LOG.exception(_("Volume %s: resetting 'creating' status failed"),
+                          volume_id)
+
+    def revert(self, context, result, cause):
+        volume_spec = result.get('volume_spec')
+        if not volume_spec:
+            # Find it from a prior task that populated this from the database.
+            volume_spec = _find_result_spec(cause.flow)
+        volume_id = result['volume_id']
+
+        # Use a different context when rescheduling.
+        if self.reschedule_context:
+            context = self.reschedule_context
+
+        # If we are now supposed to reschedule (or unable to), then just
+        # restore the source volume status and set the volume to error status.
+        def do_error_revert():
+            LOG.debug(_("Failing volume %s creation by altering volume status"
+                        " instead of rescheduling"), volume_id)
+            _restore_source_status(context, self.db, volume_spec)
+            _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+            LOG.error(_("Volume %s: create failed"), volume_id)
+
+        # Check if we have a cause which can tell us not to reschedule.
+        if not self._is_reschedulable(cause):
+            do_error_revert()
+        else:
+            try:
+                self._pre_reschedule(context, volume_id)
+                self._reschedule(context, cause, **result)
+                self._post_reschedule(context, volume_id)
+            except exception.CinderException:
+                LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
+                # NOTE(harlowja): Do error volume status changing instead.
+                do_error_revert()
+        exc_info = False
+        if all(cause.exc_info):
+            exc_info = cause.exc_info
+        LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
+
+class NotifySchedulerFailureTask(CinderTask):
+    """Helper task that notifies some external service on failure.
+
+    Reversion strategy: On failure of any flow that includes this task the
+    request specification associated with that flow will be extracted and
+    sent as a payload to the notification service under the given methods
+    scheduler topic.
+    """
+
+    def __init__(self, method):
+        super(NotifySchedulerFailureTask, self).__init__()
+        self.requires.update(['request_spec', 'volume_id'])
+        self.method = method
+        self.topic = 'scheduler.%s' % self.method
+        self.publisher_id = notifier.publisher_id("scheduler")
+
+    def __call__(self, context, **kwargs):
+        # Save these items since we only use them if a reversion is triggered.
+        return kwargs.copy()
+
+    def revert(self, context, result, cause):
+        request_spec = result['request_spec']
+        volume_id = result['volume_id']
+        volume_properties = request_spec['volume_properties']
+        payload = {
+            'request_spec': request_spec,
+            'volume_properties': volume_properties,
+            'volume_id': volume_id,
+            'state': 'error',
+            'method': self.method,
+            'reason': unicode(cause.exc),
+        }
+        try:
+            notifier.notify(context, self.publisher_id, self.topic,
+                            notifier.ERROR, payload)
+        except exception.CinderException:
+            LOG.exception(_("Failed notifying on %(topic)s "
+                            "payload %(payload)s") % {'topic': self.topic,
+                                                      'payload': payload})
+
+
+class ExtractSchedulerSpecTask(CinderTask):
+    """Extracts a spec object from a partial and/or incomplete request spec.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db):
+        super(ExtractSchedulerSpecTask, self).__init__()
+        self.db = db
+        self.requires.update(['image_id', 'request_spec', 'snapshot_id',
+                              'volume_id'])
+        self.provides.update(['request_spec'])
+
+    def _populate_request_spec(self, context, volume_id, snapshot_id,
+                               image_id):
+        # Create the full request spec using the volume_id.
+        #
+        # NOTE(harlowja): this will fetch the volume from the database, if
+        # the volume has been deleted before we got here then this should fail.
+        #
+        # In the future we might want to have a lock on the volume_id so that
+        # the volume can not be deleted while its still being created?
+        if not volume_id:
+            msg = _("No volume_id provided to populate a request_spec from")
+            raise exception.InvalidInput(reason=msg)
+        volume_ref = self.db.volume_get(context, volume_id)
+        volume_type_id = volume_ref.get('volume_type_id')
+        vol_type = self.db.volume_type_get(context, volume_type_id)
+        return {
+            'volume_id': volume_id,
+            'snapshot_id': snapshot_id,
+            'image_id': image_id,
+            'volume_properties': {
+                'size': utils.as_int(volume_ref.get('size'), quiet=False),
+                'availability_zone': volume_ref.get('availability_zone'),
+                'volume_type_id': volume_type_id,
+            },
+            'volume_type': list(dict(vol_type).iteritems()),
+        }
+
+    def __call__(self, context, request_spec, volume_id, snapshot_id,
+                 image_id):
+        # For RPC version < 1.2 backward compatibility
+        if request_spec is None:
+            request_spec = self._populate_request_spec(context, volume_id,
+                                                       snapshot_id, image_id)
+        return {
+            'request_spec': request_spec,
+        }
+
+
+class ExtractVolumeSpecTask(CinderTask):
+    """Extracts a spec of a volume to be created into a common structure.
+
+    This task extracts and organizes the input requirements into a common
+    and easier to analyze structure for later tasks to use. It will also
+    attach the underlying database volume reference which can be used by
+    other tasks to reference for further details about the volume to be.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db):
+        super(ExtractVolumeSpecTask, self).__init__()
+        self.db = db
+        self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
+                              'source_volid', 'volume_id'])
+        self.provides.update(['volume_spec', 'volume_ref'])
+
+    def __call__(self, context, volume_id, **kwargs):
+        get_remote_image_service = glance.get_remote_image_service
+
+        # NOTE(harlowja): this will fetch the volume from the database, if
+        # the volume has been deleted before we got here then this should fail.
+        #
+        # In the future we might want to have a lock on the volume_id so that
+        # the volume can not be deleted while its still being created?
+        volume_ref = self.db.volume_get(context, volume_id)
+        volume_name = volume_ref['name']
+        volume_size = utils.as_int(volume_ref['size'], quiet=False)
+
+        # Create a dictionary that will represent the volume to be so that
+        # later tasks can easily switch between the different types and create
+        # the volume according to the volume types specifications (which are
+        # represented in this dictionary).
+        specs = {
+            'status': volume_ref['status'],
+            'type': 'raw',  # This will have the type of the volume to be
+                            # created, which should be one of [raw, snap,
+                            # source_vol, image]
+            'volume_id': volume_ref['id'],
+            'volume_name': volume_name,
+            'volume_size': volume_size,
+        }
+
+        if kwargs.get('snapshot_id'):
+            # We are making a snapshot based volume instead of a raw volume.
+            specs.update({
+                'type': 'snap',
+                'snapshot_id': kwargs['snapshot_id'],
+            })
+        elif kwargs.get('source_volid'):
+            # We are making a source based volume instead of a raw volume.
+            #
+            # NOTE(harlowja): This will likely fail if the source volume
+            # disappeared by the time this call occurred.
+            source_volid = kwargs['source_volid']
+            source_volume_ref = self.db.volume_get(context, source_volid)
+            specs.update({
+                'source_volid': source_volid,
+                # This is captured incase we have to revert and we want to set
+                # back the source volume status to its original status. This
+                # may or may not be sketchy to do??
+                'source_volstatus': source_volume_ref['status'],
+                'type': 'source_vol',
+            })
+        elif kwargs.get('image_id'):
+            # We are making a image based volume instead of a raw volume.
+            image_href = kwargs['image_id']
+            image_service, image_id = get_remote_image_service(context,
+                                                               image_href)
+            specs.update({
+                'type': 'image',
+                'image_id': image_id,
+                'image_location': image_service.get_location(context,
+                                                             image_id),
+                'image_meta': image_service.show(context, image_id),
+                # Instead of refetching the image service later just save it.
+                #
+                # NOTE(harlowja): if we have to later recover this tasks output
+                # on another 'node' that this object won't be able to be
+                # serialized, so we will have to recreate this object on
+                # demand in the future.
+                'image_service': image_service,
+            })
+
+        return {
+            'volume_spec': specs,
+            # NOTE(harlowja): it appears like further usage of this volume_ref
+            # result actually depend on it being a sqlalchemy object and not
+            # just a plain dictionary so thats why we are storing this here.
+            #
+            # It was attempted to refetch it when needed in subsequent tasks,
+            # but that caused sqlalchemy errors to occur (volume already open
+            # or similar).
+            #
+            # In the future where this task could fail and be recovered from we
+            # will need to store the volume_spec and recreate the volume_ref
+            # on demand.
+            'volume_ref': volume_ref,
+        }
+
+
+class NotifyVolumeActionTask(CinderTask):
+    """Performs a notification about the given volume when called.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db, host, event_suffix):
+        super(NotifyVolumeActionTask, self).__init__(addons=[event_suffix])
+        self.requires.update(['volume_ref'])
+        self.db = db
+        self.event_suffix = event_suffix
+        self.host = host
+
+    def __call__(self, context, volume_ref):
+        volume_id = volume_ref['id']
+        try:
+            volume_utils.notify_about_volume_usage(context, volume_ref,
+                                                   self.event_suffix,
+                                                   host=self.host)
+        except exception.CinderException:
+            # If notification sending of volume database entry reading fails
+            # then we shouldn't error out the whole workflow since this is
+            # not always information that must be sent for volumes to operate
+            LOG.exception(_("Failed notifying about the volume"
+                            " action %(event)s for volume %(volume_id)s") %
+                          {'event': self.event_suffix,
+                           'volume_id': volume_id})
+
+
+class CreateVolumeFromSpecTask(CinderTask):
+    """Creates a volume from a provided specification.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db, host, driver):
+        super(CreateVolumeFromSpecTask, self).__init__()
+        self.db = db
+        self.driver = driver
+        self.requires.update(['volume_spec', 'volume_ref'])
+        # This maps the different volume specification types into the methods
+        # that can create said volume type (aka this is a jump table).
+        self._create_func_mapping = {
+            'raw': self._create_raw_volume,
+            'snap': self._create_from_snapshot,
+            'source_vol': self._create_from_source_volume,
+            'image': self._create_from_image,
+        }
+        self.host = host
+
+    def _create_from_snapshot(self, context, volume_ref, snapshot_id,
+                              **kwargs):
+        volume_id = volume_ref['id']
+        snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+        model_update = self.driver.create_volume_from_snapshot(volume_ref,
+                                                               snapshot_ref)
+        # NOTE(harlowja): Subtasks would be useful here since after this
+        # point the volume has already been created and further failures
+        # will not destroy the volume (although they could in the future).
+        make_bootable = False
+        try:
+            originating_vref = self.db.volume_get(context,
+                                                  snapshot_ref['volume_id'])
+            make_bootable = originating_vref.bootable
+        except exception.CinderException as ex:
+            LOG.exception(_("Failed fetching snapshot %(snapshot_id)s bootable"
+                            " flag using the provided glance snapshot "
+                            "%(snapshot_ref_id)s volume reference") %
+                          {'snapshot_id': snapshot_id,
+                           'snapshot_ref_id': snapshot_ref['volume_id']})
+            raise exception.MetadataUpdateFailure(reason=ex)
+        if make_bootable:
+            self._enable_bootable_flag(context, volume_id)
+        try:
+            LOG.debug(_("Copying metadata from snapshot %(snap_volume_id)s"
+                        " to %(volume_id)s") % {'snap_volume_id': snapshot_id,
+                                                'volume_id': volume_id})
+            self.db.volume_glance_metadata_copy_to_volume(context, volume_id,
+                                                          snapshot_id)
+        except exception.CinderException as ex:
+            LOG.exception(_("Failed updating volume %(volume_id)s metadata"
+                            " using the provided glance snapshot "
+                            "%(snapshot_id)s metadata") %
+                          {'volume_id': volume_id, 'snapshot_id': snapshot_id})
+            raise exception.MetadataCopyFailure(reason=ex)
+        return model_update
+
+    def _enable_bootable_flag(self, context, volume_id):
+        try:
+            LOG.debug(_('Marking volume %s as bootable'), volume_id)
+            self.db.volume_update(context, volume_id, {'bootable': True})
+        except exception.CinderException as ex:
+            LOG.exception(_("Failed updating volume %(volume_id)s bootable"
+                            " flag to true") % {'volume_id': volume_id})
+            raise exception.MetadataUpdateFailure(reason=ex)
+
+    def _create_from_source_volume(self, context, volume_ref,
+                                   source_volid, **kwargs):
+        # NOTE(harlowja): if the source volume has disappeared this will be our
+        # detection of that since this database call should fail.
+        #
+        # NOTE(harlowja): likely this is not the best place for this to happen
+        # and we should have proper locks on the source volume while actions
+        # that use the source volume are underway.
+        srcvol_ref = self.db.volume_get(context, source_volid)
+        model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref)
+        # NOTE(harlowja): Subtasks would be useful here since after this
+        # point the volume has already been created and further failures
+        # will not destroy the volume (although they could in the future).
+        if srcvol_ref.bootable:
+            self._enable_bootable_flag(context, volume_ref['id'])
+        try:
+            LOG.debug(_('Copying metadata from source volume %(source_volid)s'
+                        ' to cloned volume %(clone_vol_id)s') % {
+                            'source_volid': source_volid,
+                            'clone_vol_id': volume_ref['id'],
+                        })
+            self.db.volume_glance_metadata_copy_from_volume_to_volume(
+                context,
+                source_volid,
+                volume_ref['id'])
+        except exception.CinderException as ex:
+            LOG.exception(_("Failed updating cloned volume %(volume_id)s"
+                            " metadata using the provided source volumes"
+                            " %(source_volid)s metadata") %
+                          {'volume_id': volume_ref['id'],
+                           'source_volid': source_volid})
+            raise exception.MetadataCopyFailure(reason=ex)
+        return model_update
+
+    def _copy_image_to_volume(self, context, volume_ref,
+                              image_id, image_location, image_service):
+        """Downloads Glance image to the specified volume. """
+        copy_image_to_volume = self.driver.copy_image_to_volume
+        volume_id = volume_ref['id']
+        LOG.debug(_("Attempting download of %(image_id)s (%(image_location)s)"
+                    " to volume %(volume_id)s") %
+                  {'image_id': image_id, 'volume_id': volume_id,
+                   'image_location': image_location})
+        try:
+            copy_image_to_volume(context, volume_ref, image_service, image_id)
+        except exception.ProcessExecutionError as ex:
+            LOG.error(_("Failed to copy image %(image_id)s to volume: "
+                        "%(volume_id)s, error: %(error)s") %
+                      {'volume_id': volume_id,
+                       'error': ex.stderr, 'image_id': image_id})
+            raise exception.ImageCopyFailure(reason=ex.stderr)
+        except exception.CinderException as ex:
+            LOG.error(_("Failed to copy image %(image_id)s to "
+                        "volume: %(volume_id)s, error: %(error)s") %
+                      {'volume_id': volume_id, 'error': ex,
+                       'image_id': image_id})
+            raise exception.ImageCopyFailure(reason=ex)
+
+        LOG.debug(_("Downloaded image %(image_id)s (%(image_location)s)"
+                    " to volume %(volume_id)s successfully") %
+                  {'image_id': image_id, 'volume_id': volume_id,
+                   'image_location': image_location})
+
+    def _capture_volume_image_metadata(self, context, volume_id,
+                                       image_id, image_meta):
+        if not image_meta:
+            image_meta = {}
+
+        # Save some base attributes into the volume metadata
+        base_metadata = {
+            'image_id': image_id,
+        }
+        name = image_meta.get('name', None)
+        if name:
+            base_metadata['image_name'] = name
+
+        # Save some more attributes into the volume metadata from the image
+        # metadata
+        for key in IMAGE_ATTRIBUTES:
+            if key not in image_meta:
+                continue
+            value = image_meta.get(key, None)
+            if value is not None:
+                base_metadata[key] = value
+
+        # Save all the image metadata properties into the volume metadata
+        property_metadata = {}
+        image_properties = image_meta.get('properties', {})
+        for (key, value) in image_properties.items():
+            if value is not None:
+                property_metadata[key] = value
+
+        # NOTE(harlowja): The best way for this to happen would be in bulk,
+        # but that doesn't seem to exist (yet), so we go through one by one
+        # which means we can have partial create/update failure.
+        volume_metadata = dict(property_metadata)
+        volume_metadata.update(base_metadata)
+        LOG.debug(_("Creating volume glance metadata for volume %(volume_id)s"
+                    " backed by image %(image_id)s with: %(vol_metadata)s") %
+                  {'volume_id': volume_id, 'image_id': image_id,
+                   'vol_metadata': volume_metadata})
+        for (key, value) in volume_metadata.items():
+            try:
+                self.db.volume_glance_metadata_create(context, volume_id,
+                                                      key, value)
+            except exception.GlanceMetadataExists:
+                pass
+
+    def _create_from_image(self, context, volume_ref,
+                           image_location, image_id, image_meta,
+                           image_service, **kwargs):
+        LOG.debug(_("Cloning %(volume_id)s from image %(image_id)s "
+                    " at location %(image_location)s") %
+                  {'volume_id': volume_ref['id'],
+                   'image_location': image_location, 'image_id': image_id})
+        # Create the volume from an image.
+        #
+        # NOTE (singn): two params need to be returned
+        # dict containing provider_location for cloned volume
+        # and clone status.
+        model_update, cloned = self.driver.clone_image(volume_ref,
+                                                       image_location)
+        make_bootable = False
+        if not cloned:
+            # TODO(harlowja): what needs to be rolled back in the clone if this
+            # volume create fails?? Likely this should be a subflow or broken
+            # out task in the future. That will bring up the question of how
+            # do we make said subflow/task which is only triggered in the
+            # clone image 'path' resumable and revertable in the correct
+            # manner.
+            #
+            # Create the volume and then download the image onto the volume.
+            model_update = self.driver.create_volume(volume_ref)
+            updates = dict(model_update or dict(), status='downloading')
+            try:
+                volume_ref = self.db.volume_update(context,
+                                                   volume_ref['id'], updates)
+            except exception.CinderException:
+                LOG.exception(_("Failed updating volume %(volume_id)s with "
+                                "%(updates)s") %
+                              {'volume_id': volume_ref['id'],
+                               'updates': updates})
+            self._copy_image_to_volume(context, volume_ref,
+                                       image_id, image_location, image_service)
+            make_bootable = True
+        if make_bootable:
+            self._enable_bootable_flag(context, volume_ref['id'])
+        try:
+            self._capture_volume_image_metadata(context, volume_ref['id'],
+                                                image_id, image_meta)
+        except exception.CinderException as ex:
+            LOG.exception(_("Failed updating volume %(volume_id)s metadata"
+                            " using the provided image metadata"
+                            " %(image_meta)s from image %(image_id)s") %
+                          {'volume_id': volume_ref['id'],
+                           'image_meta': image_meta, 'image_id': image_id})
+            raise exception.MetadataUpdateFailure(reason=ex)
+        return model_update
+
+    def _create_raw_volume(self, context, volume_ref, **kwargs):
+        return self.driver.create_volume(volume_ref)
+
+    def __call__(self, context, volume_ref, volume_spec):
+        create_type = volume_spec.pop('type', None)
+        create_functor = self._create_func_mapping.get(create_type)
+        if not create_functor:
+            raise exception.VolumeTypeNotFound(volume_type_id=create_type)
+
+        volume_spec = dict(volume_spec)
+        volume_id = volume_spec.pop('volume_id', None)
+        if not volume_id:
+            volume_id = volume_ref['id']
+        LOG.info(_("Volume %(volume_id)s: being created using %(functor)s "
+                   "with specification: %(volume_spec)s") %
+                 {'volume_spec': volume_spec, 'volume_id': volume_id,
+                  'functor': _make_pretty_name(create_functor)})
+
+        # NOTE(vish): so we don't have to get volume from db again before
+        # passing it to the driver.
+        volume_ref['host'] = self.host
+
+        # Call the given functor to make the volume.
+        model_update = create_functor(context, volume_ref=volume_ref,
+                                      **volume_spec)
+
+        # Persist any model information provided on creation.
+        try:
+            if model_update:
+                volume_ref = self.db.volume_update(context, volume_ref['id'],
+                                                   model_update)
+        except exception.CinderException as ex:
+            # If somehow the update failed we want to ensure that the
+            # failure is logged (but not try rescheduling since the volume at
+            # this point has been created).
+            if model_update:
+                LOG.exception(_("Failed updating model of volume %(volume_id)s"
+                                " with creation provided model %(model)s") %
+                              {'volume_id': volume_id, 'model': model_update})
+                raise exception.ExportFailure(reason=ex)
+
+        # Persist any driver exported model information.
+        model_update = None
+        try:
+            LOG.debug(_("Volume %s: creating export"), volume_ref['id'])
+            model_update = self.driver.create_export(context, volume_ref)
+            if model_update:
+                self.db.volume_update(context, volume_ref['id'], model_update)
+        except exception.CinderException as ex:
+            # If somehow the read *or* create export failed we want to ensure
+            # that the failure is logged (but not try rescheduling since
+            # the volume at this point has been created).
+            #
+            # NOTE(harlowja): Notice that since the model_update is initially
+            # empty, the only way it will still be empty is if there is no
+            # model_update (which we don't care about) or there was an
+            # model_update and updating failed.
+            if model_update:
+                LOG.exception(_("Failed updating model of volume %(volume_id)s"
+                              " with driver provided model %(model)s") %
+                              {'volume_id': volume_id, 'model': model_update})
+                raise exception.ExportFailure(reason=ex)
+
+
+class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
+    """On successful volume creation this will perform final volume actions.
+
+    When a volume is created successfully it is expected that MQ notifications
+    and database updates will occur to 'signal' to others that the volume is
+    now ready for usage. This task does those notifications and updates in a
+    reliable manner (not re-raising exceptions if said actions can not be
+    triggered).
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db, host, event_suffix):
+        super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
+        self.requires.update(['volume_spec'])
+        self.status_translation = {
+            'migration_target_creating': 'migration_target',
+        }
+
+    def __call__(self, context, volume_ref, volume_spec):
+        volume_id = volume_ref['id']
+        new_status = self.status_translation.get(volume_spec.get('status'),
+                                                 'available')
+        update = {
+            'status': new_status,
+            'launched_at': timeutils.utcnow(),
+        }
+        try:
+            # TODO(harlowja): is it acceptable to only log if this fails??
+            # or are there other side-effects that this will cause if the
+            # status isn't updated correctly (aka it will likely be stuck in
+            # 'building' if this fails)??
+            volume_ref = self.db.volume_update(context, volume_id, update)
+            # Now use the parent to notify.
+            super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
+        except exception.CinderException:
+            LOG.exception(_("Failed updating volume %(volume_id)s with "
+                            "%(update)s") % {'volume_id': volume_id,
+                                             'update': update})
+        # Even if the update fails, the volume is ready.
+        msg = _("Volume %(volume_name)s (%(volume_id)s): created successfully")
+        LOG.info(msg % {
+            'volume_name': volume_spec['volume_name'],
+            'volume_id': volume_id,
+        })
+
+
+def _attach_debug_listeners(flow):
+    """Sets up a nice set of debug listeners for the flow.
+
+    These listeners will log when tasks/flows are transitioning from state to
+    state so that said states can be seen in the debug log output which is very
+    useful for figuring out where problems are occuring.
+    """
+
+    def flow_log_change(state, details):
+        LOG.debug(_("%(flow)s has moved into state %(state)s from state"
+                    " %(old_state)s") % {'state': state,
+                                         'old_state': details.get('old_state'),
+                                         'flow': details['flow']})
+
+    def task_log_change(state, details):
+        LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
+                    " result: %(result)s") % {'state': state,
+                                              'flow': details['flow'],
+                                              'runner': details['runner'],
+                                              'result': details.get('result')})
+
+    # Register * for all state changes (and not selective state changes to be
+    # called upon) since all the changes is more useful.
+    flow.notifier.register('*', flow_log_change)
+    flow.task_notifier.register('*', task_log_change)
+    return flow
+
+
+def get_api_flow(scheduler_rpcapi, volume_rpcapi, db,
+                 image_service,
+                 az_check_functor,
+                 create_what):
+    """Constructs and returns the api entrypoint flow.
+
+    This flow will do the following:
+
+    1. Inject keys & values for dependent tasks.
+    2. Extracts and validates the input keys & values.
+    3. Reserves the quota (reverts quota on any failures).
+    4. Creates the database entry.
+    5. Commits the quota.
+    6. Casts to volume manager or scheduler for further processing.
+    """
+
+    flow_name = ACTION.replace(":", "_") + "_api"
+    api_flow = linear_flow.Flow(flow_name)
+
+    # This injects the initial starting flow values into the workflow so that
+    # the dependency order of the tasks provides/requires can be correctly
+    # determined.
+    api_flow.add(ValuesInjectTask(create_what))
+    api_flow.add(ExtractVolumeRequestTask(image_service,
+                                          az_check_functor))
+    api_flow.add(QuotaReserveTask())
+    v_uuid = api_flow.add(EntryCreateTask(db))
+    api_flow.add(QuotaCommitTask())
+
+    # If after commiting something fails, ensure we set the db to failure
+    # before reverting any prior tasks.
+    api_flow.add(OnFailureChangeStatusTask(db))
+
+    # This will cast it out to either the scheduler or volume manager via
+    # the rpc apis provided.
+    api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
+
+    # Note(harlowja): this will return the flow as well as the uuid of the
+    # task which will produce the 'volume' database reference (since said
+    # reference is returned to other callers in the api for further usage).
+    return (_attach_debug_listeners(api_flow), v_uuid)
+
+
+def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
+                       volume_id=None, snapshot_id=None, image_id=None):
+
+    """Constructs and returns the scheduler entrypoint flow.
+
+    This flow will do the following:
+
+    1. Inject keys & values for dependent tasks.
+    2. Extracts a scheduler specification from the provided inputs.
+    3. Attaches 2 activated only on *failure* tasks (one to update the db
+       status and one to notify on the MQ of the failure that occured).
+    4. Uses provided driver to to then select and continue processing of
+       volume request.
+    """
+
+    flow_name = ACTION.replace(":", "_") + "_scheduler"
+    scheduler_flow = linear_flow.Flow(flow_name)
+
+    # This injects the initial starting flow values into the workflow so that
+    # the dependency order of the tasks provides/requires can be correctly
+    # determined.
+    scheduler_flow.add(ValuesInjectTask({
+        'request_spec': request_spec,
+        'filter_properties': filter_properties,
+        'volume_id': volume_id,
+        'snapshot_id': snapshot_id,
+        'image_id': image_id,
+    }))
+
+    # This will extract and clean the spec from the starting values.
+    scheduler_flow.add(ExtractSchedulerSpecTask(db))
+
+    # The decorator application here ensures that the method gets the right
+    # requires attributes automatically by examining the underlying functions
+    # arguments.
+
+    @decorators.task
+    def schedule_create_volume(context, request_spec, filter_properties):
+
+        def _log_failure(cause):
+            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
+                      {'cause': cause})
+
+        def _notify_failure(cause):
+            """When scheduling fails send out a event that it failed."""
+            topic = "scheduler.create_volume"
+            payload = {
+                'request_spec': request_spec,
+                'volume_properties': request_spec.get('volume_properties', {}),
+                'volume_id': volume_id,
+                'state': 'error',
+                'method': 'create_volume',
+                'reason': cause,
+            }
+            try:
+                publisher_id = notifier.publisher_id("scheduler")
+                notifier.notify(context, publisher_id, topic, notifier.ERROR,
+                                payload)
+            except exception.CinderException:
+                LOG.exception(_("Failed notifying on %(topic)s "
+                                "payload %(payload)s") % {'topic': topic,
+                                                          'payload': payload})
+
+        try:
+            driver.schedule_create_volume(context, request_spec,
+                                          filter_properties)
+        except exception.NoValidHost as e:
+            # Not host found happened, notify on the scheduler queue and log
+            # that this happened and set the volume to errored out and
+            # *do not* reraise the error (since whats the point).
+            _notify_failure(e)
+            _log_failure(e)
+            _error_out_volume(context, db, volume_id, reason=e)
+        except Exception as e:
+            # Some other error happened, notify on the scheduler queue and log
+            # that this happened and set the volume to errored out and
+            # *do* reraise the error.
+            with excutils.save_and_reraise_exception():
+                _notify_failure(e)
+                _log_failure(e)
+                _error_out_volume(context, db, volume_id, reason=e)
+
+    scheduler_flow.add(schedule_create_volume)
+
+    return _attach_debug_listeners(scheduler_flow)
+
+
+def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
+                     request_spec=None, filter_properties=None,
+                     allow_reschedule=True,
+                     snapshot_id=None, image_id=None, source_volid=None,
+                     reschedule_context=None):
+    """Constructs and returns the manager entrypoint flow.
+
+    This flow will do the following:
+
+    1. Determines if rescheduling is enabled (ahead of time).
+    2. Inject keys & values for dependent tasks.
+    3. Selects 1 of 2 activated only on *failure* tasks (one to update the db
+       status & notify or one to update the db status & notify & *reschedule*).
+    4. Extracts a volume specification from the provided inputs.
+    5. Notifies that the volume has start to be created.
+    6. Creates a volume from the extracted volume specification.
+    7. Attaches a on-success *only* task that notifies that the volume creation
+       has ended and performs further database status updates.
+    """
+
+    flow_name = ACTION.replace(":", "_") + "_manager"
+    volume_flow = linear_flow.Flow(flow_name)
+
+    # Determine if we are allowed to reschedule since this affects how
+    # failures will be handled.
+    if not filter_properties:
+        filter_properties = {}
+    if not request_spec and allow_reschedule:
+        LOG.debug(_("No request spec, will not reschedule"))
+        allow_reschedule = False
+    if not filter_properties.get('retry', None) and allow_reschedule:
+        LOG.debug(_("No retry filter property or associated "
+                    "retry info, will not reschedule"))
+        allow_reschedule = False
+
+    # This injects the initial starting flow values into the workflow so that
+    # the dependency order of the tasks provides/requires can be correctly
+    # determined.
+    volume_flow.add(ValuesInjectTask({
+        'filter_properties': filter_properties,
+        'image_id': image_id,
+        'request_spec': request_spec,
+        'snapshot_id': snapshot_id,
+        'source_volid': source_volid,
+        'volume_id': volume_id,
+    }))
+
+    # We can actually just check if we should reschedule on failure ahead of
+    # time instead of trying to determine this later, certain values are needed
+    # to reschedule and without them we should just avoid rescheduling.
+    if not allow_reschedule:
+        # On failure ensure that we just set the volume status to error.
+        LOG.debug(_("Retry info not present, will not reschedule"))
+        volume_flow.add(OnFailureChangeStatusTask(db))
+    else:
+        volume_flow.add(OnFailureRescheduleTask(reschedule_context,
+                                                db, scheduler_rpcapi))
+
+    volume_flow.add(ExtractVolumeSpecTask(db))
+    volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
+    volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
+    volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
+
+    return _attach_debug_listeners(volume_flow)
index d3c800be0e26abdfca035d26feeb12ed403ed8a8..40fe197507c75cd75da78f6932fd9358b7f42577 100644 (file)
@@ -58,9 +58,12 @@ from cinder.openstack.common import uuidutils
 from cinder import quota
 from cinder import utils
 from cinder.volume.configuration import Configuration
+from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
 
+from cinder.taskflow import states
+
 LOG = logging.getLogger(__name__)
 
 QUOTAS = quota.QUOTAS
@@ -167,282 +170,34 @@ class VolumeManager(manager.SchedulerDependentManager):
         # collect and publish service capabilities
         self.publish_service_capabilities(ctxt)
 
-    def _create_volume(self, context, volume_ref, snapshot_ref,
-                       srcvol_ref, image_service, image_id, image_location):
-        cloned = None
-        model_update = False
-
-        if all(x is None for x in(snapshot_ref, image_id, srcvol_ref)):
-            model_update = self.driver.create_volume(volume_ref)
-        elif snapshot_ref is not None:
-            model_update = self.driver.create_volume_from_snapshot(
-                volume_ref,
-                snapshot_ref)
-
-            originating_vref = self.db.volume_get(context,
-                                                  snapshot_ref['volume_id'])
-            if originating_vref.bootable:
-                self.db.volume_update(context,
-                                      volume_ref['id'],
-                                      {'bootable': True})
-        elif srcvol_ref is not None:
-            model_update = self.driver.create_cloned_volume(volume_ref,
-                                                            srcvol_ref)
-            if srcvol_ref.bootable:
-                self.db.volume_update(context,
-                                      volume_ref['id'],
-                                      {'bootable': True})
-        else:
-            # create the volume from an image
-            # NOTE (singn): two params need to be returned
-            # dict containing provider_location for cloned volume
-            # and clone status
-            model_update, cloned = self.driver.clone_image(
-                volume_ref, image_location)
-            if not cloned:
-                model_update = self.driver.create_volume(volume_ref)
-
-                updates = dict(model_update or dict(), status='downloading')
-                volume_ref = self.db.volume_update(context,
-                                                   volume_ref['id'],
-                                                   updates)
-
-                # TODO(jdg): Wrap this in a try block and update status
-                # appropriately if the download image fails
-                self._copy_image_to_volume(context,
-                                           volume_ref,
-                                           image_service,
-                                           image_id)
-                self.db.volume_update(context,
-                                      volume_ref['id'],
-                                      {'bootable': True})
-        return model_update, cloned
-
     def create_volume(self, context, volume_id, request_spec=None,
                       filter_properties=None, allow_reschedule=True,
                       snapshot_id=None, image_id=None, source_volid=None):
         """Creates and exports the volume."""
-        context_saved = context.deepcopy()
-        context = context.elevated()
-        if filter_properties is None:
-            filter_properties = {}
-        volume_ref = self.db.volume_get(context, volume_id)
-        self._notify_about_volume_usage(context, volume_ref, "create.start")
-
-        # NOTE(vish): so we don't have to get volume from db again
-        #             before passing it to the driver.
-        volume_ref['host'] = self.host
 
-        if volume_ref['status'] == 'migration_target_creating':
-            status = 'migration_target'
-        else:
-            status = 'available'
-        model_update = False
-        image_meta = None
-        cloned = False
-
-        try:
-            LOG.debug(_("volume %(vol_name)s: creating lv of"
-                        " size %(vol_size)sG"),
-                      {'vol_name': volume_ref['name'],
-                       'vol_size': volume_ref['size']})
-            snapshot_ref = None
-            sourcevol_ref = None
-            image_service = None
-            image_location = None
-            image_meta = None
-
-            if snapshot_id is not None:
-                LOG.info(_("volume %s: creating from snapshot"),
-                         volume_ref['name'])
-                snapshot_ref = self.db.snapshot_get(context, snapshot_id)
-            elif source_volid is not None:
-                LOG.info(_("volume %s: creating from existing volume"),
-                         volume_ref['name'])
-                sourcevol_ref = self.db.volume_get(context, source_volid)
-            elif image_id is not None:
-                LOG.info(_("volume %s: creating from image"),
-                         volume_ref['name'])
-                # create the volume from an image
-                image_service, image_id = \
-                    glance.get_remote_image_service(context,
-                                                    image_id)
-                image_location = image_service.get_location(context, image_id)
-                image_meta = image_service.show(context, image_id)
-            else:
-                LOG.info(_("volume %s: creating"), volume_ref['name'])
-
-            try:
-                model_update, cloned = self._create_volume(context,
-                                                           volume_ref,
-                                                           snapshot_ref,
-                                                           sourcevol_ref,
-                                                           image_service,
-                                                           image_id,
-                                                           image_location)
-            except exception.ImageCopyFailure as ex:
-                LOG.error(_('Setting volume: %s status to error '
-                            'after failed image copy.'), volume_ref['id'])
-                self.db.volume_update(context,
-                                      volume_ref['id'],
-                                      {'status': 'error'})
-                return
-            except Exception:
-                exc_info = sys.exc_info()
-                # restore source volume status before reschedule
-                # FIXME(zhiteng) do all the clean-up before reschedule
-                if sourcevol_ref is not None:
-                    self.db.volume_update(context, sourcevol_ref['id'],
-                                          {'status': sourcevol_ref['status']})
-                rescheduled = False
-                # try to re-schedule volume:
-                if allow_reschedule:
-                    rescheduled = self._reschedule_or_error(context_saved,
-                                                            volume_id,
-                                                            exc_info,
-                                                            snapshot_id,
-                                                            image_id,
-                                                            request_spec,
-                                                            filter_properties)
-
-                if rescheduled:
-                    LOG.error(_('Unexpected Error: '), exc_info=exc_info)
-                    msg = (_('Creating %(volume_id)s %(snapshot_id)s '
-                             '%(image_id)s was rescheduled due to '
-                             '%(reason)s')
-                           % {'volume_id': volume_id,
-                              'snapshot_id': snapshot_id,
-                              'image_id': image_id,
-                              'reason': unicode(exc_info[1])})
-                    raise exception.CinderException(msg)
-                else:
-                    # not re-scheduling
-                    raise exc_info[0], exc_info[1], exc_info[2]
+        flow = create_volume.get_manager_flow(
+            self.db,
+            self.driver,
+            self.scheduler_rpcapi,
+            self.host,
+            volume_id,
+            request_spec=request_spec,
+            filter_properties=filter_properties,
+            allow_reschedule=allow_reschedule,
+            snapshot_id=snapshot_id,
+            image_id=image_id,
+            source_volid=source_volid,
+            reschedule_context=context.deepcopy())
+
+        assert flow, _('Manager volume flow not retrieved')
+
+        flow.run(context.elevated())
+        if flow.state != states.SUCCESS:
+            raise exception.CinderException(_("Failed to successfully complete"
+                                              " manager volume workflow"))
 
-            if model_update:
-                volume_ref = self.db.volume_update(
-                    context, volume_ref['id'], model_update)
-            if sourcevol_ref is not None:
-                self.db.volume_glance_metadata_copy_from_volume_to_volume(
-                    context,
-                    source_volid,
-                    volume_id)
-
-            LOG.debug(_("volume %s: creating export"), volume_ref['name'])
-            model_update = self.driver.create_export(context, volume_ref)
-            if model_update:
-                self.db.volume_update(context, volume_ref['id'], model_update)
-        except Exception:
-            with excutils.save_and_reraise_exception():
-                volume_ref['status'] = 'error'
-                self.db.volume_update(context,
-                                      volume_ref['id'],
-                                      {'status': volume_ref['status']})
-                LOG.error(_("volume %s: create failed"), volume_ref['name'])
-                self._notify_about_volume_usage(context, volume_ref,
-                                                "create.end")
-
-        if snapshot_id:
-            # Copy any Glance metadata from the original volume
-            self.db.volume_glance_metadata_copy_to_volume(context,
-                                                          volume_ref['id'],
-                                                          snapshot_id)
-
-        if image_id and image_meta:
-            # Copy all of the Glance image properties to the
-            # volume_glance_metadata table for future reference.
-            self.db.volume_glance_metadata_create(context,
-                                                  volume_ref['id'],
-                                                  'image_id', image_id)
-            name = image_meta.get('name', None)
-            if name:
-                self.db.volume_glance_metadata_create(context,
-                                                      volume_ref['id'],
-                                                      'image_name', name)
-            # Save some more attributes into the volume metadata
-            IMAGE_ATTRIBUTES = ['size', 'disk_format',
-                                'container_format', 'checksum',
-                                'min_disk', 'min_ram']
-            for key in IMAGE_ATTRIBUTES:
-                value = image_meta.get(key, None)
-                if value is not None:
-                    self.db.volume_glance_metadata_create(context,
-                                                          volume_ref['id'],
-                                                          key, value)
-            image_properties = image_meta.get('properties', {})
-            for key, value in image_properties.items():
-                self.db.volume_glance_metadata_create(context,
-                                                      volume_ref['id'],
-                                                      key, value)
-
-        now = timeutils.utcnow()
-        volume_ref['status'] = status
-        self.db.volume_update(context,
-                              volume_ref['id'],
-                              {'status': volume_ref['status'],
-                               'launched_at': now})
-        LOG.info(_("volume %s: created successfully"), volume_ref['name'])
         self._reset_stats()
-
-        self._notify_about_volume_usage(context, volume_ref, "create.end")
-        return volume_ref['id']
-
-    def _reschedule_or_error(self, context, volume_id, exc_info,
-                             snapshot_id, image_id, request_spec,
-                             filter_properties):
-        """Try to re-schedule the request."""
-        rescheduled = False
-        try:
-            method_args = (CONF.volume_topic, volume_id, snapshot_id,
-                           image_id, request_spec, filter_properties)
-
-            rescheduled = self._reschedule(context, request_spec,
-                                           filter_properties, volume_id,
-                                           self.scheduler_rpcapi.create_volume,
-                                           method_args,
-                                           exc_info)
-        except Exception:
-            rescheduled = False
-            LOG.exception(_("volume %s: Error trying to reschedule create"),
-                          volume_id)
-
-        return rescheduled
-
-    def _reschedule(self, context, request_spec, filter_properties,
-                    volume_id, scheduler_method, method_args,
-                    exc_info=None):
-        """Attempt to re-schedule a volume operation."""
-
-        retry = filter_properties.get('retry', None)
-        if not retry:
-            # no retry information, do not reschedule.
-            LOG.debug(_("Retry info not present, will not reschedule"))
-            return
-
-        if not request_spec:
-            LOG.debug(_("No request spec, will not reschedule"))
-            return
-
-        request_spec['volume_id'] = volume_id
-
-        LOG.debug(_("volume %(volume_id)s: re-scheduling %(method)s "
-                    "attempt %(num)d") %
-                  {'volume_id': volume_id,
-                   'method': scheduler_method.func_name,
-                   'num': retry['num_attempts']})
-
-        # reset the volume state:
-        now = timeutils.utcnow()
-        self.db.volume_update(context, volume_id,
-                              {'status': 'creating',
-                               'scheduled_at': now})
-
-        if exc_info:
-            # stringify to avoid circular ref problem in json serialization:
-            retry['exc'] = traceback.format_exception(*exc_info)
-
-        scheduler_method(context, *method_args)
-        return True
+        return volume_id
 
     def delete_volume(self, context, volume_id):
         """Deletes and unexports volume."""
@@ -680,28 +435,6 @@ class VolumeManager(manager.SchedulerDependentManager):
                 volume['name'] not in volume['provider_location']):
             self.driver.ensure_export(context, volume)
 
-    def _copy_image_to_volume(self, context, volume, image_service, image_id):
-        """Downloads Glance image to the specified volume."""
-        volume_id = volume['id']
-        try:
-            self.driver.copy_image_to_volume(context, volume,
-                                             image_service,
-                                             image_id)
-        except exception.ProcessExecutionError as ex:
-            LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
-                        "error: %(error)s") % {'volume_id': volume_id,
-                                               'error': ex.stderr})
-            raise exception.ImageCopyFailure(reason=ex.stderr)
-        except Exception as ex:
-            LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
-                        "error: %(error)s") % {'volume_id': volume_id,
-                                               'error': ex})
-            raise exception.ImageCopyFailure(reason=ex)
-
-        LOG.info(_("Downloaded image %(image_id)s to %(volume_id)s "
-                   "successfully.") % {'image_id': image_id,
-                                       'volume_id': volume_id})
-
     def copy_volume_to_image(self, context, volume_id, image_meta):
         """Uploads the specified volume to Glance.
 
diff --git a/taskflow.conf b/taskflow.conf
new file mode 100644 (file)
index 0000000..a871ff7
--- /dev/null
@@ -0,0 +1,7 @@
+[DEFAULT]
+
+# The list of primitives to copy from taskflow
+primitives=flow.linear_flow,task
+
+# The base module to hold the copy of taskflow
+base=cinder