]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Switch create volume commands to Taskflow 0.1.1
authoranastasia-karpinska <akarpinska@griddynamics.com>
Tue, 10 Dec 2013 21:12:31 +0000 (23:12 +0200)
committerAnastasia Karpinska <akarpinska@griddynamics.com>
Thu, 12 Dec 2013 08:36:45 +0000 (10:36 +0200)
- Old TaskFlow code was removed from Cinder.

- TaskFlow 0.1.1 was added to Cinder requirements.

- Create volume flows for volume.api, volume.manager and
  scheduler.manager were updated to use taskFlow 0.1.1

Partially implements: blueprint create-volume-flow
Change-Id: Idbac8d001436f02978b366fbb3205ce84c847267

18 files changed:
cinder/scheduler/manager.py
cinder/taskflow/__init__.py [deleted file]
cinder/taskflow/decorators.py [deleted file]
cinder/taskflow/exceptions.py [deleted file]
cinder/taskflow/patterns/__init__.py [deleted file]
cinder/taskflow/patterns/base.py [deleted file]
cinder/taskflow/patterns/linear_flow.py [deleted file]
cinder/taskflow/states.py [deleted file]
cinder/taskflow/task.py [deleted file]
cinder/taskflow/utils.py [deleted file]
cinder/tests/test_volume.py
cinder/volume/api.py
cinder/volume/flows/base.py
cinder/volume/flows/create_volume/__init__.py
cinder/volume/flows/utils.py [deleted file]
cinder/volume/manager.py
requirements.txt
taskflow.conf [deleted file]

index 19d0a500c8376e3aa83bad39061ed43547ec4bf4..b4d342717e8ba0c2cacd22d685a77fa8c6f4c398 100644 (file)
@@ -34,7 +34,6 @@ from cinder.openstack.common.notifier import api as notifier
 from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 
-from cinder.taskflow import states
 
 scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
                                   default='cinder.scheduler.filter_scheduler.'
@@ -76,17 +75,18 @@ class SchedulerManager(manager.Manager):
                       image_id=None, request_spec=None,
                       filter_properties=None):
 
-        flow = create_volume.get_scheduler_flow(db, self.driver,
-                                                request_spec,
-                                                filter_properties,
-                                                volume_id, snapshot_id,
-                                                image_id)
-        assert flow, _('Schedule volume flow not retrieved')
-
-        flow.run(context)
-        if flow.state != states.SUCCESS:
-            LOG.warn(_("Failed to successfully complete"
-                       " schedule volume using flow: %s"), flow)
+        try:
+            flow_engine = create_volume.get_scheduler_flow(context,
+                                                           db, self.driver,
+                                                           request_spec,
+                                                           filter_properties,
+                                                           volume_id,
+                                                           snapshot_id,
+                                                           image_id)
+        except Exception:
+            raise exception.CinderException(
+                _("Failed to create scheduler manager volume flow"))
+        flow_engine.run()
 
     def request_service_capabilities(self, context):
         volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
