- Old in-tree TaskFlow code was removed from Cinder.
- TaskFlow 0.1.1 was added to the Cinder requirements.
- The create volume flows for volume.api, volume.manager and
  scheduler.manager were updated to use TaskFlow 0.1.1.

Partially implements: blueprint create-volume-flow
Change-Id: Idbac8d001436f02978b366fbb3205ce84c847267
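
For context, the converted flows follow the standard TaskFlow 0.1.1 usage
pattern: tasks declare what they require and provide, get composed into a
flow, and the flow is loaded into an engine that is run and whose storage
holds the named results. Below is a minimal sketch of that pattern; the task,
flow, and symbol names are illustrative only and are not part of this change.

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task


    class ExtractRequestTask(task.Task):
        # Names this task promises to the rest of the flow; the engine keys
        # the returned dict by these names in its storage.
        default_provides = set(['size'])

        def execute(self, raw_size):
            # Requirements ('raw_size') are inferred from the signature and
            # are satisfied by the injected store or by earlier tasks.
            return {'size': int(raw_size)}

        def revert(self, result, **kwargs):
            # Called on rollback with the prior result (or a Failure) plus
            # the same inputs execute() received.
            pass


    flow = linear_flow.Flow('volume_create_sketch')
    flow.add(ExtractRequestTask())
    # The store seeds the symbols the first task requires.
    engine = taskflow.engines.load(flow, store={'raw_size': '10'})
    engine.run()
    size = engine.storage.fetch('size')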
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
-from cinder.taskflow import states
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='cinder.scheduler.filter_scheduler.'
image_id=None, request_spec=None,
filter_properties=None):
- flow = create_volume.get_scheduler_flow(db, self.driver,
- request_spec,
- filter_properties,
- volume_id, snapshot_id,
- image_id)
- assert flow, _('Schedule volume flow not retrieved')
-
- flow.run(context)
- if flow.state != states.SUCCESS:
- LOG.warn(_("Failed to successfully complete"
- " schedule volume using flow: %s"), flow)
+ try:
+ flow_engine = create_volume.get_scheduler_flow(context,
+ db, self.driver,
+ request_spec,
+ filter_properties,
+ volume_id,
+ snapshot_id,
+ image_id)
+ except Exception:
+ raise exception.CinderException(
+ _("Failed to create scheduler manager volume flow"))
+ flow_engine.run()
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import functools
-import inspect
-import types
-
-# These arguments are ones that we will skip when parsing for requirements
-# for a function to operate (when used as a task).
-AUTO_ARGS = ('self', 'context', 'cls')
-
-
-def is_decorated(functor):
- if not isinstance(functor, (types.MethodType, types.FunctionType)):
- return False
- return getattr(extract(functor), '__task__', False)
-
-
-def extract(functor):
- # Extract the underlying functor if its a method since we can not set
- # attributes on instance methods, this is supposedly fixed in python 3
- # and later.
- #
- # TODO(harlowja): add link to this fix.
- assert isinstance(functor, (types.MethodType, types.FunctionType))
- if isinstance(functor, types.MethodType):
- return functor.__func__
- else:
- return functor
-
-
-def _mark_as_task(functor):
- setattr(functor, '__task__', True)
-
-
-def _get_wrapped(function):
- """Get the method at the bottom of a stack of decorators."""
-
- if hasattr(function, '__wrapped__'):
- return getattr(function, '__wrapped__')
-
- if not hasattr(function, 'func_closure') or not function.func_closure:
- return function
-
- def _get_wrapped_function(function):
- if not hasattr(function, 'func_closure') or not function.func_closure:
- return None
-
- for closure in function.func_closure:
- func = closure.cell_contents
-
- deeper_func = _get_wrapped_function(func)
- if deeper_func:
- return deeper_func
- elif hasattr(closure.cell_contents, '__call__'):
- return closure.cell_contents
-
- return _get_wrapped_function(function)
-
-
-def _take_arg(a):
- if a in AUTO_ARGS:
- return False
- # In certain decorator cases it seems like we get the function to be
- # decorated as an argument, we don't want to take that as a real argument.
- if isinstance(a, collections.Callable):
- return False
- return True
-
-
-def wraps(fn):
- """This will not be needed in python 3.2 or greater which already has this
- built-in to its functools.wraps method.
- """
-
- def wrapper(f):
- f = functools.wraps(fn)(f)
- f.__wrapped__ = getattr(fn, '__wrapped__', fn)
- return f
-
- return wrapper
-
-
-def locked(f):
-
- @wraps(f)
- def wrapper(self, *args, **kwargs):
- with self._lock:
- return f(self, *args, **kwargs)
-
- return wrapper
-
-
-def task(*args, **kwargs):
- """Decorates a given function and ensures that all needed attributes of
- that function are set so that the function can be used as a task.
- """
-
- def decorator(f):
- w_f = extract(f)
-
- def noop(*args, **kwargs):
- pass
-
- # Mark as being a task
- _mark_as_task(w_f)
-
- # By default don't revert this.
- w_f.revert = kwargs.pop('revert_with', noop)
-
- # Associate a name of this task that is the module + function name.
- w_f.name = "%s.%s" % (f.__module__, f.__name__)
-
- # Sets the version of the task.
- version = kwargs.pop('version', (1, 0))
- f = _versionize(*version)(f)
-
- # Attach any requirements this function needs for running.
- requires_what = kwargs.pop('requires', [])
- f = _requires(*requires_what, **kwargs)(f)
-
- # Attach any optional requirements this function needs for running.
- optional_what = kwargs.pop('optional', [])
- f = _optional(*optional_what, **kwargs)(f)
-
- # Attach any items this function provides as output
- provides_what = kwargs.pop('provides', [])
- f = _provides(*provides_what, **kwargs)(f)
-
- @wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
- # This is needed to handle when the decorator has args or the decorator
- # doesn't have args, python is rather weird here...
- if kwargs or not args:
- return decorator
- else:
- if isinstance(args[0], collections.Callable):
- return decorator(args[0])
- else:
- return decorator
-
-
-def _versionize(major, minor=None):
- """A decorator that marks the wrapped function with a major & minor version
- number.
- """
-
- if minor is None:
- minor = 0
-
- def decorator(f):
- w_f = extract(f)
- w_f.version = (major, minor)
-
- @wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
- return decorator
-
-
-def _optional(*args, **kwargs):
- """Attaches a set of items that the decorated function would like as input
- to the functions underlying dictionary.
- """
-
- def decorator(f):
- w_f = extract(f)
-
- if not hasattr(w_f, 'optional'):
- w_f.optional = set()
-
- w_f.optional.update([a for a in args if _take_arg(a)])
-
- @wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
- # This is needed to handle when the decorator has args or the decorator
- # doesn't have args, python is rather weird here...
- if kwargs or not args:
- return decorator
- else:
- if isinstance(args[0], collections.Callable):
- return decorator(args[0])
- else:
- return decorator
-
-
-def _requires(*args, **kwargs):
- """Attaches a set of items that the decorated function requires as input
- to the functions underlying dictionary.
- """
-
- def decorator(f):
- w_f = extract(f)
-
- if not hasattr(w_f, 'requires'):
- w_f.requires = set()
-
- if kwargs.pop('auto_extract', True):
- inspect_what = _get_wrapped(f)
- f_args = inspect.getargspec(inspect_what).args
- w_f.requires.update([a for a in f_args if _take_arg(a)])
-
- w_f.requires.update([a for a in args if _take_arg(a)])
-
- @wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
- # This is needed to handle when the decorator has args or the decorator
- # doesn't have args, python is rather weird here...
- if kwargs or not args:
- return decorator
- else:
- if isinstance(args[0], collections.Callable):
- return decorator(args[0])
- else:
- return decorator
-
-
-def _provides(*args, **kwargs):
- """Attaches a set of items that the decorated function provides as output
- to the functions underlying dictionary.
- """
-
- def decorator(f):
- w_f = extract(f)
-
- if not hasattr(f, 'provides'):
- w_f.provides = set()
-
- w_f.provides.update([a for a in args if _take_arg(a)])
-
- @wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
- # This is needed to handle when the decorator has args or the decorator
- # doesn't have args, python is rather weird here...
- if kwargs or not args:
- return decorator
- else:
- if isinstance(args[0], collections.Callable):
- return decorator(args[0])
- else:
- return decorator
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-class TaskFlowException(Exception):
- """Base class for exceptions emitted from this library."""
- pass
-
-
-class Duplicate(TaskFlowException):
- """Raised when a duplicate entry is found."""
- pass
-
-
-class NotFound(TaskFlowException):
- """Raised when some entry in some object doesn't exist."""
- pass
-
-
-class AlreadyExists(TaskFlowException):
- """Raised when some entry in some object already exists."""
- pass
-
-
-class ClosedException(TaskFlowException):
- """Raised when an access on a closed object occurs."""
- pass
-
-
-class InvalidStateException(TaskFlowException):
- """Raised when a task/job/workflow is in an invalid state when an
- operation is attempting to apply to said task/job/workflow.
- """
- pass
-
-
-class UnclaimableJobException(TaskFlowException):
- """Raised when a job can not be claimed."""
- pass
-
-
-class JobNotFound(TaskFlowException):
- """Raised when a job entry can not be found."""
- pass
-
-
-class MissingDependencies(InvalidStateException):
- """Raised when a task has dependencies that can not be satisfied."""
- message = ("%(task)s requires %(requirements)s but no other task produces"
- " said requirements")
-
- def __init__(self, task, requirements):
- message = self.message % {'task': task, 'requirements': requirements}
- super(MissingDependencies, self).__init__(message)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-import threading
-import uuid as uuidlib
-
-import six
-
-
-from cinder.taskflow import decorators
-from cinder.taskflow import exceptions as exc
-from cinder.taskflow import states
-from cinder.taskflow import utils
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Flow(object):
- """The base abstract class of all flow implementations.
-
- It provides a set of parents to flows that have a concept of parent flows
- as well as a state and state utility functions to the deriving classes. It
- also provides a name and an identifier (uuid or other) to the flow so that
- it can be uniquely identifed among many flows.
-
- Flows are expected to provide (if desired) the following methods:
- - add
- - add_many
- - interrupt
- - reset
- - rollback
- - run
- - soft_reset
- """
-
- # Common states that certain actions can be performed in. If the flow
- # is not in these sets of states then it is likely that the flow operation
- # can not succeed.
- RESETTABLE_STATES = set([
- states.INTERRUPTED,
- states.SUCCESS,
- states.PENDING,
- states.FAILURE,
- ])
- SOFT_RESETTABLE_STATES = set([
- states.INTERRUPTED,
- ])
- UNINTERRUPTIBLE_STATES = set([
- states.FAILURE,
- states.SUCCESS,
- states.PENDING,
- ])
- RUNNABLE_STATES = set([
- states.PENDING,
- ])
-
- def __init__(self, name, parents=None, uuid=None):
- self._name = str(name)
- # The state of this flow.
- self._state = states.PENDING
- # If this flow has a parent flow/s which need to be reverted if
- # this flow fails then please include them here to allow this child
- # to call the parents...
- if parents:
- self.parents = tuple(parents)
- else:
- self.parents = ()
- # Any objects that want to listen when a wf/task starts/stops/completes
- # or errors should be registered here. This can be used to monitor
- # progress and record tasks finishing (so that it becomes possible to
- # store the result of a task in some persistent or semi-persistent
- # storage backend).
- self.notifier = utils.TransitionNotifier()
- self.task_notifier = utils.TransitionNotifier()
- # Ensure that modifications and/or multiple runs aren't happening
- # at the same time in the same flow at the same time.
- self._lock = threading.RLock()
- # Assign this flow a unique identifer.
- if uuid:
- self._id = str(uuid)
- else:
- self._id = str(uuidlib.uuid4())
-
- @property
- def name(self):
- """A non-unique name for this flow (human readable)"""
- return self._name
-
- @property
- def uuid(self):
- """Uniquely identifies this flow"""
- return "f-%s" % (self._id)
-
- @property
- def state(self):
- """Provides a read-only view of the flow state."""
- return self._state
-
- def _change_state(self, context, new_state):
- was_changed = False
- old_state = self.state
- with self._lock:
- if self.state != new_state:
- old_state = self.state
- self._state = new_state
- was_changed = True
- if was_changed:
- # Don't notify while holding the lock.
- self.notifier.notify(self.state, details={
- 'context': context,
- 'flow': self,
- 'old_state': old_state,
- })
-
- def __str__(self):
- lines = ["Flow: %s" % (self.name)]
- lines.append("%s" % (self.uuid))
- lines.append("%s" % (len(self.parents)))
- lines.append("%s" % (self.state))
- return "; ".join(lines)
-
- @abc.abstractmethod
- def add(self, task):
- """Adds a given task to this flow.
-
- Returns the uuid that is associated with the task for later operations
- before and after it is ran.
- """
- raise NotImplementedError()
-
- @decorators.locked
- def add_many(self, tasks):
- """Adds many tasks to this flow.
-
- Returns a list of uuids (one for each task added).
- """
- uuids = []
- for t in tasks:
- uuids.append(self.add(t))
- return uuids
-
- def interrupt(self):
- """Attempts to interrupt the current flow and any tasks that are
- currently not running in the flow.
-
- Returns how many tasks were interrupted (if any).
- """
- if self.state in self.UNINTERRUPTIBLE_STATES:
- raise exc.InvalidStateException(("Can not interrupt when"
- " in state %s") % (self.state))
- # Note(harlowja): Do *not* acquire the lock here so that the flow may
- # be interrupted while running. This does mean the the above check may
- # not be valid but we can worry about that if it becomes an issue.
- old_state = self.state
- if old_state != states.INTERRUPTED:
- self._state = states.INTERRUPTED
- self.notifier.notify(self.state, details={
- 'context': None,
- 'flow': self,
- 'old_state': old_state,
- })
- return 0
-
- @decorators.locked
- def reset(self):
- """Fully resets the internal state of this flow, allowing for the flow
- to be ran again.
-
- Note: Listeners are also reset.
- """
- if self.state not in self.RESETTABLE_STATES:
- raise exc.InvalidStateException(("Can not reset when"
- " in state %s") % (self.state))
- self.notifier.reset()
- self.task_notifier.reset()
- self._change_state(None, states.PENDING)
-
- @decorators.locked
- def soft_reset(self):
- """Partially resets the internal state of this flow, allowing for the
- flow to be ran again from an interrupted state only.
- """
- if self.state not in self.SOFT_RESETTABLE_STATES:
- raise exc.InvalidStateException(("Can not soft reset when"
- " in state %s") % (self.state))
- self._change_state(None, states.PENDING)
-
- @decorators.locked
- def run(self, context, *args, **kwargs):
- """Executes the workflow."""
- if self.state not in self.RUNNABLE_STATES:
- raise exc.InvalidStateException("Unable to run flow when "
- "in state %s" % (self.state))
-
- @decorators.locked
- def rollback(self, context, cause):
- """Performs rollback of this workflow and any attached parent workflows
- if present.
- """
- pass
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import logging
-
-from cinder.openstack.common import excutils
-
-from cinder.taskflow import decorators
-from cinder.taskflow import exceptions as exc
-from cinder.taskflow import states
-from cinder.taskflow import utils
-
-from cinder.taskflow.patterns import base
-
-LOG = logging.getLogger(__name__)
-
-
-class Flow(base.Flow):
- """"A linear chain of tasks that can be applied in order as one unit and
- rolled back as one unit using the reverse order that the tasks have
- been applied in.
-
- Note(harlowja): Each task in the chain must have requirements
- which are satisfied by the previous task/s in the chain.
- """
-
- def __init__(self, name, parents=None, uuid=None):
- super(Flow, self).__init__(name, parents, uuid)
- # The tasks which have been applied will be collected here so that they
- # can be reverted in the correct order on failure.
- self._accumulator = utils.RollbackAccumulator()
- # Tasks results are stored here. Lookup is by the uuid that was
- # returned from the add function.
- self.results = {}
- # The previously left off iterator that can be used to resume from
- # the last task (if interrupted and soft-reset).
- self._leftoff_at = None
- # All runners to run are collected here.
- self._runners = []
- self._connected = False
- # The resumption strategy to use.
- self.resumer = None
-
- @decorators.locked
- def add(self, task):
- """Adds a given task to this flow."""
- assert isinstance(task, collections.Callable)
- r = utils.Runner(task)
- r.runs_before = list(reversed(self._runners))
- self._runners.append(r)
- self._reset_internals()
- return r.uuid
-
- def _reset_internals(self):
- self._connected = False
- self._leftoff_at = None
-
- def _associate_providers(self, runner):
- # Ensure that some previous task provides this input.
- who_provides = {}
- task_requires = runner.requires
- for r in task_requires:
- provider = None
- for before_me in runner.runs_before:
- if r in before_me.provides:
- provider = before_me
- break
- if provider:
- who_provides[r] = provider
- # Ensure that the last task provides all the needed input for this
- # task to run correctly.
- missing_requires = task_requires - set(who_provides.keys())
- if missing_requires:
- raise exc.MissingDependencies(runner, sorted(missing_requires))
- runner.providers.update(who_provides)
-
- def __str__(self):
- lines = ["LinearFlow: %s" % (self.name)]
- lines.append("%s" % (self.uuid))
- lines.append("%s" % (len(self._runners)))
- lines.append("%s" % (len(self.parents)))
- lines.append("%s" % (self.state))
- return "; ".join(lines)
-
- @decorators.locked
- def remove(self, uuid):
- index_removed = -1
- for (i, r) in enumerate(self._runners):
- if r.uuid == uuid:
- index_removed = i
- break
- if index_removed == -1:
- raise ValueError("No runner found with uuid %s" % (uuid))
- else:
- removed = self._runners.pop(index_removed)
- self._reset_internals()
- # Go and remove it from any runner after the removed runner since
- # those runners may have had an attachment to it.
- for r in self._runners[index_removed:]:
- try:
- r.runs_before.remove(removed)
- except (IndexError, ValueError):
- pass
-
- def __len__(self):
- return len(self._runners)
-
- def _connect(self):
- if self._connected:
- return self._runners
- for r in self._runners:
- r.providers = {}
- for r in reversed(self._runners):
- self._associate_providers(r)
- self._connected = True
- return self._runners
-
- def _ordering(self):
- return iter(self._connect())
-
- @decorators.locked
- def run(self, context, *args, **kwargs):
- super(Flow, self).run(context, *args, **kwargs)
-
- def resume_it():
- if self._leftoff_at is not None:
- return ([], self._leftoff_at)
- if self.resumer:
- (finished, leftover) = self.resumer.resume(self,
- self._ordering())
- else:
- finished = []
- leftover = self._ordering()
- return (finished, leftover)
-
- self._change_state(context, states.STARTED)
- try:
- those_finished, leftover = resume_it()
- except Exception:
- with excutils.save_and_reraise_exception():
- self._change_state(context, states.FAILURE)
-
- def run_it(runner, failed=False, result=None, simulate_run=False):
- try:
- # Add the task to be rolled back *immediately* so that even if
- # the task fails while producing results it will be given a
- # chance to rollback.
- rb = utils.RollbackTask(context, runner.task, result=None)
- self._accumulator.add(rb)
- self.task_notifier.notify(states.STARTED, details={
- 'context': context,
- 'flow': self,
- 'runner': runner,
- })
- if not simulate_run:
- result = runner(context, *args, **kwargs)
- else:
- if failed:
- # TODO(harlowja): make this configurable??
- # If we previously failed, we want to fail again at
- # the same place.
- if not result:
- # If no exception or exception message was provided
- # or captured from the previous run then we need to
- # form one for this task.
- result = "%s failed running." % (runner.task)
- if isinstance(result, basestring):
- result = exc.InvalidStateException(result)
- if not isinstance(result, Exception):
- LOG.warn("Can not raise a non-exception"
- " object: %s", result)
- result = exc.InvalidStateException()
- raise result
- # Adjust the task result in the accumulator before
- # notifying others that the task has finished to
- # avoid the case where a listener might throw an
- # exception.
- rb.result = result
- runner.result = result
- self.results[runner.uuid] = result
- self.task_notifier.notify(states.SUCCESS, details={
- 'context': context,
- 'flow': self,
- 'runner': runner,
- })
- except Exception as e:
- runner.result = e
- cause = utils.FlowFailure(runner, self, e)
- with excutils.save_and_reraise_exception():
- # Notify any listeners that the task has errored.
- self.task_notifier.notify(states.FAILURE, details={
- 'context': context,
- 'flow': self,
- 'runner': runner,
- })
- self.rollback(context, cause)
-
- if len(those_finished):
- self._change_state(context, states.RESUMING)
- for (r, details) in those_finished:
- # Fake running the task so that we trigger the same
- # notifications and state changes (and rollback that
- # would have happened in a normal flow).
- failed = states.FAILURE in details.get('states', [])
- result = details.get('result')
- run_it(r, failed=failed, result=result, simulate_run=True)
-
- self._leftoff_at = leftover
- self._change_state(context, states.RUNNING)
- if self.state == states.INTERRUPTED:
- return
-
- was_interrupted = False
- for r in leftover:
- r.reset()
- run_it(r)
- if self.state == states.INTERRUPTED:
- was_interrupted = True
- break
-
- if not was_interrupted:
- # Only gets here if everything went successfully.
- self._change_state(context, states.SUCCESS)
- self._leftoff_at = None
-
- @decorators.locked
- def reset(self):
- super(Flow, self).reset()
- self.results = {}
- self.resumer = None
- self._accumulator.reset()
- self._reset_internals()
-
- @decorators.locked
- def rollback(self, context, cause):
- # Performs basic task by task rollback by going through the reverse
- # order that tasks have finished and asking said task to undo whatever
- # it has done. If this flow has any parent flows then they will
- # also be called to rollback any tasks said parents contain.
- #
- # Note(harlowja): if a flow can more simply revert a whole set of
- # tasks via a simpler command then it can override this method to
- # accomplish that.
- #
- # For example, if each task was creating a file in a directory, then
- # it's easier to just remove the directory than to ask each task to
- # delete its file individually.
- self._change_state(context, states.REVERTING)
- try:
- self._accumulator.rollback(cause)
- finally:
- self._change_state(context, states.FAILURE)
- # Rollback any parents flows if they exist...
- for p in self.parents:
- p.rollback(context, cause)
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# Job states.
-CLAIMED = 'CLAIMED'
-FAILURE = 'FAILURE'
-PENDING = 'PENDING'
-RUNNING = 'RUNNING'
-SUCCESS = 'SUCCESS'
-UNCLAIMED = 'UNCLAIMED'
-
-# Flow states.
-FAILURE = FAILURE
-INTERRUPTED = 'INTERRUPTED'
-PENDING = 'PENDING'
-RESUMING = 'RESUMING'
-REVERTING = 'REVERTING'
-RUNNING = RUNNING
-STARTED = 'STARTED'
-SUCCESS = SUCCESS
-
-# Task states.
-FAILURE = FAILURE
-STARTED = STARTED
-SUCCESS = SUCCESS
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-
-import six
-
-from cinder.taskflow import utils
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Task(object):
- """An abstraction that defines a potential piece of work that can be
- applied and can be reverted to undo the work as a single unit.
- """
- def __init__(self, name):
- self.name = name
- # An *immutable* input 'resource' name set this task depends
- # on existing before this task can be applied.
- self.requires = set()
- # An *immutable* input 'resource' name set this task would like to
- # depends on existing before this task can be applied (but does not
- # strongly depend on existing).
- self.optional = set()
- # An *immutable* output 'resource' name set this task
- # produces that other tasks may depend on this task providing.
- self.provides = set()
- # This identifies the version of the task to be ran which
- # can be useful in resuming older versions of tasks. Standard
- # major, minor version semantics apply.
- self.version = (1, 0)
-
- def __str__(self):
- return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
-
- @abc.abstractmethod
- def __call__(self, context, *args, **kwargs):
- """Activate a given task which will perform some operation and return.
-
- This method can be used to apply some given context and given set
- of args and kwargs to accomplish some goal. Note that the result
- that is returned needs to be serializable so that it can be passed
- back into this task if reverting is triggered.
- """
- raise NotImplementedError()
-
- def revert(self, context, result, cause):
- """Revert this task using the given context, result that the apply
- provided as well as any information which may have caused
- said reversion.
- """
- pass
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import contextlib
-import copy
-import logging
-import re
-import sys
-import threading
-import time
-import types
-import uuid as uuidlib
-
-
-from cinder.taskflow import decorators
-
-LOG = logging.getLogger(__name__)
-
-
-def get_attr(task, field, default=None):
- if decorators.is_decorated(task):
- # If its a decorated functor then the attributes will be either
- # in the underlying function of the instancemethod or the function
- # itself.
- task = decorators.extract(task)
- return getattr(task, field, default)
-
-
-def join(itr, with_what=","):
- pieces = [str(i) for i in itr]
- return with_what.join(pieces)
-
-
-def get_many_attr(obj, *attrs):
- many = []
- for a in attrs:
- many.append(get_attr(obj, a, None))
- return many
-
-
-def get_task_version(task):
- """Gets a tasks *string* version, whether it is a task object/function."""
- task_version = get_attr(task, 'version')
- if isinstance(task_version, (list, tuple)):
- task_version = join(task_version, with_what=".")
- if task_version is not None and not isinstance(task_version, basestring):
- task_version = str(task_version)
- return task_version
-
-
-def get_task_name(task):
- """Gets a tasks *string* name, whether it is a task object/function."""
- task_name = ""
- if isinstance(task, (types.MethodType, types.FunctionType)):
- # If its a function look for the attributes that should have been
- # set using the task() decorator provided in the decorators file. If
- # those have not been set, then we should at least have enough basic
- # information (not a version) to form a useful task name.
- task_name = get_attr(task, 'name')
- if not task_name:
- name_pieces = [a for a in get_many_attr(task,
- '__module__',
- '__name__')
- if a is not None]
- task_name = join(name_pieces, ".")
- else:
- task_name = str(task)
- return task_name
-
-
-def is_version_compatible(version_1, version_2):
- """Checks for major version compatibility of two *string" versions."""
- if version_1 == version_2:
- # Equivalent exactly, so skip the rest.
- return True
-
- def _convert_to_pieces(version):
- try:
- pieces = []
- for p in version.split("."):
- p = p.strip()
- if not len(p):
- pieces.append(0)
- continue
- # Clean off things like 1alpha, or 2b and just select the
- # digit that starts that entry instead.
- p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
- if p_match:
- p = p_match.group(1)
- pieces.append(int(p))
- except (AttributeError, TypeError, ValueError):
- pieces = []
- return pieces
-
- version_1_pieces = _convert_to_pieces(version_1)
- version_2_pieces = _convert_to_pieces(version_2)
- if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
- return False
-
- # Ensure major version compatibility to start.
- major1 = version_1_pieces[0]
- major2 = version_2_pieces[0]
- if major1 != major2:
- return False
- return True
-
-
-def await(check_functor, timeout=None):
- if timeout is not None:
- end_time = time.time() + max(0, timeout)
- else:
- end_time = None
- # Use the same/similar scheme that the python condition class uses.
- delay = 0.0005
- while not check_functor():
- time.sleep(delay)
- if end_time is not None:
- remaining = end_time - time.time()
- if remaining <= 0:
- return False
- delay = min(delay * 2, remaining, 0.05)
- else:
- delay = min(delay * 2, 0.05)
- return True
-
-
-class LastFedIter(object):
- """An iterator which yields back the first item and then yields back
- results from the provided iterator.
- """
-
- def __init__(self, first, rest_itr):
- self.first = first
- self.rest_itr = rest_itr
-
- def __iter__(self):
- yield self.first
- for i in self.rest_itr:
- yield i
-
-
-class FlowFailure(object):
- """When a task failure occurs the following object will be given to revert
- and can be used to interrogate what caused the failure.
- """
-
- def __init__(self, runner, flow, exception):
- self.runner = runner
- self.flow = flow
- self.exc = exception
- self.exc_info = sys.exc_info()
-
-
-class RollbackTask(object):
- """A helper task that on being called will call the underlying callable
- tasks revert method (if said method exists).
- """
-
- def __init__(self, context, task, result):
- self.task = task
- self.result = result
- self.context = context
-
- def __str__(self):
- return str(self.task)
-
- def __call__(self, cause):
- if ((hasattr(self.task, "revert") and
- isinstance(self.task.revert, collections.Callable))):
- self.task.revert(self.context, self.result, cause)
-
-
-class Runner(object):
- """A helper class that wraps a task and can find the needed inputs for
- the task to run, as well as providing a uuid and other useful functionality
- for users of the task.
-
- TODO(harlowja): replace with the task details object or a subclass of
- that???
- """
-
- def __init__(self, task, uuid=None):
- assert isinstance(task, collections.Callable)
- self.task = task
- self.providers = {}
- self.runs_before = []
- self.result = None
- if not uuid:
- self._id = str(uuidlib.uuid4())
- else:
- self._id = str(uuid)
-
- @property
- def uuid(self):
- return "r-%s" % (self._id)
-
- @property
- def requires(self):
- return set(get_attr(self.task, 'requires', []))
-
- @property
- def provides(self):
- return set(get_attr(self.task, 'provides', []))
-
- @property
- def optional(self):
- return set(get_attr(self.task, 'optional', []))
-
- @property
- def version(self):
- return get_task_version(self.task)
-
- @property
- def name(self):
- return get_task_name(self.task)
-
- def reset(self):
- self.result = None
-
- def __str__(self):
- lines = ["Runner: %s" % (self.name)]
- lines.append("%s" % (self.uuid))
- lines.append("%s" % (self.version))
- return "; ".join(lines)
-
- def __call__(self, *args, **kwargs):
- # Find all of our inputs first.
- kwargs = dict(kwargs)
- for (k, who_made) in self.providers.iteritems():
- if who_made.result and k in who_made.result:
- kwargs[k] = who_made.result[k]
- else:
- kwargs[k] = None
- optional_keys = self.optional
- optional_missing_keys = optional_keys - set(kwargs.keys())
- if optional_missing_keys:
- for k in optional_missing_keys:
- for r in self.runs_before:
- r_provides = r.provides
- if k in r_provides and r.result and k in r.result:
- kwargs[k] = r.result[k]
- break
- # And now finally run.
- self.result = self.task(*args, **kwargs)
- return self.result
-
-
-class TransitionNotifier(object):
- """A utility helper class that can be used to subscribe to
- notifications of events occurring as well as allow a entity to post said
- notifications to subscribers.
- """
-
- RESERVED_KEYS = ('details',)
- ANY = '*'
-
- def __init__(self):
- self._listeners = collections.defaultdict(list)
-
- def reset(self):
- self._listeners = collections.defaultdict(list)
-
- def notify(self, state, details):
- listeners = list(self._listeners.get(self.ANY, []))
- for i in self._listeners[state]:
- if i not in listeners:
- listeners.append(i)
- if not listeners:
- return
- for (callback, args, kwargs) in listeners:
- if args is None:
- args = []
- if kwargs is None:
- kwargs = {}
- kwargs['details'] = details
- try:
- callback(state, *args, **kwargs)
- except Exception:
- LOG.exception(("Failure calling callback %s to notify about"
- " state transition %s"), callback, state)
-
- def register(self, state, callback, args=None, kwargs=None):
- assert isinstance(callback, collections.Callable)
- for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
- if cb is callback:
- raise ValueError("Callback %s already registered" % (callback))
- if kwargs:
- for k in self.RESERVED_KEYS:
- if k in kwargs:
- raise KeyError(("Reserved key '%s' not allowed in "
- "kwargs") % k)
- kwargs = copy.copy(kwargs)
- if args:
- args = copy.copy(args)
- self._listeners[state].append((callback, args, kwargs))
-
- def deregister(self, state, callback):
- if state not in self._listeners:
- return
- for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
- if cb is callback:
- self._listeners[state].pop(i)
- break
-
-
-class RollbackAccumulator(object):
- """A utility class that can help in organizing 'undo' like code
- so that said code be rolled back on failure (automatically or manually)
- by activating rollback callables that were inserted during said codes
- progression.
- """
-
- def __init__(self):
- self._rollbacks = []
-
- def add(self, *callables):
- self._rollbacks.extend(callables)
-
- def reset(self):
- self._rollbacks = []
-
- def __len__(self):
- return len(self._rollbacks)
-
- def __iter__(self):
- # Rollbacks happen in the reverse order that they were added.
- return reversed(self._rollbacks)
-
- def __enter__(self):
- return self
-
- def rollback(self, cause):
- LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
- for (i, f) in enumerate(self):
- LOG.debug("Calling rollback %s: %s", i + 1, f)
- try:
- f(cause)
- except Exception:
- LOG.exception(("Failed rolling back %s: %s due "
- "to inner exception."), i + 1, f)
-
- def __exit__(self, type, value, tb):
- if any((value, type, tb)):
- self.rollback(value)
-
-
-class ReaderWriterLock(object):
- """A simple reader-writer lock.
-
- Several readers can hold the lock simultaneously, and only one writer.
- Write locks have priority over reads to prevent write starvation.
-
- Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
- """
-
- def __init__(self):
- self.rwlock = 0
- self.writers_waiting = 0
- self.monitor = threading.Lock()
- self.readers_ok = threading.Condition(self.monitor)
- self.writers_ok = threading.Condition(self.monitor)
-
- @contextlib.contextmanager
- def acquire(self, read=True):
- """Acquire a read or write lock in a context manager."""
- try:
- if read:
- self.acquire_read()
- else:
- self.acquire_write()
- yield self
- finally:
- self.release()
-
- def acquire_read(self):
- """Acquire a read lock.
-
- Several threads can hold this typeof lock.
- It is exclusive with write locks.
- """
-
- self.monitor.acquire()
- while self.rwlock < 0 or self.writers_waiting:
- self.readers_ok.wait()
- self.rwlock += 1
- self.monitor.release()
-
- def acquire_write(self):
- """Acquire a write lock.
-
- Only one thread can hold this lock, and only when no read locks
- are also held.
- """
-
- self.monitor.acquire()
- while self.rwlock != 0:
- self.writers_waiting += 1
- self.writers_ok.wait()
- self.writers_waiting -= 1
- self.rwlock = -1
- self.monitor.release()
-
- def release(self):
- """Release a lock, whether read or write."""
-
- self.monitor.acquire()
- if self.rwlock < 0:
- self.rwlock = 0
- else:
- self.rwlock -= 1
- wake_writers = self.writers_waiting and self.rwlock == 0
- wake_readers = self.writers_waiting == 0
- self.monitor.release()
- if wake_writers:
- self.writers_ok.acquire()
- self.writers_ok.notify()
- self.writers_ok.release()
- elif wake_readers:
- self.readers_ok.acquire()
- self.readers_ok.notifyAll()
- self.readers_ok.release()
-
-
-class LazyPluggable(object):
- """A pluggable backend loaded lazily based on some value."""
-
- def __init__(self, pivot, **backends):
- self.__backends = backends
- self.__pivot = pivot
- self.__backend = None
-
- def __get_backend(self):
- if not self.__backend:
- backend_name = 'sqlalchemy'
- backend = self.__backends[backend_name]
- if isinstance(backend, tuple):
- name = backend[0]
- fromlist = backend[1]
- else:
- name = backend
- fromlist = backend
-
- self.__backend = __import__(name, None, None, fromlist)
- return self.__backend
-
- def __getattr__(self, key):
- backend = self.__get_backend()
- return getattr(backend, key)
import mox
from oslo.config import cfg
+from taskflow.engines.action_engine import engine
from cinder.backup import driver as backup_driver
from cinder.brick.iscsi import iscsi
from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
-from cinder.taskflow.patterns import linear_flow
from cinder import test
from cinder.tests.brick.fake_lvm import FakeBrickLVM
from cinder.tests import conf_fixture
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
-from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
lambda *args, **kwargs: None)
- orig_flow = linear_flow.Flow.run
+ orig_flow = engine.ActionEngine.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
- self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+ self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
- orig_flow = linear_flow.Flow.run
+ orig_flow = engine.ActionEngine.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
- self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+ self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
def fake_create_volume(*args, **kwargs):
raise exception.CinderException('fake exception')
- def fake_reschedule_or_error(self, context, *args, **kwargs):
- self.assertFalse(context.is_admin)
- self.assertNotIn('admin', context.roles)
- #compare context passed in with the context we saved
- self.assertDictMatch(self.saved_ctxt.__dict__,
- context.__dict__)
-
#create context for testing
ctxt = self.context.deepcopy()
if 'admin' in ctxt.roles:
#create one copy of context for future comparison
self.saved_ctxt = ctxt.deepcopy()
- self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
- fake_reschedule_or_error)
self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
volume_src = tests_utils.create_volume(self.context,
import cinder.policy
from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
-from cinder import units
from cinder import utils
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
-from cinder.taskflow import states
-
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
default=True,
return False
create_what = {
- 'size': size,
+ 'context': context,
+ 'raw_size': size,
'name': name,
'description': description,
'snapshot': snapshot,
'image_id': image_id,
- 'volume_type': volume_type,
+ 'raw_volume_type': volume_type,
'metadata': metadata,
- 'availability_zone': availability_zone,
+ 'raw_availability_zone': availability_zone,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
}
- (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
- self.volume_rpcapi,
- self.db,
- self.image_service,
- check_volume_az_zone,
- create_what)
-
- assert flow, _('Create volume flow not retrieved')
- flow.run(context)
- if flow.state != states.SUCCESS:
- raise exception.CinderException(_("Failed to successfully complete"
- " create volume workflow"))
-
- # Extract the volume information from the task uuid that was specified
- # to produce said information.
- volume = None
+
try:
- volume = flow.results[uuid]['volume']
- except KeyError:
- pass
+ flow_engine = create_volume.get_api_flow(self.scheduler_rpcapi,
+ self.volume_rpcapi,
+ self.db,
+ self.image_service,
+ check_volume_az_zone,
+ create_what)
+ except Exception:
+ raise exception.CinderException(
+ _("Failed to create api volume flow"))
- # Raise an error, nobody provided it??
- assert volume, _('Expected volume result not found')
+ flow_engine.run()
+ volume = flow_engine.storage.fetch('volume')
return volume
@wrap_check_policy
# under the License.
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
-from cinder.taskflow import task
+from taskflow import task
def _make_task_name(cls, addons=None):
implement the given task as the task name.
"""
- def __init__(self, addons=None):
+ def __init__(self, addons=None, **kwargs):
super(CinderTask, self).__init__(_make_task_name(self.__class__,
- addons))
-
-
-class InjectTask(CinderTask):
- """This injects a dict into the flow.
-
- This injection is done so that the keys (and values) provided can be
- dependended on by tasks further down the line. Since taskflow is dependency
- based this can be considered the bootstrapping task that provides an
- initial set of values for other tasks to get started with. If this did not
- exist then tasks would fail locating there dependent tasks and the values
- said dependent tasks produce.
-
- Reversion strategy: N/A
- """
-
- def __init__(self, inject_what, addons=None):
- super(InjectTask, self).__init__(addons=addons)
- self.provides.update(inject_what.keys())
- self._inject = inject_what
-
- def __call__(self, context):
- return dict(self._inject)
+ addons),
+ **kwargs)
import traceback
from oslo.config import cfg
+import taskflow.engines
+from taskflow.patterns import linear_flow
+from taskflow import task
+from taskflow.utils import misc
from cinder import exception
from cinder.image import glance
from cinder.openstack.common import timeutils
from cinder import policy
from cinder import quota
-from cinder.taskflow import decorators
-from cinder.taskflow.patterns import linear_flow
-from cinder.taskflow import task
from cinder import units
from cinder import utils
from cinder.volume.flows import base
-from cinder.volume.flows import utils as flow_utils
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
return ".".join(meth_pieces)
-def _find_result_spec(flow):
- """Find the last task that produced a valid volume_spec and returns it."""
- for there_result in flow.results.values():
- if not there_result or not 'volume_spec' in there_result:
- continue
- if there_result['volume_spec']:
- return there_result['volume_spec']
- return None
-
-
def _restore_source_status(context, db, volume_spec):
# NOTE(harlowja): Only if the type of the volume that was being created is
# the source volume type should we try to reset the source volume status
Reversion strategy: N/A
"""
- def __init__(self, image_service, az_check_functor=None):
- super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
- # This task will produce the following outputs (said outputs can be
- # saved to durable storage in the future so that the flow can be
- # reconstructed elsewhere and continued).
- self.provides.update(['availability_zone', 'size', 'snapshot_id',
- 'source_volid', 'volume_type', 'volume_type_id',
- 'encryption_key_id'])
- # This task requires the following inputs to operate (provided
- # automatically to __call__(). This is done so that the flow can
- # be reconstructed elsewhere and continue running (in the future).
- #
- # It also is used to be able to link tasks that produce items to tasks
- # that consume items (thus allowing the linking of the flow to be
- # mostly automatic).
- self.requires.update(['availability_zone', 'image_id', 'metadata',
- 'size', 'snapshot', 'source_volume',
- 'volume_type', 'key_manager',
- 'backup_source_volume'])
+ # This task will produce the following outputs (said outputs can be
+ # saved to durable storage in the future so that the flow can be
+ # reconstructed elsewhere and continued).
+ default_provides = set(['availability_zone', 'size', 'snapshot_id',
+ 'source_volid', 'volume_type', 'volume_type_id',
+ 'encryption_key_id'])
+
+ def __init__(self, image_service, az_check_functor=None, **kwargs):
+ super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
+ **kwargs)
self.image_service = image_service
self.az_check_functor = az_check_functor
if not self.az_check_functor:
return volume_type_id
- def __call__(self, context, size, snapshot, image_id, source_volume,
- availability_zone, volume_type, metadata,
- key_manager, backup_source_volume):
+ def execute(self, context, size, snapshot, image_id, source_volume,
+ availability_zone, volume_type, metadata,
+ key_manager, backup_source_volume):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
Reversion strategy: remove the volume_id created from the database.
"""
+ default_provides = set(['volume_properties', 'volume_id', 'volume'])
+
def __init__(self, db):
- super(EntryCreateTask, self).__init__(addons=[ACTION])
+ requires = ['availability_zone', 'description', 'metadata',
+ 'name', 'reservations', 'size', 'snapshot_id',
+ 'source_volid', 'volume_type_id', 'encryption_key_id']
+ super(EntryCreateTask, self).__init__(addons=[ACTION],
+ requires=requires)
self.db = db
- self.requires.update(['availability_zone', 'description', 'metadata',
- 'name', 'reservations', 'size', 'snapshot_id',
- 'source_volid', 'volume_type_id',
- 'encryption_key_id'])
- self.provides.update(['volume_properties', 'volume_id'])
+ self.provides.update()
- def __call__(self, context, **kwargs):
+ def execute(self, context, **kwargs):
"""Creates a database entry for the given inputs and returns details.
Accesses the database and creates a new entry for the to be created
'volume': volume,
}
- def revert(self, context, result, cause):
+ def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
- if not result:
+ if isinstance(result, misc.Failure):
return
if context.quota_committed:
# Committed quota doesn't rollback as the volume has already been
an automated or manual process.
"""
+ default_provides = set(['reservations'])
+
def __init__(self):
super(QuotaReserveTask, self).__init__(addons=[ACTION])
- self.requires.update(['size', 'volume_type_id'])
- self.provides.update(['reservations'])
- def __call__(self, context, size, volume_type_id):
+ def execute(self, context, size, volume_type_id):
try:
reserve_opts = {'volumes': 1, 'gigabytes': size}
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
"already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
'd_consumed': _consumed('volumes')})
- allowed = quotas['volumes']
raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
else:
# If nothing was reraised, ensure we reraise the initial error
raise
- def revert(self, context, result, cause):
+ def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
- if not result:
+ if isinstance(result, misc.Failure):
return
if context.quota_committed:
# The reservations have already been committed and can not be
def __init__(self):
super(QuotaCommitTask, self).__init__(addons=[ACTION])
- self.requires.update(['reservations', 'volume_properties'])
- def __call__(self, context, reservations, volume_properties):
+ def execute(self, context, reservations, volume_properties):
QUOTAS.commit(context, reservations)
context.quota_committed = True
return {'volume_properties': volume_properties}
- def revert(self, context, result, cause):
+ def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
- if not result:
+ if isinstance(result, misc.Failure):
return
volume = result['volume_properties']
try:
"""
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
- super(VolumeCastTask, self).__init__(addons=[ACTION])
+ requires = ['image_id', 'scheduler_hints', 'snapshot_id',
+ 'source_volid', 'volume_id', 'volume_type',
+ 'volume_properties']
+ super(VolumeCastTask, self).__init__(addons=[ACTION],
+ requires=requires)
self.volume_rpcapi = volume_rpcapi
self.scheduler_rpcapi = scheduler_rpcapi
self.db = db
- self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
- 'source_volid', 'volume_id', 'volume_type',
- 'volume_properties'])
def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid']
image_id=image_id,
source_volid=source_volid)
- def __call__(self, context, **kwargs):
+ def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)
request_spec = kwargs.copy()
filter_properties = {}
filter_properties['scheduler_hints'] = scheduler_hints
self._cast_create_volume(context, request_spec, filter_properties)
+ def revert(self, context, result, flow_failures, **kwargs):
+ if isinstance(result, misc.Failure):
+ return
+
+ # Restore the source volume status and set the volume to error status.
+ volume_id = kwargs['volume_id']
+ _restore_source_status(context, self.db, kwargs)
+ _error_out_volume(context, self.db, volume_id)
+ LOG.error(_("Volume %s: create failed"), volume_id)
+ exc_info = False
+ if all(flow_failures[-1].exc_info):
+ exc_info = flow_failures[-1].exc_info
+ LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
class OnFailureChangeStatusTask(base.CinderTask):
"""Helper task that sets a volume id to status error.
def __init__(self, db):
super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
self.db = db
- self.requires.update(['volume_id'])
- self.optional.update(['volume_spec'])
- def __call__(self, context, volume_id, volume_spec=None):
+ def execute(self, context, volume_id, volume_spec):
# Save these items since we only use them if a reversion is triggered.
return {
'volume_id': volume_id,
'volume_spec': volume_spec,
}
- def revert(self, context, result, cause):
+ def revert(self, context, result, flow_failures, **kwargs):
+ if isinstance(result, misc.Failure):
+ return
volume_spec = result.get('volume_spec')
- if not volume_spec:
- # Attempt to use it from a later task that *should* have populated
- # this from the database. It is not needed to be found since
- # reverting will continue without it.
- volume_spec = _find_result_spec(cause.flow)
# Restore the source volume status and set the volume to error status.
volume_id = result['volume_id']
_restore_source_status(context, self.db, volume_spec)
- _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+ _error_out_volume(context, self.db, volume_id)
LOG.error(_("Volume %s: create failed"), volume_id)
- exc_info = False
- if all(cause.exc_info):
- exc_info = cause.exc_info
- LOG.error(_('Unexpected build error:'), exc_info=exc_info)
class OnFailureRescheduleTask(base.CinderTask):
"""
def __init__(self, reschedule_context, db, scheduler_rpcapi):
- super(OnFailureRescheduleTask, self).__init__(addons=[ACTION])
- self.requires.update(['filter_properties', 'image_id', 'request_spec',
- 'snapshot_id', 'volume_id'])
- self.optional.update(['volume_spec'])
+ requires = ['filter_properties', 'image_id', 'request_spec',
+ 'snapshot_id', 'volume_id', 'context']
+ super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
+ requires=requires)
self.scheduler_rpcapi = scheduler_rpcapi
self.db = db
self.reschedule_context = reschedule_context
exception.ImageUnacceptable,
]
- def _is_reschedulable(self, cause):
- # Figure out the type of the causes exception and compare it against
- # our black-list of exception types that will not cause rescheduling.
- exc_type, value = cause.exc_info[:2]
- # If we don't have a type from exc_info but we do have a exception in
- # the cause, try to get the type from that instead.
- if not value:
- value = cause.exc
- if not exc_type and value:
- exc_type = type(value)
- if exc_type and exc_type in self.no_reschedule_types:
- return False
- # Couldn't figure it out, by default assume whatever the cause was can
- # be fixed by rescheduling.
- #
- # NOTE(harlowja): Crosses fingers.
- return True
-
- def __call__(self, context, *args, **kwargs):
- # Save these items since we only use them if a reversion is triggered.
- return kwargs.copy()
+ def execute(self, **kwargs):
+ pass
def _reschedule(self, context, cause, request_spec, filter_properties,
snapshot_id, image_id, volume_id, **kwargs):
{'volume_id': volume_id,
'method': _make_pretty_name(create_volume),
'num': num_attempts,
- 'reason': _exception_to_unicode(cause.exc)})
+ 'reason': cause.exception_str})
if all(cause.exc_info):
# Stringify to avoid circular ref problem in json serialization
LOG.exception(_("Volume %s: resetting 'creating' status failed"),
volume_id)
- def revert(self, context, result, cause):
- volume_spec = result.get('volume_spec')
- if not volume_spec:
- # Find it from a prior task that populated this from the database.
- volume_spec = _find_result_spec(cause.flow)
- volume_id = result['volume_id']
+ def revert(self, context, result, flow_failures, **kwargs):
+ # Check if we have a cause which can tell us not to reschedule.
+ for failure in flow_failures.values():
+ if failure.check(*self.no_reschedule_types):
+ return
+ volume_id = kwargs['volume_id']
# Use a different context when rescheduling.
if self.reschedule_context:
context = self.reschedule_context
-
- # If we are now supposed to reschedule (or unable to), then just
- # restore the source volume status and set the volume to error status.
- def do_error_revert():
- LOG.debug(_("Failing volume %s creation by altering volume status"
- " instead of rescheduling"), volume_id)
- _restore_source_status(context, self.db, volume_spec)
- _error_out_volume(context, self.db, volume_id, reason=cause.exc)
- LOG.error(_("Volume %s: create failed"), volume_id)
-
- # Check if we have a cause which can tell us not to reschedule.
- if not self._is_reschedulable(cause):
- do_error_revert()
- else:
try:
+ cause = list(flow_failures.values())[0]
self._pre_reschedule(context, volume_id)
- self._reschedule(context, cause, **result)
+ self._reschedule(context, cause, **kwargs)
self._post_reschedule(context, volume_id)
except exception.CinderException:
LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
- # NOTE(harlowja): Do error volume status changing instead.
- do_error_revert()
- exc_info = False
- if all(cause.exc_info):
- exc_info = cause.exc_info
- LOG.error(_('Unexpected build error:'), exc_info=exc_info)
-
-
-class NotifySchedulerFailureTask(base.CinderTask):
- """Helper task that notifies some external service on failure.
-
- Reversion strategy: On failure of any flow that includes this task the
- request specification associated with that flow will be extracted and
- sent as a payload to the notification service under the given methods
- scheduler topic.
- """
-
- def __init__(self, method):
- super(NotifySchedulerFailureTask, self).__init__(addons=[ACTION])
- self.requires.update(['request_spec', 'volume_id'])
- self.method = method
- self.topic = 'scheduler.%s' % self.method
- self.publisher_id = notifier.publisher_id("scheduler")
-
- def __call__(self, context, **kwargs):
- # Save these items since we only use them if a reversion is triggered.
- return kwargs.copy()
-
- def revert(self, context, result, cause):
- request_spec = result['request_spec']
- volume_id = result['volume_id']
- volume_properties = request_spec['volume_properties']
- payload = {
- 'request_spec': request_spec,
- 'volume_properties': volume_properties,
- 'volume_id': volume_id,
- 'state': 'error',
- 'method': self.method,
- 'reason': unicode(cause.exc),
- }
- try:
- notifier.notify(context, self.publisher_id, self.topic,
- notifier.ERROR, payload)
- except exception.CinderException:
- LOG.exception(_("Failed notifying on %(topic)s "
- "payload %(payload)s") % {'topic': self.topic,
- 'payload': payload})
class ExtractSchedulerSpecTask(base.CinderTask):
Reversion strategy: N/A
"""
- def __init__(self, db):
- super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION])
+ default_provides = set(['request_spec'])
+
+ def __init__(self, db, **kwargs):
+ super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
+ **kwargs)
self.db = db
- self.requires.update(['image_id', 'request_spec', 'snapshot_id',
- 'volume_id'])
- self.provides.update(['request_spec'])
def _populate_request_spec(self, context, volume_id, snapshot_id,
image_id):
'volume_type': list(dict(vol_type).iteritems()),
}
- def __call__(self, context, request_spec, volume_id, snapshot_id,
- image_id):
+ def execute(self, context, request_spec, volume_id, snapshot_id,
+ image_id):
# For RPC version < 1.2 backward compatibility
if request_spec is None:
request_spec = self._populate_request_spec(context, volume_id,
}
+class ExtractVolumeRefTask(base.CinderTask):
+ """Extracts the volume reference for the given volume id."""
+
+ default_provides = 'volume_ref'
+
+ def __init__(self, db):
+ super(ExtractVolumeRefTask, self).__init__(addons=[ACTION])
+ self.db = db
+
+ def execute(self, context, volume_id):
+ # NOTE(harlowja): this will fetch the volume from the database; if
+ # the volume has been deleted before we got here then this should fail.
+ #
+ # In the future we might want to have a lock on the volume_id so that
+ # the volume can not be deleted while it's still being created?
+ volume_ref = self.db.volume_get(context, volume_id)
+
+ return volume_ref
+
+ def revert(self, context, volume_id, result, **kwargs):
+ if isinstance(result, misc.Failure):
+ return
+
+ _error_out_volume(context, self.db, volume_id)
+ LOG.error(_("Volume %s: create failed"), volume_id)
+
+
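ExtractVolumeRefTask shows the wiring taskflow now handles for us: whatever execute() returns is saved under the task's default_provides symbol ('volume_ref' here), and any later task that names an argument 'volume_ref' receives that value automatically. A rough sketch of the same pattern, assuming taskflow 0.1.x; both task classes below are made up for illustration.

# Illustrative sketch only -- not part of this change.
from taskflow import task
from taskflow.patterns import linear_flow


class ProduceRefTask(task.Task):
    default_provides = 'volume_ref'

    def execute(self, volume_id):
        return {'id': volume_id, 'status': 'creating'}


class ConsumeRefTask(task.Task):
    def execute(self, volume_ref):
        # Receives ProduceRefTask's return value purely by argument name.
        return volume_ref['id']


demo_flow = linear_flow.Flow('demo')
demo_flow.add(ProduceRefTask(), ConsumeRefTask())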
class ExtractVolumeSpecTask(base.CinderTask):
"""Extracts a spec of a volume to be created into a common structure.
Reversion strategy: N/A
"""
+ default_provides = 'volume_spec'
+
def __init__(self, db):
- super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION])
+ requires = ['image_id', 'snapshot_id', 'source_volid']
+ super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
+ requires=requires)
self.db = db
- self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
- 'source_volid', 'volume_id'])
- self.provides.update(['volume_spec', 'volume_ref'])
- def __call__(self, context, volume_id, **kwargs):
+ def execute(self, context, volume_ref, **kwargs):
get_remote_image_service = glance.get_remote_image_service
- # NOTE(harlowja): this will fetch the volume from the database, if
- # the volume has been deleted before we got here then this should fail.
- #
- # In the future we might want to have a lock on the volume_id so that
- # the volume can not be deleted while its still being created?
- volume_ref = self.db.volume_get(context, volume_id)
volume_name = volume_ref['name']
volume_size = utils.as_int(volume_ref['size'], quiet=False)
'image_service': image_service,
})
- return {
- 'volume_spec': specs,
- # NOTE(harlowja): it appears like further usage of this volume_ref
- # result actually depend on it being a sqlalchemy object and not
- # just a plain dictionary so thats why we are storing this here.
- #
- # It was attempted to refetch it when needed in subsequent tasks,
- # but that caused sqlalchemy errors to occur (volume already open
- # or similar).
- #
- # In the future where this task could fail and be recovered from we
- # will need to store the volume_spec and recreate the volume_ref
- # on demand.
- 'volume_ref': volume_ref,
- }
+ return specs
+
+ def revert(self, context, result, **kwargs):
+ if isinstance(result, misc.Failure):
+ return
+ volume_spec = result.get('volume_spec')
+ # Restore the source volume status and set the volume to error status.
+ _restore_source_status(context, self.db, volume_spec)
class NotifyVolumeActionTask(base.CinderTask):
def __init__(self, db, host, event_suffix):
super(NotifyVolumeActionTask, self).__init__(addons=[ACTION,
event_suffix])
- self.requires.update(['volume_ref'])
self.db = db
self.event_suffix = event_suffix
self.host = host
- def __call__(self, context, volume_ref):
+ def execute(self, context, volume_ref):
volume_id = volume_ref['id']
try:
volume_utils.notify_about_volume_usage(context, volume_ref,
Reversion strategy: N/A
"""
+ default_provides = 'volume'
+
def __init__(self, db, host, driver):
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
self.db = db
self.driver = driver
- self.requires.update(['volume_spec', 'volume_ref'])
# This maps the different volume specification types into the methods
# that can create said volume type (aka this is a jump table).
self._create_func_mapping = {
def _create_raw_volume(self, context, volume_ref, **kwargs):
return self.driver.create_volume(volume_ref)
- def __call__(self, context, volume_ref, volume_spec):
+ def execute(self, context, volume_ref, volume_spec):
# we can't do anything if the driver didn't init
if not self.driver.initialized:
LOG.error(_("Unable to create volume, driver not initialized"))
{'volume_id': volume_id, 'model': model_update})
raise exception.ExportFailure(reason=ex)
+ return volume_ref
+
class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
"""On successful volume creation this will perform final volume actions.
def __init__(self, db, host, event_suffix):
super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
- self.requires.update(['volume_spec'])
self.status_translation = {
'migration_target_creating': 'migration_target',
}
- def __call__(self, context, volume_ref, volume_spec):
- volume_id = volume_ref['id']
+ def execute(self, context, volume, volume_spec):
+ volume_id = volume['id']
new_status = self.status_translation.get(volume_spec.get('status'),
'available')
update = {
# 'building' if this fails)??
volume_ref = self.db.volume_update(context, volume_id, update)
# Now use the parent to notify.
- super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
+ super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
except exception.CinderException:
LOG.exception(_("Failed updating volume %(volume_id)s with "
"%(update)s") % {'volume_id': volume_id,
flow_name = ACTION.replace(":", "_") + "_api"
api_flow = linear_flow.Flow(flow_name)
- # This injects the initial starting flow values into the workflow so that
- # the dependency order of the tasks provides/requires can be correctly
- # determined.
- api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
- api_flow.add(ExtractVolumeRequestTask(image_service,
- az_check_functor))
- api_flow.add(QuotaReserveTask())
- v_uuid = api_flow.add(EntryCreateTask(db))
- api_flow.add(QuotaCommitTask())
-
- # If after committing something fails, ensure we set the db to failure
- # before reverting any prior tasks.
- api_flow.add(OnFailureChangeStatusTask(db))
+ api_flow.add(ExtractVolumeRequestTask(
+ image_service,
+ az_check_functor,
+ rebind={'size': 'raw_size',
+ 'availability_zone': 'raw_availability_zone',
+ 'volume_type': 'raw_volume_type'}))
+ api_flow.add(QuotaReserveTask(),
+ EntryCreateTask(db),
+ QuotaCommitTask())
# This will cast it out to either the scheduler or volume manager via
# the rpc apis provided.
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
- # Note(harlowja): this will return the flow as well as the uuid of the
- # task which will produce the 'volume' database reference (since said
- # reference is returned to other callers in the api for further usage).
- return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
+ # Now load (but do not run) the flow using the provided initial data.
+ return taskflow.engines.load(api_flow, store=create_what)
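As the comment above notes, taskflow.engines.load() only wraps the flow in an engine; nothing executes until run() is called, and the store dict seeds the engine's storage with the inputs that no task provides. A caller-side sketch, assuming taskflow 0.1.x; demo_flow, the store key and the 'volume' symbol are placeholders for whatever a given flow actually requires and provides.

# Illustrative sketch only -- not part of this change.
import taskflow.engines

engine = taskflow.engines.load(demo_flow, store={'volume_id': 'vol-1'})
engine.run()  # tasks execute here, not at load() time
created = engine.storage.fetch('volume')  # read back any provided symbol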
-def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
+def get_scheduler_flow(context, db, driver, request_spec=None,
+ filter_properties=None,
volume_id=None, snapshot_id=None, image_id=None):
"""Constructs and returns the scheduler entrypoint flow.
4. Uses provided driver to then select and continue processing of
volume request.
"""
-
- flow_name = ACTION.replace(":", "_") + "_scheduler"
- scheduler_flow = linear_flow.Flow(flow_name)
-
- # This injects the initial starting flow values into the workflow so that
- # the dependency order of the tasks provides/requires can be correctly
- # determined.
- scheduler_flow.add(base.InjectTask({
- 'request_spec': request_spec,
+ create_what = {
+ 'context': context,
+ 'raw_request_spec': request_spec,
'filter_properties': filter_properties,
'volume_id': volume_id,
'snapshot_id': snapshot_id,
'image_id': image_id,
- }, addons=[ACTION]))
+ }
- # This will extract and clean the spec from the starting values.
- scheduler_flow.add(ExtractSchedulerSpecTask(db))
+ flow_name = ACTION.replace(":", "_") + "_scheduler"
+ scheduler_flow = linear_flow.Flow(flow_name)
- # The decorator application here ensures that the method gets the right
- # requires attributes automatically by examining the underlying functions
- # arguments.
+ # This will extract and clean the spec from the starting values.
+ scheduler_flow.add(ExtractSchedulerSpecTask(
+ db,
+ rebind={'request_spec': 'raw_request_spec'}))
- @decorators.task
def schedule_create_volume(context, request_spec, filter_properties):
def _log_failure(cause):
_log_failure(e)
_error_out_volume(context, db, volume_id, reason=e)
- scheduler_flow.add(schedule_create_volume)
+ scheduler_flow.add(task.FunctorTask(schedule_create_volume))
- return flow_utils.attach_debug_listeners(scheduler_flow)
+ # Now load (but do not run) the flow using the provided initial data.
+ return taskflow.engines.load(scheduler_flow, store=create_what)
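The rebind mapping above is how a value stored under one name ('raw_request_spec') is fed to a task argument with a different name ('request_spec'); since ExtractSchedulerSpecTask also provides 'request_spec', the cleaned spec then replaces the raw one for the rest of the flow. A small sketch of the same idea, assuming taskflow 0.1.x; the task below is made up.

# Illustrative sketch only -- not part of this change.
from taskflow import task
from taskflow.patterns import linear_flow


class CleanSpecTask(task.Task):
    default_provides = 'request_spec'

    def execute(self, request_spec):
        # 'request_spec' is read from the 'raw_request_spec' symbol because
        # of the rebind mapping given when the task is added below.
        return dict(request_spec or {}, cleaned=True)


demo_flow = linear_flow.Flow('demo')
demo_flow.add(CleanSpecTask(rebind={'request_spec': 'raw_request_spec'}))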
-def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
- request_spec=None, filter_properties=None,
- allow_reschedule=True,
- snapshot_id=None, image_id=None, source_volid=None,
- reschedule_context=None):
+def get_manager_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
+ allow_reschedule, reschedule_context, request_spec,
+ filter_properties, snapshot_id=None, image_id=None,
+ source_volid=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
flow_name = ACTION.replace(":", "_") + "_manager"
volume_flow = linear_flow.Flow(flow_name)
- # Determine if we are allowed to reschedule since this affects how
- # failures will be handled.
- if not filter_properties:
- filter_properties = {}
- if not request_spec and allow_reschedule:
- LOG.debug(_("No request spec, will not reschedule"))
- allow_reschedule = False
- if not filter_properties.get('retry', None) and allow_reschedule:
- LOG.debug(_("No retry filter property or associated "
- "retry info, will not reschedule"))
- allow_reschedule = False
-
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
- volume_flow.add(base.InjectTask({
+ create_what = {
+ 'context': context,
'filter_properties': filter_properties,
'image_id': image_id,
'request_spec': request_spec,
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'volume_id': volume_id,
- }, addons=[ACTION]))
-
- # We can actually just check if we should reschedule on failure ahead of
- # time instead of trying to determine this later, certain values are needed
- # to reschedule and without them we should just avoid rescheduling.
- if not allow_reschedule:
- # On failure ensure that we just set the volume status to error.
- LOG.debug(_("Retry info not present, will not reschedule"))
- volume_flow.add(OnFailureChangeStatusTask(db))
- else:
+ }
+
+ volume_flow.add(ExtractVolumeRefTask(db))
+
+ if allow_reschedule and request_spec:
volume_flow.add(OnFailureRescheduleTask(reschedule_context,
db, scheduler_rpcapi))
- volume_flow.add(ExtractVolumeSpecTask(db))
- volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
- volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
- volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
+ volume_flow.add(ExtractVolumeSpecTask(db),
+ NotifyVolumeActionTask(db, host, "create.start"),
+ CreateVolumeFromSpecTask(db, host, driver),
+ CreateVolumeOnFinishTask(db, host, "create.end"))
- return flow_utils.attach_debug_listeners(volume_flow)
+ # Now load (but do not run) the flow using the provided initial data.
+ return taskflow.engines.load(volume_flow, store=create_what)
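The create_what dict above must supply every symbol that some task requires but that no earlier task provides; that now includes 'context', which is why OnFailureRescheduleTask lists it in its explicit requires. A sketch of declaring requirements explicitly when execute() only takes **kwargs, mirroring that task, assuming taskflow 0.1.x; the class name is made up.

# Illustrative sketch only -- not part of this change.
from taskflow import task


class NeedsSpecTask(task.Task):
    def __init__(self):
        # execute() takes **kwargs, so its inputs are declared explicitly;
        # they must come from the store or from earlier tasks' provides.
        super(NeedsSpecTask, self).__init__(
            requires=['request_spec', 'volume_id'])

    def execute(self, **kwargs):
        # Both 'request_spec' and 'volume_id' arrive in kwargs.
        return kwargs['volume_id']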
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
-# Copyright (c) 2013 OpenStack Foundation
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from cinder.openstack.common import log as logging
-
-LOG = logging.getLogger(__name__)
-
-
-def attach_debug_listeners(flow):
- """Sets up a nice set of debug listeners for the flow.
-
- These listeners will log when tasks/flows are transitioning from state to
- state so that said states can be seen in the debug log output which is very
- useful for figuring out where problems are occurring.
- """
-
- def flow_log_change(state, details):
- # TODO(harlowja): the bug 1214083 is causing problems
- LOG.debug(_("%(flow)s has moved into state %(state)s from state"
- " %(old_state)s") % {'state': state,
- 'old_state': details.get('old_state'),
- 'flow': str(details['flow'])})
-
- def task_log_change(state, details):
- # TODO(harlowja): the bug 1214083 is causing problems
- LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
- " result: %(result)s") % {'state': state,
- 'flow': str(details['flow']),
- 'runner': str(details['runner']),
- 'result': details.get('result')})
-
- # Register * for all state changes (and not selective state changes to be
- # called upon) since all the changes is more useful.
- flow.notifier.register('*', flow_log_change)
- flow.task_notifier.register('*', task_log_change)
- return flow
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
-from cinder.taskflow import states
-
from eventlet.greenpool import GreenPool
LOG = logging.getLogger(__name__)
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
+
"""Creates and exports the volume."""
+ context_saved = context.deepcopy()
+ context = context.elevated()
+ if filter_properties is None:
+ filter_properties = {}
- flow = create_volume.get_manager_flow(
- self.db,
- self.driver,
- self.scheduler_rpcapi,
- self.host,
- volume_id,
- request_spec=request_spec,
- filter_properties=filter_properties,
- allow_reschedule=allow_reschedule,
- snapshot_id=snapshot_id,
- image_id=image_id,
- source_volid=source_volid,
- reschedule_context=context.deepcopy())
-
- assert flow, _('Manager volume flow not retrieved')
+ try:
+ flow_engine = create_volume.get_manager_flow(
+ context,
+ self.db,
+ self.driver,
+ self.scheduler_rpcapi,
+ self.host,
+ volume_id,
+ snapshot_id=snapshot_id,
+ image_id=image_id,
+ source_volid=source_volid,
+ allow_reschedule=allow_reschedule,
+ reschedule_context=context_saved,
+ request_spec=request_spec,
+ filter_properties=filter_properties)
+ except Exception:
+ raise exception.CinderException(
+ _("Failed to create manager volume flow"))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
locked_action = None
def _run_flow():
- flow.run(context.elevated())
- if flow.state != states.SUCCESS:
- msg = _("Failed to successfully complete manager volume "
- "workflow")
- raise exception.CinderException(msg)
+ # This code executes the create volume flow. If something goes wrong,
+ # the flow reverts all of the work that was done and re-raises the
+ # exception. Otherwise, all data generated by the flow becomes
+ # available in the flow engine's storage.
+ flow_engine.run()
@utils.synchronized(locked_action, external=True)
def _run_flow_locked():
else:
_run_flow_locked()
- self._reset_stats()
- return volume_id
+ # Fetch the created volume from the flow engine's storage.
+ volume_ref = flow_engine.storage.fetch('volume')
+ return volume_ref['id']
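The comment in _run_flow summarizes taskflow's failure handling: if any task raises, the engine reverts the already-completed tasks (in reverse order for a linear flow) and then re-raises the original exception to the caller. A toy sketch of that behaviour, assuming taskflow 0.1.x; both task classes are throwaway examples.

# Illustrative sketch only -- not part of this change.
import taskflow.engines
from taskflow import task
from taskflow.patterns import linear_flow


class StepOne(task.Task):
    def execute(self):
        print("step one done")

    def revert(self, **kwargs):
        print("step one undone")


class Boom(task.Task):
    def execute(self):
        raise RuntimeError("boom")


demo_flow = linear_flow.Flow('demo')
demo_flow.add(StepOne(), Boom())
try:
    taskflow.engines.load(demo_flow).run()
except RuntimeError:
    pass  # StepOne.revert() already ran before the error was re-raised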
@utils.require_driver_initialized
@locked_volume_operation
python-novaclient>=2.15.0
python-swiftclient>=1.5
Routes>=1.12.3
+taskflow>=0.1.1,<0.2
rtslib-fb>=2.1.39
six>=1.4.1
SQLAlchemy>=0.7.8,<=0.7.99
+++ /dev/null
-[DEFAULT]
-
-# The list of primitives to copy from taskflow
-primitives=flow.linear_flow,task
-
-# The base module to hold the copy of taskflow
-base=cinder