diff --git a/cinder/taskflow/__init__.py b/cinder/taskflow/__init__.py
deleted file mode 100644 (file)
index 1f19be5..0000000
+++ /dev/null
@@ -1 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
diff --git a/cinder/taskflow/decorators.py b/cinder/taskflow/decorators.py
deleted file mode 100644 (file)
index ea99d13..0000000
+++ /dev/null
@@ -1,276 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import collections
-import functools
-import inspect
-import types
-
-# These arguments are ones that we will skip when parsing for requirements
-# for a function to operate (when used as a task).
-AUTO_ARGS = ('self', 'context', 'cls')
-
-
-def is_decorated(functor):
-    if not isinstance(functor, (types.MethodType, types.FunctionType)):
-        return False
-    return getattr(extract(functor), '__task__', False)
-
-
-def extract(functor):
-    # Extract the underlying functor if its a method since we can not set
-    # attributes on instance methods, this is supposedly fixed in python 3
-    # and later.
-    #
-    # TODO(harlowja): add link to this fix.
-    assert isinstance(functor, (types.MethodType, types.FunctionType))
-    if isinstance(functor, types.MethodType):
-        return functor.__func__
-    else:
-        return functor
-
-
-def _mark_as_task(functor):
-    setattr(functor, '__task__', True)
-
-
-def _get_wrapped(function):
-    """Get the method at the bottom of a stack of decorators."""
-
-    if hasattr(function, '__wrapped__'):
-        return getattr(function, '__wrapped__')
-
-    if not hasattr(function, 'func_closure') or not function.func_closure:
-        return function
-
-    def _get_wrapped_function(function):
-        if not hasattr(function, 'func_closure') or not function.func_closure:
-            return None
-
-        for closure in function.func_closure:
-            func = closure.cell_contents
-
-            deeper_func = _get_wrapped_function(func)
-            if deeper_func:
-                return deeper_func
-            elif hasattr(closure.cell_contents, '__call__'):
-                return closure.cell_contents
-
-    return _get_wrapped_function(function)
-
-
-def _take_arg(a):
-    if a in AUTO_ARGS:
-        return False
-    # In certain decorator cases it seems like we get the function to be
-    # decorated as an argument, we don't want to take that as a real argument.
-    if isinstance(a, collections.Callable):
-        return False
-    return True
-
-
-def wraps(fn):
-    """This will not be needed in python 3.2 or greater which already has this
-    built-in to its functools.wraps method.
-    """
-
-    def wrapper(f):
-        f = functools.wraps(fn)(f)
-        f.__wrapped__ = getattr(fn, '__wrapped__', fn)
-        return f
-
-    return wrapper
-
-
-def locked(f):
-
-    @wraps(f)
-    def wrapper(self, *args, **kwargs):
-        with self._lock:
-            return f(self, *args, **kwargs)
-
-    return wrapper
-
-
-def task(*args, **kwargs):
-    """Decorates a given function and ensures that all needed attributes of
-    that function are set so that the function can be used as a task.
-    """
-
-    def decorator(f):
-        w_f = extract(f)
-
-        def noop(*args, **kwargs):
-            pass
-
-        # Mark as being a task
-        _mark_as_task(w_f)
-
-        # By default don't revert this.
-        w_f.revert = kwargs.pop('revert_with', noop)
-
-        # Associate a name of this task that is the module + function name.
-        w_f.name = "%s.%s" % (f.__module__, f.__name__)
-
-        # Sets the version of the task.
-        version = kwargs.pop('version', (1, 0))
-        f = _versionize(*version)(f)
-
-        # Attach any requirements this function needs for running.
-        requires_what = kwargs.pop('requires', [])
-        f = _requires(*requires_what, **kwargs)(f)
-
-        # Attach any optional requirements this function needs for running.
-        optional_what = kwargs.pop('optional', [])
-        f = _optional(*optional_what, **kwargs)(f)
-
-        # Attach any items this function provides as output
-        provides_what = kwargs.pop('provides', [])
-        f = _provides(*provides_what, **kwargs)(f)
-
-        @wraps(f)
-        def wrapper(*args, **kwargs):
-            return f(*args, **kwargs)
-
-        return wrapper
-
-    # This is needed to handle when the decorator has args or the decorator
-    # doesn't have args, python is rather weird here...
-    if kwargs or not args:
-        return decorator
-    else:
-        if isinstance(args[0], collections.Callable):
-            return decorator(args[0])
-        else:
-            return decorator
-
-
-def _versionize(major, minor=None):
-    """A decorator that marks the wrapped function with a major & minor version
-    number.
-    """
-
-    if minor is None:
-        minor = 0
-
-    def decorator(f):
-        w_f = extract(f)
-        w_f.version = (major, minor)
-
-        @wraps(f)
-        def wrapper(*args, **kwargs):
-            return f(*args, **kwargs)
-
-        return wrapper
-
-    return decorator
-
-
-def _optional(*args, **kwargs):
-    """Attaches a set of items that the decorated function would like as input
-    to the functions underlying dictionary.
-    """
-
-    def decorator(f):
-        w_f = extract(f)
-
-        if not hasattr(w_f, 'optional'):
-            w_f.optional = set()
-
-        w_f.optional.update([a for a in args if _take_arg(a)])
-
-        @wraps(f)
-        def wrapper(*args, **kwargs):
-            return f(*args, **kwargs)
-
-        return wrapper
-
-    # This is needed to handle when the decorator has args or the decorator
-    # doesn't have args, python is rather weird here...
-    if kwargs or not args:
-        return decorator
-    else:
-        if isinstance(args[0], collections.Callable):
-            return decorator(args[0])
-        else:
-            return decorator
-
-
-def _requires(*args, **kwargs):
-    """Attaches a set of items that the decorated function requires as input
-    to the functions underlying dictionary.
-    """
-
-    def decorator(f):
-        w_f = extract(f)
-
-        if not hasattr(w_f, 'requires'):
-            w_f.requires = set()
-
-        if kwargs.pop('auto_extract', True):
-            inspect_what = _get_wrapped(f)
-            f_args = inspect.getargspec(inspect_what).args
-            w_f.requires.update([a for a in f_args if _take_arg(a)])
-
-        w_f.requires.update([a for a in args if _take_arg(a)])
-
-        @wraps(f)
-        def wrapper(*args, **kwargs):
-            return f(*args, **kwargs)
-
-        return wrapper
-
-    # This is needed to handle when the decorator has args or the decorator
-    # doesn't have args, python is rather weird here...
-    if kwargs or not args:
-        return decorator
-    else:
-        if isinstance(args[0], collections.Callable):
-            return decorator(args[0])
-        else:
-            return decorator
-
-
-def _provides(*args, **kwargs):
-    """Attaches a set of items that the decorated function provides as output
-    to the functions underlying dictionary.
-    """
-
-    def decorator(f):
-        w_f = extract(f)
-
-        if not hasattr(f, 'provides'):
-            w_f.provides = set()
-
-        w_f.provides.update([a for a in args if _take_arg(a)])
-
-        @wraps(f)
-        def wrapper(*args, **kwargs):
-            return f(*args, **kwargs)
-
-        return wrapper
-
-    # This is needed to handle when the decorator has args or the decorator
-    # doesn't have args, python is rather weird here...
-    if kwargs or not args:
-        return decorator
-    else:
-        if isinstance(args[0], collections.Callable):
-            return decorator(args[0])
-        else:
-            return decorator
diff --git a/cinder/taskflow/exceptions.py b/cinder/taskflow/exceptions.py
deleted file mode 100644 (file)
index 62deadd..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-
-class TaskFlowException(Exception):
-    """Base class for exceptions emitted from this library."""
-    pass
-
-
-class Duplicate(TaskFlowException):
-    """Raised when a duplicate entry is found."""
-    pass
-
-
-class NotFound(TaskFlowException):
-    """Raised when some entry in some object doesn't exist."""
-    pass
-
-
-class AlreadyExists(TaskFlowException):
-    """Raised when some entry in some object already exists."""
-    pass
-
-
-class ClosedException(TaskFlowException):
-    """Raised when an access on a closed object occurs."""
-    pass
-
-
-class InvalidStateException(TaskFlowException):
-    """Raised when a task/job/workflow is in an invalid state when an
-    operation is attempting to apply to said task/job/workflow.
-    """
-    pass
-
-
-class UnclaimableJobException(TaskFlowException):
-    """Raised when a job can not be claimed."""
-    pass
-
-
-class JobNotFound(TaskFlowException):
-    """Raised when a job entry can not be found."""
-    pass
-
-
-class MissingDependencies(InvalidStateException):
-    """Raised when a task has dependencies that can not be satisfied."""
-    message = ("%(task)s requires %(requirements)s but no other task produces"
-               " said requirements")
-
-    def __init__(self, task, requirements):
-        message = self.message % {'task': task, 'requirements': requirements}
-        super(MissingDependencies, self).__init__(message)
diff --git a/cinder/taskflow/patterns/__init__.py b/cinder/taskflow/patterns/__init__.py
deleted file mode 100644 (file)
index 1f19be5..0000000
+++ /dev/null
@@ -1 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
diff --git a/cinder/taskflow/patterns/base.py b/cinder/taskflow/patterns/base.py
deleted file mode 100644 (file)
index 20ab9d1..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import abc
-import threading
-import uuid as uuidlib
-
-import six
-
-
-from cinder.taskflow import decorators
-from cinder.taskflow import exceptions as exc
-from cinder.taskflow import states
-from cinder.taskflow import utils
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Flow(object):
-    """The base abstract class of all flow implementations.
-
-    It provides a set of parents to flows that have a concept of parent flows
-    as well as a state and state utility functions to the deriving classes. It
-    also provides a name and an identifier (uuid or other) to the flow so that
-    it can be uniquely identifed among many flows.
-
-    Flows are expected to provide (if desired) the following methods:
-    - add
-    - add_many
-    - interrupt
-    - reset
-    - rollback
-    - run
-    - soft_reset
-    """
-
-    # Common states that certain actions can be performed in. If the flow
-    # is not in these sets of states then it is likely that the flow operation
-    # can not succeed.
-    RESETTABLE_STATES = set([
-        states.INTERRUPTED,
-        states.SUCCESS,
-        states.PENDING,
-        states.FAILURE,
-    ])
-    SOFT_RESETTABLE_STATES = set([
-        states.INTERRUPTED,
-    ])
-    UNINTERRUPTIBLE_STATES = set([
-        states.FAILURE,
-        states.SUCCESS,
-        states.PENDING,
-    ])
-    RUNNABLE_STATES = set([
-        states.PENDING,
-    ])
-
-    def __init__(self, name, parents=None, uuid=None):
-        self._name = str(name)
-        # The state of this flow.
-        self._state = states.PENDING
-        # If this flow has a parent flow/s which need to be reverted if
-        # this flow fails then please include them here to allow this child
-        # to call the parents...
-        if parents:
-            self.parents = tuple(parents)
-        else:
-            self.parents = ()
-        # Any objects that want to listen when a wf/task starts/stops/completes
-        # or errors should be registered here. This can be used to monitor
-        # progress and record tasks finishing (so that it becomes possible to
-        # store the result of a task in some persistent or semi-persistent
-        # storage backend).
-        self.notifier = utils.TransitionNotifier()
-        self.task_notifier = utils.TransitionNotifier()
-        # Ensure that modifications and/or multiple runs aren't happening
-        # at the same time in the same flow at the same time.
-        self._lock = threading.RLock()
-        # Assign this flow a unique identifer.
-        if uuid:
-            self._id = str(uuid)
-        else:
-            self._id = str(uuidlib.uuid4())
-
-    @property
-    def name(self):
-        """A non-unique name for this flow (human readable)"""
-        return self._name
-
-    @property
-    def uuid(self):
-        """Uniquely identifies this flow"""
-        return "f-%s" % (self._id)
-
-    @property
-    def state(self):
-        """Provides a read-only view of the flow state."""
-        return self._state
-
-    def _change_state(self, context, new_state):
-        was_changed = False
-        old_state = self.state
-        with self._lock:
-            if self.state != new_state:
-                old_state = self.state
-                self._state = new_state
-                was_changed = True
-        if was_changed:
-            # Don't notify while holding the lock.
-            self.notifier.notify(self.state, details={
-                'context': context,
-                'flow': self,
-                'old_state': old_state,
-            })
-
-    def __str__(self):
-        lines = ["Flow: %s" % (self.name)]
-        lines.append("%s" % (self.uuid))
-        lines.append("%s" % (len(self.parents)))
-        lines.append("%s" % (self.state))
-        return "; ".join(lines)
-
-    @abc.abstractmethod
-    def add(self, task):
-        """Adds a given task to this flow.
-
-        Returns the uuid that is associated with the task for later operations
-        before and after it is ran.
-        """
-        raise NotImplementedError()
-
-    @decorators.locked
-    def add_many(self, tasks):
-        """Adds many tasks to this flow.
-
-        Returns a list of uuids (one for each task added).
-        """
-        uuids = []
-        for t in tasks:
-            uuids.append(self.add(t))
-        return uuids
-
-    def interrupt(self):
-        """Attempts to interrupt the current flow and any tasks that are
-        currently not running in the flow.
-
-        Returns how many tasks were interrupted (if any).
-        """
-        if self.state in self.UNINTERRUPTIBLE_STATES:
-            raise exc.InvalidStateException(("Can not interrupt when"
-                                             " in state %s") % (self.state))
-        # Note(harlowja): Do *not* acquire the lock here so that the flow may
-        # be interrupted while running. This does mean the the above check may
-        # not be valid but we can worry about that if it becomes an issue.
-        old_state = self.state
-        if old_state != states.INTERRUPTED:
-            self._state = states.INTERRUPTED
-            self.notifier.notify(self.state, details={
-                'context': None,
-                'flow': self,
-                'old_state': old_state,
-            })
-        return 0
-
-    @decorators.locked
-    def reset(self):
-        """Fully resets the internal state of this flow, allowing for the flow
-        to be ran again.
-
-        Note: Listeners are also reset.
-        """
-        if self.state not in self.RESETTABLE_STATES:
-            raise exc.InvalidStateException(("Can not reset when"
-                                             " in state %s") % (self.state))
-        self.notifier.reset()
-        self.task_notifier.reset()
-        self._change_state(None, states.PENDING)
-
-    @decorators.locked
-    def soft_reset(self):
-        """Partially resets the internal state of this flow, allowing for the
-        flow to be ran again from an interrupted state only.
-        """
-        if self.state not in self.SOFT_RESETTABLE_STATES:
-            raise exc.InvalidStateException(("Can not soft reset when"
-                                             " in state %s") % (self.state))
-        self._change_state(None, states.PENDING)
-
-    @decorators.locked
-    def run(self, context, *args, **kwargs):
-        """Executes the workflow."""
-        if self.state not in self.RUNNABLE_STATES:
-            raise exc.InvalidStateException("Unable to run flow when "
-                                            "in state %s" % (self.state))
-
-    @decorators.locked
-    def rollback(self, context, cause):
-        """Performs rollback of this workflow and any attached parent workflows
-        if present.
-        """
-        pass
diff --git a/cinder/taskflow/patterns/linear_flow.py b/cinder/taskflow/patterns/linear_flow.py
deleted file mode 100644 (file)
index 16f220a..0000000
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import collections
-import logging
-
-from cinder.openstack.common import excutils
-
-from cinder.taskflow import decorators
-from cinder.taskflow import exceptions as exc
-from cinder.taskflow import states
-from cinder.taskflow import utils
-
-from cinder.taskflow.patterns import base
-
-LOG = logging.getLogger(__name__)
-
-
-class Flow(base.Flow):
-    """"A linear chain of tasks that can be applied in order as one unit and
-    rolled back as one unit using the reverse order that the tasks have
-    been applied in.
-
-    Note(harlowja): Each task in the chain must have requirements
-    which are satisfied by the previous task/s in the chain.
-    """
-
-    def __init__(self, name, parents=None, uuid=None):
-        super(Flow, self).__init__(name, parents, uuid)
-        # The tasks which have been applied will be collected here so that they
-        # can be reverted in the correct order on failure.
-        self._accumulator = utils.RollbackAccumulator()
-        # Tasks results are stored here. Lookup is by the uuid that was
-        # returned from the add function.
-        self.results = {}
-        # The previously left off iterator that can be used to resume from
-        # the last task (if interrupted and soft-reset).
-        self._leftoff_at = None
-        # All runners to run are collected here.
-        self._runners = []
-        self._connected = False
-        # The resumption strategy to use.
-        self.resumer = None
-
-    @decorators.locked
-    def add(self, task):
-        """Adds a given task to this flow."""
-        assert isinstance(task, collections.Callable)
-        r = utils.Runner(task)
-        r.runs_before = list(reversed(self._runners))
-        self._runners.append(r)
-        self._reset_internals()
-        return r.uuid
-
-    def _reset_internals(self):
-        self._connected = False
-        self._leftoff_at = None
-
-    def _associate_providers(self, runner):
-        # Ensure that some previous task provides this input.
-        who_provides = {}
-        task_requires = runner.requires
-        for r in task_requires:
-            provider = None
-            for before_me in runner.runs_before:
-                if r in before_me.provides:
-                    provider = before_me
-                    break
-            if provider:
-                who_provides[r] = provider
-        # Ensure that the last task provides all the needed input for this
-        # task to run correctly.
-        missing_requires = task_requires - set(who_provides.keys())
-        if missing_requires:
-            raise exc.MissingDependencies(runner, sorted(missing_requires))
-        runner.providers.update(who_provides)
-
-    def __str__(self):
-        lines = ["LinearFlow: %s" % (self.name)]
-        lines.append("%s" % (self.uuid))
-        lines.append("%s" % (len(self._runners)))
-        lines.append("%s" % (len(self.parents)))
-        lines.append("%s" % (self.state))
-        return "; ".join(lines)
-
-    @decorators.locked
-    def remove(self, uuid):
-        index_removed = -1
-        for (i, r) in enumerate(self._runners):
-            if r.uuid == uuid:
-                index_removed = i
-                break
-        if index_removed == -1:
-            raise ValueError("No runner found with uuid %s" % (uuid))
-        else:
-            removed = self._runners.pop(index_removed)
-            self._reset_internals()
-            # Go and remove it from any runner after the removed runner since
-            # those runners may have had an attachment to it.
-            for r in self._runners[index_removed:]:
-                try:
-                    r.runs_before.remove(removed)
-                except (IndexError, ValueError):
-                    pass
-
-    def __len__(self):
-        return len(self._runners)
-
-    def _connect(self):
-        if self._connected:
-            return self._runners
-        for r in self._runners:
-            r.providers = {}
-        for r in reversed(self._runners):
-            self._associate_providers(r)
-        self._connected = True
-        return self._runners
-
-    def _ordering(self):
-        return iter(self._connect())
-
-    @decorators.locked
-    def run(self, context, *args, **kwargs):
-        super(Flow, self).run(context, *args, **kwargs)
-
-        def resume_it():
-            if self._leftoff_at is not None:
-                return ([], self._leftoff_at)
-            if self.resumer:
-                (finished, leftover) = self.resumer.resume(self,
-                                                           self._ordering())
-            else:
-                finished = []
-                leftover = self._ordering()
-            return (finished, leftover)
-
-        self._change_state(context, states.STARTED)
-        try:
-            those_finished, leftover = resume_it()
-        except Exception:
-            with excutils.save_and_reraise_exception():
-                self._change_state(context, states.FAILURE)
-
-        def run_it(runner, failed=False, result=None, simulate_run=False):
-            try:
-                # Add the task to be rolled back *immediately* so that even if
-                # the task fails while producing results it will be given a
-                # chance to rollback.
-                rb = utils.RollbackTask(context, runner.task, result=None)
-                self._accumulator.add(rb)
-                self.task_notifier.notify(states.STARTED, details={
-                    'context': context,
-                    'flow': self,
-                    'runner': runner,
-                })
-                if not simulate_run:
-                    result = runner(context, *args, **kwargs)
-                else:
-                    if failed:
-                        # TODO(harlowja): make this configurable??
-                        # If we previously failed, we want to fail again at
-                        # the same place.
-                        if not result:
-                            # If no exception or exception message was provided
-                            # or captured from the previous run then we need to
-                            # form one for this task.
-                            result = "%s failed running." % (runner.task)
-                        if isinstance(result, basestring):
-                            result = exc.InvalidStateException(result)
-                        if not isinstance(result, Exception):
-                            LOG.warn("Can not raise a non-exception"
-                                     " object: %s", result)
-                            result = exc.InvalidStateException()
-                        raise result
-                # Adjust the task result in the accumulator before
-                # notifying others that the task has finished to
-                # avoid the case where a listener might throw an
-                # exception.
-                rb.result = result
-                runner.result = result
-                self.results[runner.uuid] = result
-                self.task_notifier.notify(states.SUCCESS, details={
-                    'context': context,
-                    'flow': self,
-                    'runner': runner,
-                })
-            except Exception as e:
-                runner.result = e
-                cause = utils.FlowFailure(runner, self, e)
-                with excutils.save_and_reraise_exception():
-                    # Notify any listeners that the task has errored.
-                    self.task_notifier.notify(states.FAILURE, details={
-                        'context': context,
-                        'flow': self,
-                        'runner': runner,
-                    })
-                    self.rollback(context, cause)
-
-        if len(those_finished):
-            self._change_state(context, states.RESUMING)
-            for (r, details) in those_finished:
-                # Fake running the task so that we trigger the same
-                # notifications and state changes (and rollback that
-                # would have happened in a normal flow).
-                failed = states.FAILURE in details.get('states', [])
-                result = details.get('result')
-                run_it(r, failed=failed, result=result, simulate_run=True)
-
-        self._leftoff_at = leftover
-        self._change_state(context, states.RUNNING)
-        if self.state == states.INTERRUPTED:
-            return
-
-        was_interrupted = False
-        for r in leftover:
-            r.reset()
-            run_it(r)
-            if self.state == states.INTERRUPTED:
-                was_interrupted = True
-                break
-
-        if not was_interrupted:
-            # Only gets here if everything went successfully.
-            self._change_state(context, states.SUCCESS)
-            self._leftoff_at = None
-
-    @decorators.locked
-    def reset(self):
-        super(Flow, self).reset()
-        self.results = {}
-        self.resumer = None
-        self._accumulator.reset()
-        self._reset_internals()
-
-    @decorators.locked
-    def rollback(self, context, cause):
-        # Performs basic task by task rollback by going through the reverse
-        # order that tasks have finished and asking said task to undo whatever
-        # it has done. If this flow has any parent flows then they will
-        # also be called to rollback any tasks said parents contain.
-        #
-        # Note(harlowja): if a flow can more simply revert a whole set of
-        # tasks via a simpler command then it can override this method to
-        # accomplish that.
-        #
-        # For example, if each task was creating a file in a directory, then
-        # it's easier to just remove the directory than to ask each task to
-        # delete its file individually.
-        self._change_state(context, states.REVERTING)
-        try:
-            self._accumulator.rollback(cause)
-        finally:
-            self._change_state(context, states.FAILURE)
-        # Rollback any parents flows if they exist...
-        for p in self.parents:
-            p.rollback(context, cause)
diff --git a/cinder/taskflow/states.py b/cinder/taskflow/states.py
deleted file mode 100644 (file)
index 48d320a..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-# Job states.
-CLAIMED = 'CLAIMED'
-FAILURE = 'FAILURE'
-PENDING = 'PENDING'
-RUNNING = 'RUNNING'
-SUCCESS = 'SUCCESS'
-UNCLAIMED = 'UNCLAIMED'
-
-# Flow states.
-FAILURE = FAILURE
-INTERRUPTED = 'INTERRUPTED'
-PENDING = 'PENDING'
-RESUMING = 'RESUMING'
-REVERTING = 'REVERTING'
-RUNNING = RUNNING
-STARTED = 'STARTED'
-SUCCESS = SUCCESS
-
-# Task states.
-FAILURE = FAILURE
-STARTED = STARTED
-SUCCESS = SUCCESS
diff --git a/cinder/taskflow/task.py b/cinder/taskflow/task.py
deleted file mode 100644 (file)
index 3d5d2ee..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import abc
-
-import six
-
-from cinder.taskflow import utils
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Task(object):
-    """An abstraction that defines a potential piece of work that can be
-    applied and can be reverted to undo the work as a single unit.
-    """
-    def __init__(self, name):
-        self.name = name
-        # An *immutable* input 'resource' name set this task depends
-        # on existing before this task can be applied.
-        self.requires = set()
-        # An *immutable* input 'resource' name set this task would like to
-        # depends on existing before this task can be applied (but does not
-        # strongly depend on existing).
-        self.optional = set()
-        # An *immutable* output 'resource' name set this task
-        # produces that other tasks may depend on this task providing.
-        self.provides = set()
-        # This identifies the version of the task to be ran which
-        # can be useful in resuming older versions of tasks. Standard
-        # major, minor version semantics apply.
-        self.version = (1, 0)
-
-    def __str__(self):
-        return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
-
-    @abc.abstractmethod
-    def __call__(self, context, *args, **kwargs):
-        """Activate a given task which will perform some operation and return.
-
-           This method can be used to apply some given context and given set
-           of args and kwargs to accomplish some goal. Note that the result
-           that is returned needs to be serializable so that it can be passed
-           back into this task if reverting is triggered.
-        """
-        raise NotImplementedError()
-
-    def revert(self, context, result, cause):
-        """Revert this task using the given context, result that the apply
-           provided as well as any information which may have caused
-           said reversion.
-        """
-        pass
diff --git a/cinder/taskflow/utils.py b/cinder/taskflow/utils.py
deleted file mode 100644 (file)
index cb66fef..0000000
+++ /dev/null
@@ -1,464 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import collections
-import contextlib
-import copy
-import logging
-import re
-import sys
-import threading
-import time
-import types
-import uuid as uuidlib
-
-
-from cinder.taskflow import decorators
-
-LOG = logging.getLogger(__name__)
-
-
-def get_attr(task, field, default=None):
-    if decorators.is_decorated(task):
-        # If its a decorated functor then the attributes will be either
-        # in the underlying function of the instancemethod or the function
-        # itself.
-        task = decorators.extract(task)
-    return getattr(task, field, default)
-
-
-def join(itr, with_what=","):
-    pieces = [str(i) for i in itr]
-    return with_what.join(pieces)
-
-
-def get_many_attr(obj, *attrs):
-    many = []
-    for a in attrs:
-        many.append(get_attr(obj, a, None))
-    return many
-
-
-def get_task_version(task):
-    """Gets a tasks *string* version, whether it is a task object/function."""
-    task_version = get_attr(task, 'version')
-    if isinstance(task_version, (list, tuple)):
-        task_version = join(task_version, with_what=".")
-    if task_version is not None and not isinstance(task_version, basestring):
-        task_version = str(task_version)
-    return task_version
-
-
-def get_task_name(task):
-    """Gets a tasks *string* name, whether it is a task object/function."""
-    task_name = ""
-    if isinstance(task, (types.MethodType, types.FunctionType)):
-        # If its a function look for the attributes that should have been
-        # set using the task() decorator provided in the decorators file. If
-        # those have not been set, then we should at least have enough basic
-        # information (not a version) to form a useful task name.
-        task_name = get_attr(task, 'name')
-        if not task_name:
-            name_pieces = [a for a in get_many_attr(task,
-                                                    '__module__',
-                                                    '__name__')
-                           if a is not None]
-            task_name = join(name_pieces, ".")
-    else:
-        task_name = str(task)
-    return task_name
-
-
-def is_version_compatible(version_1, version_2):
-    """Checks for major version compatibility of two *string" versions."""
-    if version_1 == version_2:
-        # Equivalent exactly, so skip the rest.
-        return True
-
-    def _convert_to_pieces(version):
-        try:
-            pieces = []
-            for p in version.split("."):
-                p = p.strip()
-                if not len(p):
-                    pieces.append(0)
-                    continue
-                # Clean off things like 1alpha, or 2b and just select the
-                # digit that starts that entry instead.
-                p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
-                if p_match:
-                    p = p_match.group(1)
-                pieces.append(int(p))
-        except (AttributeError, TypeError, ValueError):
-            pieces = []
-        return pieces
-
-    version_1_pieces = _convert_to_pieces(version_1)
-    version_2_pieces = _convert_to_pieces(version_2)
-    if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
-        return False
-
-    # Ensure major version compatibility to start.
-    major1 = version_1_pieces[0]
-    major2 = version_2_pieces[0]
-    if major1 != major2:
-        return False
-    return True
-
-
-def await(check_functor, timeout=None):
-    if timeout is not None:
-        end_time = time.time() + max(0, timeout)
-    else:
-        end_time = None
-    # Use the same/similar scheme that the python condition class uses.
-    delay = 0.0005
-    while not check_functor():
-        time.sleep(delay)
-        if end_time is not None:
-            remaining = end_time - time.time()
-            if remaining <= 0:
-                return False
-            delay = min(delay * 2, remaining, 0.05)
-        else:
-            delay = min(delay * 2, 0.05)
-    return True
-
-
-class LastFedIter(object):
-    """An iterator which yields back the first item and then yields back
-    results from the provided iterator.
-    """
-
-    def __init__(self, first, rest_itr):
-        self.first = first
-        self.rest_itr = rest_itr
-
-    def __iter__(self):
-        yield self.first
-        for i in self.rest_itr:
-            yield i
-
-
-class FlowFailure(object):
-    """When a task failure occurs the following object will be given to revert
-       and can be used to interrogate what caused the failure.
-    """
-
-    def __init__(self, runner, flow, exception):
-        self.runner = runner
-        self.flow = flow
-        self.exc = exception
-        self.exc_info = sys.exc_info()
-
-
-class RollbackTask(object):
-    """A helper task that on being called will call the underlying callable
-    tasks revert method (if said method exists).
-    """
-
-    def __init__(self, context, task, result):
-        self.task = task
-        self.result = result
-        self.context = context
-
-    def __str__(self):
-        return str(self.task)
-
-    def __call__(self, cause):
-        if ((hasattr(self.task, "revert") and
-             isinstance(self.task.revert, collections.Callable))):
-            self.task.revert(self.context, self.result, cause)
-
-
-class Runner(object):
-    """A helper class that wraps a task and can find the needed inputs for
-    the task to run, as well as providing a uuid and other useful functionality
-    for users of the task.
-
-    TODO(harlowja): replace with the task details object or a subclass of
-    that???
-    """
-
-    def __init__(self, task, uuid=None):
-        assert isinstance(task, collections.Callable)
-        self.task = task
-        self.providers = {}
-        self.runs_before = []
-        self.result = None
-        if not uuid:
-            self._id = str(uuidlib.uuid4())
-        else:
-            self._id = str(uuid)
-
-    @property
-    def uuid(self):
-        return "r-%s" % (self._id)
-
-    @property
-    def requires(self):
-        return set(get_attr(self.task, 'requires', []))
-
-    @property
-    def provides(self):
-        return set(get_attr(self.task, 'provides', []))
-
-    @property
-    def optional(self):
-        return set(get_attr(self.task, 'optional', []))
-
-    @property
-    def version(self):
-        return get_task_version(self.task)
-
-    @property
-    def name(self):
-        return get_task_name(self.task)
-
-    def reset(self):
-        self.result = None
-
-    def __str__(self):
-        lines = ["Runner: %s" % (self.name)]
-        lines.append("%s" % (self.uuid))
-        lines.append("%s" % (self.version))
-        return "; ".join(lines)
-
-    def __call__(self, *args, **kwargs):
-        # Find all of our inputs first.
-        kwargs = dict(kwargs)
-        for (k, who_made) in self.providers.iteritems():
-            if who_made.result and k in who_made.result:
-                kwargs[k] = who_made.result[k]
-            else:
-                kwargs[k] = None
-        optional_keys = self.optional
-        optional_missing_keys = optional_keys - set(kwargs.keys())
-        if optional_missing_keys:
-            for k in optional_missing_keys:
-                for r in self.runs_before:
-                    r_provides = r.provides
-                    if k in r_provides and r.result and k in r.result:
-                        kwargs[k] = r.result[k]
-                        break
-        # And now finally run.
-        self.result = self.task(*args, **kwargs)
-        return self.result
-
-
-class TransitionNotifier(object):
-    """A utility helper class that can be used to subscribe to
-    notifications of events occurring as well as allow a entity to post said
-    notifications to subscribers.
-    """
-
-    RESERVED_KEYS = ('details',)
-    ANY = '*'
-
-    def __init__(self):
-        self._listeners = collections.defaultdict(list)
-
-    def reset(self):
-        self._listeners = collections.defaultdict(list)
-
-    def notify(self, state, details):
-        listeners = list(self._listeners.get(self.ANY, []))
-        for i in self._listeners[state]:
-            if i not in listeners:
-                listeners.append(i)
-        if not listeners:
-            return
-        for (callback, args, kwargs) in listeners:
-            if args is None:
-                args = []
-            if kwargs is None:
-                kwargs = {}
-            kwargs['details'] = details
-            try:
-                callback(state, *args, **kwargs)
-            except Exception:
-                LOG.exception(("Failure calling callback %s to notify about"
-                               " state transition %s"), callback, state)
-
-    def register(self, state, callback, args=None, kwargs=None):
-        assert isinstance(callback, collections.Callable)
-        for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
-            if cb is callback:
-                raise ValueError("Callback %s already registered" % (callback))
-        if kwargs:
-            for k in self.RESERVED_KEYS:
-                if k in kwargs:
-                    raise KeyError(("Reserved key '%s' not allowed in "
-                                    "kwargs") % k)
-            kwargs = copy.copy(kwargs)
-        if args:
-            args = copy.copy(args)
-        self._listeners[state].append((callback, args, kwargs))
-
-    def deregister(self, state, callback):
-        if state not in self._listeners:
-            return
-        for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
-            if cb is callback:
-                self._listeners[state].pop(i)
-                break
-
-
-class RollbackAccumulator(object):
-    """A utility class that can help in organizing 'undo' like code
-    so that said code be rolled back on failure (automatically or manually)
-    by activating rollback callables that were inserted during said codes
-    progression.
-    """
-
-    def __init__(self):
-        self._rollbacks = []
-
-    def add(self, *callables):
-        self._rollbacks.extend(callables)
-
-    def reset(self):
-        self._rollbacks = []
-
-    def __len__(self):
-        return len(self._rollbacks)
-
-    def __iter__(self):
-        # Rollbacks happen in the reverse order that they were added.
-        return reversed(self._rollbacks)
-
-    def __enter__(self):
-        return self
-
-    def rollback(self, cause):
-        LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
-        for (i, f) in enumerate(self):
-            LOG.debug("Calling rollback %s: %s", i + 1, f)
-            try:
-                f(cause)
-            except Exception:
-                LOG.exception(("Failed rolling back %s: %s due "
-                               "to inner exception."), i + 1, f)
-
-    def __exit__(self, type, value, tb):
-        if any((value, type, tb)):
-            self.rollback(value)
-
-
-class ReaderWriterLock(object):
-    """A simple reader-writer lock.
-
-    Several readers can hold the lock simultaneously, and only one writer.
-    Write locks have priority over reads to prevent write starvation.
-
-    Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
-    """
-
-    def __init__(self):
-        self.rwlock = 0
-        self.writers_waiting = 0
-        self.monitor = threading.Lock()
-        self.readers_ok = threading.Condition(self.monitor)
-        self.writers_ok = threading.Condition(self.monitor)
-
-    @contextlib.contextmanager
-    def acquire(self, read=True):
-        """Acquire a read or write lock in a context manager."""
-        try:
-            if read:
-                self.acquire_read()
-            else:
-                self.acquire_write()
-            yield self
-        finally:
-            self.release()
-
-    def acquire_read(self):
-        """Acquire a read lock.
-
-        Several threads can hold this typeof lock.
-        It is exclusive with write locks.
-        """
-
-        self.monitor.acquire()
-        while self.rwlock < 0 or self.writers_waiting:
-            self.readers_ok.wait()
-        self.rwlock += 1
-        self.monitor.release()
-
-    def acquire_write(self):
-        """Acquire a write lock.
-
-        Only one thread can hold this lock, and only when no read locks
-        are also held.
-        """
-
-        self.monitor.acquire()
-        while self.rwlock != 0:
-            self.writers_waiting += 1
-            self.writers_ok.wait()
-            self.writers_waiting -= 1
-        self.rwlock = -1
-        self.monitor.release()
-
-    def release(self):
-        """Release a lock, whether read or write."""
-
-        self.monitor.acquire()
-        if self.rwlock < 0:
-            self.rwlock = 0
-        else:
-            self.rwlock -= 1
-        wake_writers = self.writers_waiting and self.rwlock == 0
-        wake_readers = self.writers_waiting == 0
-        self.monitor.release()
-        if wake_writers:
-            self.writers_ok.acquire()
-            self.writers_ok.notify()
-            self.writers_ok.release()
-        elif wake_readers:
-            self.readers_ok.acquire()
-            self.readers_ok.notifyAll()
-            self.readers_ok.release()
-
-
-class LazyPluggable(object):
-    """A pluggable backend loaded lazily based on some value."""
-
-    def __init__(self, pivot, **backends):
-        self.__backends = backends
-        self.__pivot = pivot
-        self.__backend = None
-
-    def __get_backend(self):
-        if not self.__backend:
-            backend_name = 'sqlalchemy'
-            backend = self.__backends[backend_name]
-            if isinstance(backend, tuple):
-                name = backend[0]
-                fromlist = backend[1]
-            else:
-                name = backend
-                fromlist = backend
-
-            self.__backend = __import__(name, None, None, fromlist)
-        return self.__backend
-
-    def __getattr__(self, key):
-        backend = self.__get_backend()
-        return getattr(backend, key)
index 6109c2f8313f2dc76189eb0487869d3027918223..4d28c267555b4f2e107806efd63c168d79aea0a5 100644 (file)
@@ -30,6 +30,7 @@ import tempfile
 
 import mox
 from oslo.config import cfg
+from taskflow.engines.action_engine import engine
 
 from cinder.backup import driver as backup_driver
 from cinder.brick.iscsi import iscsi
@@ -46,7 +47,6 @@ from cinder.openstack.common.notifier import test_notifier
 from cinder.openstack.common import rpc
 import cinder.policy
 from cinder import quota
-from cinder.taskflow.patterns import linear_flow
 from cinder import test
 from cinder.tests.brick.fake_lvm import FakeBrickLVM
 from cinder.tests import conf_fixture
@@ -59,7 +59,6 @@ import cinder.volume
 from cinder.volume import configuration as conf
 from cinder.volume import driver
 from cinder.volume.drivers import lvm
-from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volutils
 
@@ -465,7 +464,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
                        lambda *args, **kwargs: None)
 
-        orig_flow = linear_flow.Flow.run
+        orig_flow = engine.ActionEngine.run
 
         def mock_flow_run(*args, **kwargs):
             # ensure the lock has been taken
@@ -492,7 +491,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         admin_ctxt = context.get_admin_context()
 
         # mock the flow runner so we can do some checks
-        self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+        self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
 
         # locked
         self.volume.create_volume(self.context, volume_id=dst_vol_id,
@@ -528,7 +527,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         # mock the synchroniser so we can record events
         self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
 
-        orig_flow = linear_flow.Flow.run
+        orig_flow = engine.ActionEngine.run
 
         def mock_flow_run(*args, **kwargs):
             # ensure the lock has been taken
@@ -551,7 +550,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         admin_ctxt = context.get_admin_context()
 
         # mock the flow runner so we can do some checks
-        self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+        self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
 
         # locked
         self.volume.create_volume(self.context, volume_id=dst_vol_id,
@@ -1816,13 +1815,6 @@ class VolumeTestCase(BaseVolumeTestCase):
         def fake_create_volume(*args, **kwargs):
             raise exception.CinderException('fake exception')
 
-        def fake_reschedule_or_error(self, context, *args, **kwargs):
-            self.assertFalse(context.is_admin)
-            self.assertNotIn('admin', context.roles)
-            #compare context passed in with the context we saved
-            self.assertDictMatch(self.saved_ctxt.__dict__,
-                                 context.__dict__)
-
         #create context for testing
         ctxt = self.context.deepcopy()
         if 'admin' in ctxt.roles:
@@ -1831,8 +1823,6 @@ class VolumeTestCase(BaseVolumeTestCase):
         #create one copy of context for future comparison
         self.saved_ctxt = ctxt.deepcopy()
 
-        self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
-                       fake_reschedule_or_error)
         self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
 
         volume_src = tests_utils.create_volume(self.context,
index c36ff6b6d6728610b381b800de131a8b1101355e..857c107ae719ec2bdaa4fe7427c3676d9557ff08 100644 (file)
@@ -37,15 +37,12 @@ from cinder.openstack.common import timeutils
 import cinder.policy
 from cinder import quota
 from cinder.scheduler import rpcapi as scheduler_rpcapi
-from cinder import units
 from cinder import utils
 from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
 from cinder.volume import volume_types
 
-from cinder.taskflow import states
-
 
 volume_host_opt = cfg.BoolOpt('snapshot_same_host',
                               default=True,
@@ -147,42 +144,34 @@ class API(base.Base):
                 return False
 
         create_what = {
-            'size': size,
+            'context': context,
+            'raw_size': size,
             'name': name,
             'description': description,
             'snapshot': snapshot,
             'image_id': image_id,
-            'volume_type': volume_type,
+            'raw_volume_type': volume_type,
             'metadata': metadata,
-            'availability_zone': availability_zone,
+            'raw_availability_zone': availability_zone,
             'source_volume': source_volume,
             'scheduler_hints': scheduler_hints,
             'key_manager': self.key_manager,
             'backup_source_volume': backup_source_volume,
         }
-        (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
-                                                  self.volume_rpcapi,
-                                                  self.db,
-                                                  self.image_service,
-                                                  check_volume_az_zone,
-                                                  create_what)
-
-        assert flow, _('Create volume flow not retrieved')
-        flow.run(context)
-        if flow.state != states.SUCCESS:
-            raise exception.CinderException(_("Failed to successfully complete"
-                                              " create volume workflow"))
-
-        # Extract the volume information from the task uuid that was specified
-        # to produce said information.
-        volume = None
+
         try:
-            volume = flow.results[uuid]['volume']
-        except KeyError:
-            pass
+            flow_engine = create_volume.get_api_flow(self.scheduler_rpcapi,
+                                                     self.volume_rpcapi,
+                                                     self.db,
+                                                     self.image_service,
+                                                     check_volume_az_zone,
+                                                     create_what)
+        except Exception:
+            raise exception.CinderException(
+                _("Failed to create api volume flow"))
 
-        # Raise an error, nobody provided it??
-        assert volume, _('Expected volume result not found')
+        flow_engine.run()
+        volume = flow_engine.storage.fetch('volume')
         return volume
 
     @wrap_check_policy
index 54ce06ad098fd023f36807240b1083394ecae9de..26c4805c26662244e0614b08d9253fea4081a0e6 100644 (file)
@@ -15,7 +15,7 @@
 #    under the License.
 
 # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
-from cinder.taskflow import task
+from taskflow import task
 
 
 def _make_task_name(cls, addons=None):
@@ -34,28 +34,7 @@ class CinderTask(task.Task):
     implement the given task as the task name.
     """
 
-    def __init__(self, addons=None):
+    def __init__(self, addons=None, **kwargs):
         super(CinderTask, self).__init__(_make_task_name(self.__class__,
-                                                         addons))
-
-
-class InjectTask(CinderTask):
-    """This injects a dict into the flow.
-
-    This injection is done so that the keys (and values) provided can be
-    dependended on by tasks further down the line. Since taskflow is dependency
-    based this can be considered the bootstrapping task that provides an
-    initial set of values for other tasks to get started with. If this did not
-    exist then tasks would fail locating there dependent tasks and the values
-    said dependent tasks produce.
-
-    Reversion strategy: N/A
-    """
-
-    def __init__(self, inject_what, addons=None):
-        super(InjectTask, self).__init__(addons=addons)
-        self.provides.update(inject_what.keys())
-        self._inject = inject_what
-
-    def __call__(self, context):
-        return dict(self._inject)
+                                                         addons),
+                                         **kwargs)
index d4fa8a20c991c491e50e1a02c02527f234ea69bc..7f3a6efa28a8655aff0e73831d2a40dd45cf703a 100644 (file)
 import traceback
 
 from oslo.config import cfg
+import taskflow.engines
+from taskflow.patterns import linear_flow
+from taskflow import task
+from taskflow.utils import misc
 
 from cinder import exception
 from cinder.image import glance
@@ -34,13 +38,9 @@ from cinder.openstack.common import strutils
 from cinder.openstack.common import timeutils
 from cinder import policy
 from cinder import quota
-from cinder.taskflow import decorators
-from cinder.taskflow.patterns import linear_flow
-from cinder.taskflow import task
 from cinder import units
 from cinder import utils
 from cinder.volume.flows import base
-from cinder.volume.flows import utils as flow_utils
 from cinder.volume import utils as volume_utils
 from cinder.volume import volume_types
 
@@ -86,16 +86,6 @@ def _make_pretty_name(method):
     return ".".join(meth_pieces)
 
 
-def _find_result_spec(flow):
-    """Find the last task that produced a valid volume_spec and returns it."""
-    for there_result in flow.results.values():
-        if not there_result or not 'volume_spec' in there_result:
-            continue
-        if there_result['volume_spec']:
-            return there_result['volume_spec']
-    return None
-
-
 def _restore_source_status(context, db, volume_spec):
     # NOTE(harlowja): Only if the type of the volume that was being created is
     # the source volume type should we try to reset the source volume status
@@ -171,25 +161,16 @@ class ExtractVolumeRequestTask(base.CinderTask):
     Reversion strategy: N/A
     """
 
-    def __init__(self, image_service, az_check_functor=None):
-        super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
-        # This task will produce the following outputs (said outputs can be
-        # saved to durable storage in the future so that the flow can be
-        # reconstructed elsewhere and continued).
-        self.provides.update(['availability_zone', 'size', 'snapshot_id',
-                              'source_volid', 'volume_type', 'volume_type_id',
-                              'encryption_key_id'])
-        # This task requires the following inputs to operate (provided
-        # automatically to __call__(). This is done so that the flow can
-        # be reconstructed elsewhere and continue running (in the future).
-        #
-        # It also is used to be able to link tasks that produce items to tasks
-        # that consume items (thus allowing the linking of the flow to be
-        # mostly automatic).
-        self.requires.update(['availability_zone', 'image_id', 'metadata',
-                              'size', 'snapshot', 'source_volume',
-                              'volume_type', 'key_manager',
-                              'backup_source_volume'])
+    # This task will produce the following outputs (said outputs can be
+    # saved to durable storage in the future so that the flow can be
+    # reconstructed elsewhere and continued).
+    default_provides = set(['availability_zone', 'size', 'snapshot_id',
+                            'source_volid', 'volume_type', 'volume_type_id',
+                            'encryption_key_id'])
+
+    def __init__(self, image_service, az_check_functor=None, **kwargs):
+        super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
+                                                       **kwargs)
         self.image_service = image_service
         self.az_check_functor = az_check_functor
         if not self.az_check_functor:
@@ -451,9 +432,9 @@ class ExtractVolumeRequestTask(base.CinderTask):
 
         return volume_type_id
 
-    def __call__(self, context, size, snapshot, image_id, source_volume,
-                 availability_zone, volume_type, metadata,
-                 key_manager, backup_source_volume):
+    def execute(self, context, size, snapshot, image_id, source_volume,
+                availability_zone, volume_type, metadata,
+                key_manager, backup_source_volume):
 
         utils.check_exclusive_options(snapshot=snapshot,
                                       imageRef=image_id,
@@ -519,16 +500,18 @@ class EntryCreateTask(base.CinderTask):
     Reversion strategy: remove the volume_id created from the database.
     """
 
+    default_provides = set(['volume_properties', 'volume_id', 'volume'])
+
     def __init__(self, db):
-        super(EntryCreateTask, self).__init__(addons=[ACTION])
+        requires = ['availability_zone', 'description', 'metadata',
+                    'name', 'reservations', 'size', 'snapshot_id',
+                    'source_volid', 'volume_type_id', 'encryption_key_id']
+        super(EntryCreateTask, self).__init__(addons=[ACTION],
+                                              requires=requires)
         self.db = db
-        self.requires.update(['availability_zone', 'description', 'metadata',
-                              'name', 'reservations', 'size', 'snapshot_id',
-                              'source_volid', 'volume_type_id',
-                              'encryption_key_id'])
-        self.provides.update(['volume_properties', 'volume_id'])
+        self.provides.update()
 
-    def __call__(self, context, **kwargs):
+    def execute(self, context, **kwargs):
         """Creates a database entry for the given inputs and returns details.
 
         Accesses the database and creates a new entry for the to be created
@@ -569,9 +552,9 @@ class EntryCreateTask(base.CinderTask):
             'volume': volume,
         }
 
-    def revert(self, context, result, cause):
+    def revert(self, context, result, **kwargs):
         # We never produced a result and therefore can't destroy anything.
-        if not result:
+        if isinstance(result, misc.Failure):
             return
         if context.quota_committed:
             # Committed quota doesn't rollback as the volume has already been
@@ -603,12 +586,12 @@ class QuotaReserveTask(base.CinderTask):
     an automated or manual process.
     """
 
+    default_provides = set(['reservations'])
+
     def __init__(self):
         super(QuotaReserveTask, self).__init__(addons=[ACTION])
-        self.requires.update(['size', 'volume_type_id'])
-        self.provides.update(['reservations'])
 
-    def __call__(self, context, size, volume_type_id):
+    def execute(self, context, size, volume_type_id):
         try:
             reserve_opts = {'volumes': 1, 'gigabytes': size}
             QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
@@ -648,15 +631,14 @@ class QuotaReserveTask(base.CinderTask):
                         "already consumed)")
                 LOG.warn(msg % {'s_pid': context.project_id,
                                 'd_consumed': _consumed('volumes')})
-                allowed = quotas['volumes']
                 raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
             else:
                 # If nothing was reraised, ensure we reraise the initial error
                 raise
 
-    def revert(self, context, result, cause):
+    def revert(self, context, result, **kwargs):
         # We never produced a result and therefore can't destroy anything.
-        if not result:
+        if isinstance(result, misc.Failure):
             return
         if context.quota_committed:
             # The reservations have already been committed and can not be
@@ -691,16 +673,15 @@ class QuotaCommitTask(base.CinderTask):
 
     def __init__(self):
         super(QuotaCommitTask, self).__init__(addons=[ACTION])
-        self.requires.update(['reservations', 'volume_properties'])
 
-    def __call__(self, context, reservations, volume_properties):
+    def execute(self, context, reservations, volume_properties):
         QUOTAS.commit(context, reservations)
         context.quota_committed = True
         return {'volume_properties': volume_properties}
 
-    def revert(self, context, result, cause):
+    def revert(self, context, result, **kwargs):
         # We never produced a result and therefore can't destroy anything.
-        if not result:
+        if isinstance(result, misc.Failure):
             return
         volume = result['volume_properties']
         try:
@@ -729,13 +710,14 @@ class VolumeCastTask(base.CinderTask):
     """
 
     def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
-        super(VolumeCastTask, self).__init__(addons=[ACTION])
+        requires = ['image_id', 'scheduler_hints', 'snapshot_id',
+                    'source_volid', 'volume_id', 'volume_type',
+                    'volume_properties']
+        super(VolumeCastTask, self).__init__(addons=[ACTION],
+                                             requires=requires)
         self.volume_rpcapi = volume_rpcapi
         self.scheduler_rpcapi = scheduler_rpcapi
         self.db = db
-        self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
-                              'source_volid', 'volume_id', 'volume_type',
-                              'volume_properties'])
 
     def _cast_create_volume(self, context, request_spec, filter_properties):
         source_volid = request_spec['source_volid']
@@ -786,7 +768,7 @@ class VolumeCastTask(base.CinderTask):
                 image_id=image_id,
                 source_volid=source_volid)
 
-    def __call__(self, context, **kwargs):
+    def execute(self, context, **kwargs):
         scheduler_hints = kwargs.pop('scheduler_hints', None)
         request_spec = kwargs.copy()
         filter_properties = {}
@@ -794,6 +776,20 @@ class VolumeCastTask(base.CinderTask):
             filter_properties['scheduler_hints'] = scheduler_hints
         self._cast_create_volume(context, request_spec, filter_properties)
 
+    def revert(self, context, result, flow_failures, **kwargs):
+        if isinstance(result, misc.Failure):
+            return
+
+        # Restore the source volume status and set the volume to error status.
+        volume_id = kwargs['volume_id']
+        _restore_source_status(context, self.db, kwargs)
+        _error_out_volume(context, self.db, volume_id)
+        LOG.error(_("Volume %s: create failed"), volume_id)
+        exc_info = False
+        if all(flow_failures[-1].exc_info):
+            exc_info = flow_failures[-1].exc_info
+        LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
 
 class OnFailureChangeStatusTask(base.CinderTask):
     """Helper task that sets a volume id to status error.
@@ -808,33 +804,24 @@ class OnFailureChangeStatusTask(base.CinderTask):
     def __init__(self, db):
         super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
         self.db = db
-        self.requires.update(['volume_id'])
-        self.optional.update(['volume_spec'])
 
-    def __call__(self, context, volume_id, volume_spec=None):
+    def execute(self, context, volume_id, volume_spec):
         # Save these items since we only use them if a reversion is triggered.
         return {
             'volume_id': volume_id,
             'volume_spec': volume_spec,
         }
 
-    def revert(self, context, result, cause):
+    def revert(self, context, result, flow_failures, **kwargs):
+        if isinstance(result, misc.Failure):
+            return
         volume_spec = result.get('volume_spec')
-        if not volume_spec:
-            # Attempt to use it from a later task that *should* have populated
-            # this from the database. It is not needed to be found since
-            # reverting will continue without it.
-            volume_spec = _find_result_spec(cause.flow)
 
         # Restore the source volume status and set the volume to error status.
         volume_id = result['volume_id']
         _restore_source_status(context, self.db, volume_spec)
-        _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+        _error_out_volume(context, self.db, volume_id)
         LOG.error(_("Volume %s: create failed"), volume_id)
-        exc_info = False
-        if all(cause.exc_info):
-            exc_info = cause.exc_info
-        LOG.error(_('Unexpected build error:'), exc_info=exc_info)
 
 
 class OnFailureRescheduleTask(base.CinderTask):
@@ -846,10 +833,10 @@ class OnFailureRescheduleTask(base.CinderTask):
     """
 
     def __init__(self, reschedule_context, db, scheduler_rpcapi):
-        super(OnFailureRescheduleTask, self).__init__(addons=[ACTION])
-        self.requires.update(['filter_properties', 'image_id', 'request_spec',
-                              'snapshot_id', 'volume_id'])
-        self.optional.update(['volume_spec'])
+        requires = ['filter_properties', 'image_id', 'request_spec',
+                    'snapshot_id', 'volume_id', 'context']
+        super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
+                                                      requires=requires)
         self.scheduler_rpcapi = scheduler_rpcapi
         self.db = db
         self.reschedule_context = reschedule_context
@@ -878,27 +865,8 @@ class OnFailureRescheduleTask(base.CinderTask):
             exception.ImageUnacceptable,
         ]
 
-    def _is_reschedulable(self, cause):
-        # Figure out the type of the causes exception and compare it against
-        # our black-list of exception types that will not cause rescheduling.
-        exc_type, value = cause.exc_info[:2]
-        # If we don't have a type from exc_info but we do have a exception in
-        # the cause, try to get the type from that instead.
-        if not value:
-            value = cause.exc
-        if not exc_type and value:
-            exc_type = type(value)
-        if exc_type and exc_type in self.no_reschedule_types:
-            return False
-        # Couldn't figure it out, by default assume whatever the cause was can
-        # be fixed by rescheduling.
-        #
-        # NOTE(harlowja): Crosses fingers.
-        return True
-
-    def __call__(self, context, *args, **kwargs):
-        # Save these items since we only use them if a reversion is triggered.
-        return kwargs.copy()
+    def execute(self, **kwargs):
+        pass
 
     def _reschedule(self, context, cause, request_spec, filter_properties,
                     snapshot_id, image_id, volume_id, **kwargs):
@@ -919,7 +887,7 @@ class OnFailureRescheduleTask(base.CinderTask):
                   {'volume_id': volume_id,
                    'method': _make_pretty_name(create_volume),
                    'num': num_attempts,
-                   'reason': _exception_to_unicode(cause.exc)})
+                   'reason': cause.exception_str})
 
         if all(cause.exc_info):
             # Stringify to avoid circular ref problem in json serialization
@@ -958,83 +926,23 @@ class OnFailureRescheduleTask(base.CinderTask):
             LOG.exception(_("Volume %s: resetting 'creating' status failed"),
                           volume_id)
 
-    def revert(self, context, result, cause):
-        volume_spec = result.get('volume_spec')
-        if not volume_spec:
-            # Find it from a prior task that populated this from the database.
-            volume_spec = _find_result_spec(cause.flow)
-        volume_id = result['volume_id']
+    def revert(self, context, result, flow_failures, **kwargs):
+        # Check if we have a cause which can tell us not to reschedule.
+        for failure in flow_failures.values():
+            if failure.check(self.no_reschedule_types):
+                return
 
+        volume_id = kwargs['volume_id']
         # Use a different context when rescheduling.
         if self.reschedule_context:
             context = self.reschedule_context
-
-        # If we are now supposed to reschedule (or unable to), then just
-        # restore the source volume status and set the volume to error status.
-        def do_error_revert():
-            LOG.debug(_("Failing volume %s creation by altering volume status"
-                        " instead of rescheduling"), volume_id)
-            _restore_source_status(context, self.db, volume_spec)
-            _error_out_volume(context, self.db, volume_id, reason=cause.exc)
-            LOG.error(_("Volume %s: create failed"), volume_id)
-
-        # Check if we have a cause which can tell us not to reschedule.
-        if not self._is_reschedulable(cause):
-            do_error_revert()
-        else:
             try:
+                cause = list(flow_failures.values())[0]
                 self._pre_reschedule(context, volume_id)
-                self._reschedule(context, cause, **result)
+                self._reschedule(context, cause, **kwargs)
                 self._post_reschedule(context, volume_id)
             except exception.CinderException:
                 LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
-                # NOTE(harlowja): Do error volume status changing instead.
-                do_error_revert()
-        exc_info = False
-        if all(cause.exc_info):
-            exc_info = cause.exc_info
-        LOG.error(_('Unexpected build error:'), exc_info=exc_info)
-
-
-class NotifySchedulerFailureTask(base.CinderTask):
-    """Helper task that notifies some external service on failure.
-
-    Reversion strategy: On failure of any flow that includes this task the
-    request specification associated with that flow will be extracted and
-    sent as a payload to the notification service under the given methods
-    scheduler topic.
-    """
-
-    def __init__(self, method):
-        super(NotifySchedulerFailureTask, self).__init__(addons=[ACTION])
-        self.requires.update(['request_spec', 'volume_id'])
-        self.method = method
-        self.topic = 'scheduler.%s' % self.method
-        self.publisher_id = notifier.publisher_id("scheduler")
-
-    def __call__(self, context, **kwargs):
-        # Save these items since we only use them if a reversion is triggered.
-        return kwargs.copy()
-
-    def revert(self, context, result, cause):
-        request_spec = result['request_spec']
-        volume_id = result['volume_id']
-        volume_properties = request_spec['volume_properties']
-        payload = {
-            'request_spec': request_spec,
-            'volume_properties': volume_properties,
-            'volume_id': volume_id,
-            'state': 'error',
-            'method': self.method,
-            'reason': unicode(cause.exc),
-        }
-        try:
-            notifier.notify(context, self.publisher_id, self.topic,
-                            notifier.ERROR, payload)
-        except exception.CinderException:
-            LOG.exception(_("Failed notifying on %(topic)s "
-                            "payload %(payload)s") % {'topic': self.topic,
-                                                      'payload': payload})
 
 
 class ExtractSchedulerSpecTask(base.CinderTask):
@@ -1043,12 +951,12 @@ class ExtractSchedulerSpecTask(base.CinderTask):
     Reversion strategy: N/A
     """
 
-    def __init__(self, db):
-        super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION])
+    default_provides = set(['request_spec'])
+
+    def __init__(self, db, **kwargs):
+        super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
+                                                       **kwargs)
         self.db = db
-        self.requires.update(['image_id', 'request_spec', 'snapshot_id',
-                              'volume_id'])
-        self.provides.update(['request_spec'])
 
     def _populate_request_spec(self, context, volume_id, snapshot_id,
                                image_id):
@@ -1077,8 +985,8 @@ class ExtractSchedulerSpecTask(base.CinderTask):
             'volume_type': list(dict(vol_type).iteritems()),
         }
 
-    def __call__(self, context, request_spec, volume_id, snapshot_id,
-                 image_id):
+    def execute(self, context, request_spec, volume_id, snapshot_id,
+                image_id):
         # For RPC version < 1.2 backward compatibility
         if request_spec is None:
             request_spec = self._populate_request_spec(context, volume_id,
@@ -1088,6 +996,33 @@ class ExtractSchedulerSpecTask(base.CinderTask):
         }
 
 
+class ExtractVolumeRefTask(base.CinderTask):
+    """Extracts volume reference for given volume id. """
+
+    default_provides = 'volume_ref'
+
+    def __init__(self, db):
+        super(ExtractVolumeRefTask, self).__init__(addons=[ACTION])
+        self.db = db
+
+    def execute(self, context, volume_id):
+        # NOTE(harlowja): this will fetch the volume from the database, if
+        # the volume has been deleted before we got here then this should fail.
+        #
+        # In the future we might want to have a lock on the volume_id so that
+        # the volume can not be deleted while its still being created?
+        volume_ref = self.db.volume_get(context, volume_id)
+
+        return volume_ref
+
+    def revert(self, context, volume_id, result, **kwargs):
+        if isinstance(result, misc.Failure):
+            return
+
+        _error_out_volume(context, self.db, volume_id)
+        LOG.error(_("Volume %s: create failed"), volume_id)
+
+
 class ExtractVolumeSpecTask(base.CinderTask):
     """Extracts a spec of a volume to be created into a common structure.
 
@@ -1099,22 +1034,17 @@ class ExtractVolumeSpecTask(base.CinderTask):
     Reversion strategy: N/A
     """
 
+    default_provides = 'volume_spec'
+
     def __init__(self, db):
-        super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION])
+        requires = ['image_id', 'snapshot_id', 'source_volid']
+        super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
+                                                    requires=requires)
         self.db = db
-        self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
-                              'source_volid', 'volume_id'])
-        self.provides.update(['volume_spec', 'volume_ref'])
 
-    def __call__(self, context, volume_id, **kwargs):
+    def execute(self, context, volume_ref, **kwargs):
         get_remote_image_service = glance.get_remote_image_service
 
-        # NOTE(harlowja): this will fetch the volume from the database, if
-        # the volume has been deleted before we got here then this should fail.
-        #
-        # In the future we might want to have a lock on the volume_id so that
-        # the volume can not be deleted while its still being created?
-        volume_ref = self.db.volume_get(context, volume_id)
         volume_name = volume_ref['name']
         volume_size = utils.as_int(volume_ref['size'], quiet=False)
 
@@ -1173,21 +1103,14 @@ class ExtractVolumeSpecTask(base.CinderTask):
                 'image_service': image_service,
             })
 
-        return {
-            'volume_spec': specs,
-            # NOTE(harlowja): it appears like further usage of this volume_ref
-            # result actually depend on it being a sqlalchemy object and not
-            # just a plain dictionary so thats why we are storing this here.
-            #
-            # It was attempted to refetch it when needed in subsequent tasks,
-            # but that caused sqlalchemy errors to occur (volume already open
-            # or similar).
-            #
-            # In the future where this task could fail and be recovered from we
-            # will need to store the volume_spec and recreate the volume_ref
-            # on demand.
-            'volume_ref': volume_ref,
-        }
+        return specs
+
+    def revert(self, context, result, **kwargs):
+        if isinstance(result, misc.Failure):
+            return
+        volume_spec = result.get('volume_spec')
+        # Restore the source volume status and set the volume to error status.
+        _restore_source_status(context, self.db, volume_spec)
 
 
 class NotifyVolumeActionTask(base.CinderTask):
@@ -1199,12 +1122,11 @@ class NotifyVolumeActionTask(base.CinderTask):
     def __init__(self, db, host, event_suffix):
         super(NotifyVolumeActionTask, self).__init__(addons=[ACTION,
                                                              event_suffix])
-        self.requires.update(['volume_ref'])
         self.db = db
         self.event_suffix = event_suffix
         self.host = host
 
-    def __call__(self, context, volume_ref):
+    def execute(self, context, volume_ref):
         volume_id = volume_ref['id']
         try:
             volume_utils.notify_about_volume_usage(context, volume_ref,
@@ -1226,11 +1148,12 @@ class CreateVolumeFromSpecTask(base.CinderTask):
     Reversion strategy: N/A
     """
 
+    default_provides = 'volume'
+
     def __init__(self, db, host, driver):
         super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
         self.db = db
         self.driver = driver
-        self.requires.update(['volume_spec', 'volume_ref'])
         # This maps the different volume specification types into the methods
         # that can create said volume type (aka this is a jump table).
         self._create_func_mapping = {
@@ -1472,7 +1395,7 @@ class CreateVolumeFromSpecTask(base.CinderTask):
     def _create_raw_volume(self, context, volume_ref, **kwargs):
         return self.driver.create_volume(volume_ref)
 
-    def __call__(self, context, volume_ref, volume_spec):
+    def execute(self, context, volume_ref, volume_spec):
         # we can't do anything if the driver didn't init
         if not self.driver.initialized:
             LOG.error(_("Unable to create volume, driver not initialized"))
@@ -1538,6 +1461,8 @@ class CreateVolumeFromSpecTask(base.CinderTask):
                               {'volume_id': volume_id, 'model': model_update})
                 raise exception.ExportFailure(reason=ex)
 
+        return volume_ref
+
 
 class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
     """On successful volume creation this will perform final volume actions.
@@ -1553,13 +1478,12 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
 
     def __init__(self, db, host, event_suffix):
         super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
-        self.requires.update(['volume_spec'])
         self.status_translation = {
             'migration_target_creating': 'migration_target',
         }
 
-    def __call__(self, context, volume_ref, volume_spec):
-        volume_id = volume_ref['id']
+    def execute(self, context, volume, volume_spec):
+        volume_id = volume['id']
         new_status = self.status_translation.get(volume_spec.get('status'),
                                                  'available')
         update = {
@@ -1573,7 +1497,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
             # 'building' if this fails)??
             volume_ref = self.db.volume_update(context, volume_id, update)
             # Now use the parent to notify.
-            super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
+            super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
         except exception.CinderException:
             LOG.exception(_("Failed updating volume %(volume_id)s with "
                             "%(update)s") % {'volume_id': volume_id,
@@ -1605,31 +1529,26 @@ def get_api_flow(scheduler_rpcapi, volume_rpcapi, db,
     flow_name = ACTION.replace(":", "_") + "_api"
     api_flow = linear_flow.Flow(flow_name)
 
-    # This injects the initial starting flow values into the workflow so that
-    # the dependency order of the tasks provides/requires can be correctly
-    # determined.
-    api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
-    api_flow.add(ExtractVolumeRequestTask(image_service,
-                                          az_check_functor))
-    api_flow.add(QuotaReserveTask())
-    v_uuid = api_flow.add(EntryCreateTask(db))
-    api_flow.add(QuotaCommitTask())
-
-    # If after committing something fails, ensure we set the db to failure
-    # before reverting any prior tasks.
-    api_flow.add(OnFailureChangeStatusTask(db))
+    api_flow.add(ExtractVolumeRequestTask(
+        image_service,
+        az_check_functor,
+        rebind={'size': 'raw_size',
+                'availability_zone': 'raw_availability_zone',
+                'volume_type': 'raw_volume_type'}))
+    api_flow.add(QuotaReserveTask(),
+                 EntryCreateTask(db),
+                 QuotaCommitTask())
 
     # This will cast it out to either the scheduler or volume manager via
     # the rpc apis provided.
     api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
 
-    # Note(harlowja): this will return the flow as well as the uuid of the
-    # task which will produce the 'volume' database reference (since said
-    # reference is returned to other callers in the api for further usage).
-    return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
+    # Now load (but do not run) the flow using the provided initial data.
+    return taskflow.engines.load(api_flow, store=create_what)
 
 
-def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
+def get_scheduler_flow(context, db, driver, request_spec=None,
+                       filter_properties=None,
                        volume_id=None, snapshot_id=None, image_id=None):
 
     """Constructs and returns the scheduler entrypoint flow.
@@ -1643,29 +1562,23 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
     4. Uses provided driver to to then select and continue processing of
        volume request.
     """
-
-    flow_name = ACTION.replace(":", "_") + "_scheduler"
-    scheduler_flow = linear_flow.Flow(flow_name)
-
-    # This injects the initial starting flow values into the workflow so that
-    # the dependency order of the tasks provides/requires can be correctly
-    # determined.
-    scheduler_flow.add(base.InjectTask({
-        'request_spec': request_spec,
+    create_what = {
+        'context': context,
+        'raw_request_spec': request_spec,
         'filter_properties': filter_properties,
         'volume_id': volume_id,
         'snapshot_id': snapshot_id,
         'image_id': image_id,
-    }, addons=[ACTION]))
+    }
 
-    # This will extract and clean the spec from the starting values.
-    scheduler_flow.add(ExtractSchedulerSpecTask(db))
+    flow_name = ACTION.replace(":", "_") + "_scheduler"
+    scheduler_flow = linear_flow.Flow(flow_name)
 
-    # The decorator application here ensures that the method gets the right
-    # requires attributes automatically by examining the underlying functions
-    # arguments.
+    # This will extract and clean the spec from the starting values.
+    scheduler_flow.add(ExtractSchedulerSpecTask(
+        db,
+        rebind={'request_spec': 'raw_request_spec'}))
 
-    @decorators.task
     def schedule_create_volume(context, request_spec, filter_properties):
 
         def _log_failure(cause):
@@ -1711,16 +1624,16 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
                 _log_failure(e)
                 _error_out_volume(context, db, volume_id, reason=e)
 
-    scheduler_flow.add(schedule_create_volume)
+    scheduler_flow.add(task.FunctorTask(schedule_create_volume))
 
-    return flow_utils.attach_debug_listeners(scheduler_flow)
+    # Now load (but do not run) the flow using the provided initial data.
+    return taskflow.engines.load(scheduler_flow, store=create_what)
 
 
-def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
-                     request_spec=None, filter_properties=None,
-                     allow_reschedule=True,
-                     snapshot_id=None, image_id=None, source_volid=None,
-                     reschedule_context=None):
+def get_manager_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
+                     allow_reschedule, reschedule_context, request_spec,
+                     filter_properties, snapshot_id=None, image_id=None,
+                     source_volid=None):
     """Constructs and returns the manager entrypoint flow.
 
     This flow will do the following:
@@ -1739,44 +1652,29 @@ def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
     flow_name = ACTION.replace(":", "_") + "_manager"
     volume_flow = linear_flow.Flow(flow_name)
 
-    # Determine if we are allowed to reschedule since this affects how
-    # failures will be handled.
-    if not filter_properties:
-        filter_properties = {}
-    if not request_spec and allow_reschedule:
-        LOG.debug(_("No request spec, will not reschedule"))
-        allow_reschedule = False
-    if not filter_properties.get('retry', None) and allow_reschedule:
-        LOG.debug(_("No retry filter property or associated "
-                    "retry info, will not reschedule"))
-        allow_reschedule = False
-
     # This injects the initial starting flow values into the workflow so that
     # the dependency order of the tasks provides/requires can be correctly
     # determined.
-    volume_flow.add(base.InjectTask({
+    create_what = {
+        'context': context,
         'filter_properties': filter_properties,
         'image_id': image_id,
         'request_spec': request_spec,
         'snapshot_id': snapshot_id,
         'source_volid': source_volid,
         'volume_id': volume_id,
-    }, addons=[ACTION]))
-
-    # We can actually just check if we should reschedule on failure ahead of
-    # time instead of trying to determine this later, certain values are needed
-    # to reschedule and without them we should just avoid rescheduling.
-    if not allow_reschedule:
-        # On failure ensure that we just set the volume status to error.
-        LOG.debug(_("Retry info not present, will not reschedule"))
-        volume_flow.add(OnFailureChangeStatusTask(db))
-    else:
+    }
+
+    volume_flow.add(ExtractVolumeRefTask(db))
+
+    if allow_reschedule and request_spec:
         volume_flow.add(OnFailureRescheduleTask(reschedule_context,
                                                 db, scheduler_rpcapi))
 
-    volume_flow.add(ExtractVolumeSpecTask(db))
-    volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
-    volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
-    volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
+    volume_flow.add(ExtractVolumeSpecTask(db),
+                    NotifyVolumeActionTask(db, host, "create.start"),
+                    CreateVolumeFromSpecTask(db, host, driver),
+                    CreateVolumeOnFinishTask(db, host, "create.end"))
 
-    return flow_utils.attach_debug_listeners(volume_flow)
+    # Now load (but do not run) the flow using the provided initial data.
+    return taskflow.engines.load(volume_flow, store=create_what)
diff --git a/cinder/volume/flows/utils.py b/cinder/volume/flows/utils.py
deleted file mode 100644 (file)
index 6c6677e..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
-#    Copyright (c) 2013 OpenStack Foundation
-#    Copyright 2010 United States Government as represented by the
-#    Administrator of the National Aeronautics and Space Administration.
-#    All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from cinder.openstack.common import log as logging
-
-LOG = logging.getLogger(__name__)
-
-
-def attach_debug_listeners(flow):
-    """Sets up a nice set of debug listeners for the flow.
-
-    These listeners will log when tasks/flows are transitioning from state to
-    state so that said states can be seen in the debug log output which is very
-    useful for figuring out where problems are occurring.
-    """
-
-    def flow_log_change(state, details):
-        # TODO(harlowja): the bug 1214083 is causing problems
-        LOG.debug(_("%(flow)s has moved into state %(state)s from state"
-                    " %(old_state)s") % {'state': state,
-                                         'old_state': details.get('old_state'),
-                                         'flow': str(details['flow'])})
-
-    def task_log_change(state, details):
-        # TODO(harlowja): the bug 1214083 is causing problems
-        LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
-                    " result: %(result)s") % {'state': state,
-                                              'flow': str(details['flow']),
-                                              'runner': str(details['runner']),
-                                              'result': details.get('result')})
-
-    # Register * for all state changes (and not selective state changes to be
-    # called upon) since all the changes is more useful.
-    flow.notifier.register('*', flow_log_change)
-    flow.task_notifier.register('*', task_log_change)
-    return flow
index a0f4e071d5a558d3a9cb7e65029ae91632a48d02..9d23997096eb1f40aeb8009eaf382993c78713e9 100644 (file)
@@ -61,8 +61,6 @@ from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
 from cinder.volume import volume_types
 
-from cinder.taskflow import states
-
 from eventlet.greenpool import GreenPool
 
 LOG = logging.getLogger(__name__)
@@ -289,23 +287,31 @@ class VolumeManager(manager.SchedulerDependentManager):
     def create_volume(self, context, volume_id, request_spec=None,
                       filter_properties=None, allow_reschedule=True,
                       snapshot_id=None, image_id=None, source_volid=None):
+
         """Creates and exports the volume."""
+        context_saved = context.deepcopy()
+        context = context.elevated()
+        if filter_properties is None:
+            filter_properties = {}
 
-        flow = create_volume.get_manager_flow(
-            self.db,
-            self.driver,
-            self.scheduler_rpcapi,
-            self.host,
-            volume_id,
-            request_spec=request_spec,
-            filter_properties=filter_properties,
-            allow_reschedule=allow_reschedule,
-            snapshot_id=snapshot_id,
-            image_id=image_id,
-            source_volid=source_volid,
-            reschedule_context=context.deepcopy())
-
-        assert flow, _('Manager volume flow not retrieved')
+        try:
+            flow_engine = create_volume.get_manager_flow(
+                context,
+                self.db,
+                self.driver,
+                self.scheduler_rpcapi,
+                self.host,
+                volume_id,
+                snapshot_id=snapshot_id,
+                image_id=image_id,
+                source_volid=source_volid,
+                allow_reschedule=allow_reschedule,
+                reschedule_context=context_saved,
+                request_spec=request_spec,
+                filter_properties=filter_properties)
+        except Exception:
+            raise exception.CinderException(
+                _("Failed to create manager volume flow"))
 
         if snapshot_id is not None:
             # Make sure the snapshot is not deleted until we are done with it.
@@ -317,11 +323,11 @@ class VolumeManager(manager.SchedulerDependentManager):
             locked_action = None
 
         def _run_flow():
-            flow.run(context.elevated())
-            if flow.state != states.SUCCESS:
-                msg = _("Failed to successfully complete manager volume "
-                        "workflow")
-                raise exception.CinderException(msg)
+            # This code executes create volume flow. If something goes wrong,
+            # flow reverts all job that was done and reraises an exception.
+            # Otherwise, all data that was generated by flow becomes available
+            # in flow engine's storage.
+            flow_engine.run()
 
         @utils.synchronized(locked_action, external=True)
         def _run_flow_locked():
@@ -332,8 +338,9 @@ class VolumeManager(manager.SchedulerDependentManager):
         else:
             _run_flow_locked()
 
-        self._reset_stats()
-        return volume_id
+        # Fetch created volume from storage
+        volume_ref = flow_engine.storage.fetch('volume')
+        return volume_ref['id']
 
     @utils.require_driver_initialized
     @locked_volume_operation
index d029aaea81e13aef2803d6f98a89e8ab08c4354d..cef4ecd3e1765e34d32d2ea0714d5de84c0e61dd 100644 (file)
@@ -18,6 +18,7 @@ python-keystoneclient>=0.4.1
 python-novaclient>=2.15.0
 python-swiftclient>=1.5
 Routes>=1.12.3
+taskflow>=0.1.1,<0.2
 rtslib-fb>=2.1.39
 six>=1.4.1
 SQLAlchemy>=0.7.8,<=0.7.99
diff --git a/taskflow.conf b/taskflow.conf
deleted file mode 100644 (file)
index a871ff7..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-[DEFAULT]
-
-# The list of primitives to copy from taskflow
-primitives=flow.linear_flow,task
-
-# The base module to hold the copy of taskflow
-base=cinder