" exists for volume id %(volume_id)s")
+class ExportFailure(Invalid):
+ message = _("Failed to export for volume: %(reason)s")
+
+
+class MetadataCreateFailure(Invalid):
+ message = _("Failed to create metadata for volume: %(reason)s")
+
+
+class MetadataUpdateFailure(Invalid):
+ message = _("Failed to update metadata for volume: %(reason)s")
+
+
+class MetadataCopyFailure(Invalid):
+ message = _("Failed to copy metadata to volume: %(reason)s")
+
+
class ImageCopyFailure(Invalid):
message = _("Failed to copy image to volume: %(reason)s")
policy.set_brain(policy.Brain.load_json(data, default_rule))
+def enforce_action(context, action):
+ """Checks that the action can be done by the given context.
+
+    Builds a target from the context's project_id and user_id and verifies
+    that the given action is allowed for that target using the policy
+    enforcement api.
+ """
+
+ target = {
+ 'project_id': context.project_id,
+ 'user_id': context.user_id,
+ }
+ enforce(context, action, target)
+
+
def enforce(context, action, target):
"""Verifies that the action is valid on the target in this context.
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import api as notifier
+from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
+from cinder.taskflow import states
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='cinder.scheduler.filter_scheduler.'
def create_volume(self, context, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None):
- try:
- if request_spec is None:
- # For RPC version < 1.2 backward compatibility
- request_spec = {}
- volume_ref = db.volume_get(context, volume_id)
- size = volume_ref.get('size')
- availability_zone = volume_ref.get('availability_zone')
- volume_type_id = volume_ref.get('volume_type_id')
- vol_type = db.volume_type_get(context, volume_type_id)
- volume_properties = {'size': size,
- 'availability_zone': availability_zone,
- 'volume_type_id': volume_type_id}
- request_spec.update(
- {'volume_id': volume_id,
- 'snapshot_id': snapshot_id,
- 'image_id': image_id,
- 'volume_properties': volume_properties,
- 'volume_type': dict(vol_type).iteritems()})
-
- self.driver.schedule_create_volume(context, request_spec,
- filter_properties)
- except exception.NoValidHost as ex:
- volume_state = {'volume_state': {'status': 'error'}}
- self._set_volume_state_and_notify('create_volume',
- volume_state,
- context, ex, request_spec)
- except Exception as ex:
- with excutils.save_and_reraise_exception():
- volume_state = {'volume_state': {'status': 'error'}}
- self._set_volume_state_and_notify('create_volume',
- volume_state,
- context, ex, request_spec)
- def _set_volume_state_and_notify(self, method, updates, context, ex,
- request_spec):
- LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
- {'method': method, 'ex': ex})
-
- volume_state = updates['volume_state']
- properties = request_spec.get('volume_properties', {})
-
- volume_id = request_spec.get('volume_id', None)
+ flow = create_volume.get_scheduler_flow(db, self.driver,
+ request_spec,
+ filter_properties,
+ volume_id, snapshot_id,
+ image_id)
+ assert flow, _('Schedule volume flow not retrieved')
- if volume_id:
- db.volume_update(context, volume_id, volume_state)
-
- payload = dict(request_spec=request_spec,
- volume_properties=properties,
- volume_id=volume_id,
- state=volume_state,
- method=method,
- reason=ex)
-
- notifier.notify(context, notifier.publisher_id("scheduler"),
- 'scheduler.' + method, notifier.ERROR, payload)
+ flow.run(context)
+ if flow.state != states.SUCCESS:
+ LOG.warn(_("Failed to successfully complete"
+ " schedule volume using flow: %s"), flow)
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
tgt_host,
force_host_copy)
+
+ def _set_volume_state_and_notify(self, method, updates, context, ex,
+ request_spec):
+ # TODO(harlowja): move into a task that just does this later.
+
+ LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
+ {'method': method, 'ex': ex})
+
+ volume_state = updates['volume_state']
+ properties = request_spec.get('volume_properties', {})
+
+ volume_id = request_spec.get('volume_id', None)
+
+ if volume_id:
+ db.volume_update(context, volume_id, volume_state)
+
+ payload = dict(request_spec=request_spec,
+ volume_properties=properties,
+ volume_id=volume_id,
+ state=volume_state,
+ method=method,
+ reason=ex)
+
+ notifier.notify(context, notifier.publisher_id("scheduler"),
+ 'scheduler.' + method, notifier.ERROR, payload)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import functools
+import inspect
+import types
+
+# These arguments are ones that we will skip when parsing for requirements
+# for a function to operate (when used as a task).
+AUTO_ARGS = ('self', 'context', 'cls')
+
+
+def is_decorated(functor):
+ if not isinstance(functor, (types.MethodType, types.FunctionType)):
+ return False
+ return getattr(extract(functor), '__task__', False)
+
+
+def extract(functor):
+    # Extract the underlying functor if it's a method, since we can not set
+    # attributes on instance methods; this is supposedly fixed in python 3
+    # and later.
+ #
+ # TODO(harlowja): add link to this fix.
+ assert isinstance(functor, (types.MethodType, types.FunctionType))
+ if isinstance(functor, types.MethodType):
+ return functor.__func__
+ else:
+ return functor
+
+
+def _mark_as_task(functor):
+ setattr(functor, '__task__', True)
+
+
+def _get_wrapped(function):
+ """Get the method at the bottom of a stack of decorators."""
+
+ if hasattr(function, '__wrapped__'):
+ return getattr(function, '__wrapped__')
+
+ if not hasattr(function, 'func_closure') or not function.func_closure:
+ return function
+
+ def _get_wrapped_function(function):
+ if not hasattr(function, 'func_closure') or not function.func_closure:
+ return None
+
+ for closure in function.func_closure:
+ func = closure.cell_contents
+
+ deeper_func = _get_wrapped_function(func)
+ if deeper_func:
+ return deeper_func
+ elif hasattr(closure.cell_contents, '__call__'):
+ return closure.cell_contents
+
+ return _get_wrapped_function(function)
+
+
+def _take_arg(a):
+ if a in AUTO_ARGS:
+ return False
+    # In certain decorator cases it seems like we get the function to be
+    # decorated as an argument; we don't want to take that as a real argument.
+ if isinstance(a, collections.Callable):
+ return False
+ return True
+
+
+def wraps(fn):
+    """This will not be needed in python 3.2 or greater, which already has
+    this built in to its functools.wraps method.
+ """
+
+ def wrapper(f):
+ f = functools.wraps(fn)(f)
+ f.__wrapped__ = getattr(fn, '__wrapped__', fn)
+ return f
+
+ return wrapper
+
+
+def locked(f):
+
+ @wraps(f)
+ def wrapper(self, *args, **kwargs):
+ with self._lock:
+ return f(self, *args, **kwargs)
+
+ return wrapper
+
+
+def task(*args, **kwargs):
+ """Decorates a given function and ensures that all needed attributes of
+ that function are set so that the function can be used as a task.
+ """
+
+ def decorator(f):
+ w_f = extract(f)
+
+ def noop(*args, **kwargs):
+ pass
+
+ # Mark as being a task
+ _mark_as_task(w_f)
+
+ # By default don't revert this.
+ w_f.revert = kwargs.pop('revert_with', noop)
+
+ # Associate a name of this task that is the module + function name.
+ w_f.name = "%s.%s" % (f.__module__, f.__name__)
+
+ # Sets the version of the task.
+ version = kwargs.pop('version', (1, 0))
+ f = _versionize(*version)(f)
+
+ # Attach any requirements this function needs for running.
+ requires_what = kwargs.pop('requires', [])
+ f = _requires(*requires_what, **kwargs)(f)
+
+ # Attach any optional requirements this function needs for running.
+ optional_what = kwargs.pop('optional', [])
+ f = _optional(*optional_what, **kwargs)(f)
+
+ # Attach any items this function provides as output
+ provides_what = kwargs.pop('provides', [])
+ f = _provides(*provides_what, **kwargs)(f)
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ # This is needed to handle when the decorator has args or the decorator
+ # doesn't have args, python is rather weird here...
+ if kwargs or not args:
+ return decorator
+ else:
+ if isinstance(args[0], collections.Callable):
+ return decorator(args[0])
+ else:
+ return decorator
+
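+
+# An illustrative sketch (not used elsewhere in this patch) showing how the
+# task() decorator attaches dependency metadata; the function and argument
+# names below are invented purely for demonstration.
+def _example_task_usage():
+    @task(provides=['doubled'])
+    def double_size(context, size):
+        return {'doubled': size * 2}
+
+    # 'context' is skipped (see AUTO_ARGS), so the auto-extracted requirements
+    # are set(['size']) and the declared provides are set(['doubled']).
+    return (double_size.requires, double_size.provides)
+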
+
+def _versionize(major, minor=None):
+ """A decorator that marks the wrapped function with a major & minor version
+ number.
+ """
+
+ if minor is None:
+ minor = 0
+
+ def decorator(f):
+ w_f = extract(f)
+ w_f.version = (major, minor)
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ return decorator
+
+
+def _optional(*args, **kwargs):
+ """Attaches a set of items that the decorated function would like as input
+    to the function's underlying dictionary.
+ """
+
+ def decorator(f):
+ w_f = extract(f)
+
+ if not hasattr(w_f, 'optional'):
+ w_f.optional = set()
+
+ w_f.optional.update([a for a in args if _take_arg(a)])
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ # This is needed to handle when the decorator has args or the decorator
+ # doesn't have args, python is rather weird here...
+ if kwargs or not args:
+ return decorator
+ else:
+ if isinstance(args[0], collections.Callable):
+ return decorator(args[0])
+ else:
+ return decorator
+
+
+def _requires(*args, **kwargs):
+ """Attaches a set of items that the decorated function requires as input
+    to the function's underlying dictionary.
+ """
+
+ def decorator(f):
+ w_f = extract(f)
+
+ if not hasattr(w_f, 'requires'):
+ w_f.requires = set()
+
+ if kwargs.pop('auto_extract', True):
+ inspect_what = _get_wrapped(f)
+ f_args = inspect.getargspec(inspect_what).args
+ w_f.requires.update([a for a in f_args if _take_arg(a)])
+
+ w_f.requires.update([a for a in args if _take_arg(a)])
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ # This is needed to handle when the decorator has args or the decorator
+ # doesn't have args, python is rather weird here...
+ if kwargs or not args:
+ return decorator
+ else:
+ if isinstance(args[0], collections.Callable):
+ return decorator(args[0])
+ else:
+ return decorator
+
+
+def _provides(*args, **kwargs):
+ """Attaches a set of items that the decorated function provides as output
+    to the function's underlying dictionary.
+ """
+
+ def decorator(f):
+ w_f = extract(f)
+
+        if not hasattr(w_f, 'provides'):
+ w_f.provides = set()
+
+ w_f.provides.update([a for a in args if _take_arg(a)])
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ # This is needed to handle when the decorator has args or the decorator
+ # doesn't have args, python is rather weird here...
+ if kwargs or not args:
+ return decorator
+ else:
+ if isinstance(args[0], collections.Callable):
+ return decorator(args[0])
+ else:
+ return decorator
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class TaskFlowException(Exception):
+ """Base class for exceptions emitted from this library."""
+ pass
+
+
+class Duplicate(TaskFlowException):
+ """Raised when a duplicate entry is found."""
+ pass
+
+
+class NotFound(TaskFlowException):
+ """Raised when some entry in some object doesn't exist."""
+ pass
+
+
+class AlreadyExists(TaskFlowException):
+ """Raised when some entry in some object already exists."""
+ pass
+
+
+class ClosedException(TaskFlowException):
+ """Raised when an access on a closed object occurs."""
+ pass
+
+
+class InvalidStateException(TaskFlowException):
+ """Raised when a task/job/workflow is in an invalid state when an
+ operation is attempting to apply to said task/job/workflow.
+ """
+ pass
+
+
+class UnclaimableJobException(TaskFlowException):
+ """Raised when a job can not be claimed."""
+ pass
+
+
+class JobNotFound(TaskFlowException):
+ """Raised when a job entry can not be found."""
+ pass
+
+
+class MissingDependencies(InvalidStateException):
+    """Raised when a task has dependencies that can not be satisfied."""
+ message = ("%(task)s requires %(requirements)s but no other task produces"
+ " said requirements")
+
+ def __init__(self, task, requirements):
+ message = self.message % {'task': task, 'requirements': requirements}
+ super(MissingDependencies, self).__init__(message)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import threading
+
+from cinder.openstack.common import uuidutils
+
+from cinder.taskflow import decorators
+from cinder.taskflow import exceptions as exc
+from cinder.taskflow import states
+from cinder.taskflow import utils
+
+
+class Flow(object):
+ """The base abstract class of all flow implementations.
+
+ It provides a set of parents to flows that have a concept of parent flows
+ as well as a state and state utility functions to the deriving classes. It
+ also provides a name and an identifier (uuid or other) to the flow so that
+    it can be uniquely identified among many flows.
+
+ Flows are expected to provide (if desired) the following methods:
+ - add
+ - add_many
+ - interrupt
+ - reset
+ - rollback
+ - run
+ - soft_reset
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ # Common states that certain actions can be performed in. If the flow
+ # is not in these sets of states then it is likely that the flow operation
+ # can not succeed.
+ RESETTABLE_STATES = set([
+ states.INTERRUPTED,
+ states.SUCCESS,
+ states.PENDING,
+ states.FAILURE,
+ ])
+ SOFT_RESETTABLE_STATES = set([
+ states.INTERRUPTED,
+ ])
+ UNINTERRUPTIBLE_STATES = set([
+ states.FAILURE,
+ states.SUCCESS,
+ states.PENDING,
+ ])
+ RUNNABLE_STATES = set([
+ states.PENDING,
+ ])
+
+ def __init__(self, name, parents=None, uuid=None):
+ self._name = str(name)
+ # The state of this flow.
+ self._state = states.PENDING
+        # If this flow has parent flows which need to be reverted when this
+        # flow fails, then include them here to allow this child to call the
+        # parents...
+ if parents:
+ self.parents = tuple(parents)
+ else:
+ self.parents = ()
+ # Any objects that want to listen when a wf/task starts/stops/completes
+ # or errors should be registered here. This can be used to monitor
+ # progress and record tasks finishing (so that it becomes possible to
+ # store the result of a task in some persistent or semi-persistent
+ # storage backend).
+ self.notifier = utils.TransitionNotifier()
+ self.task_notifier = utils.TransitionNotifier()
+        # Ensure that modifications and/or multiple runs aren't happening
+        # in the same flow at the same time.
+ self._lock = threading.RLock()
+        # Assign this flow a unique identifier.
+ if uuid:
+ self._id = str(uuid)
+ else:
+ self._id = uuidutils.generate_uuid()
+
+ @property
+ def name(self):
+ """A non-unique name for this flow (human readable)"""
+ return self._name
+
+ @property
+ def uuid(self):
+ """Uniquely identifies this flow"""
+ return "f-%s" % (self._id)
+
+ @property
+ def state(self):
+ """Provides a read-only view of the flow state."""
+ return self._state
+
+ def _change_state(self, context, new_state):
+ was_changed = False
+ old_state = self.state
+ with self._lock:
+ if self.state != new_state:
+ old_state = self.state
+ self._state = new_state
+ was_changed = True
+ if was_changed:
+ # Don't notify while holding the lock.
+ self.notifier.notify(self.state, details={
+ 'context': context,
+ 'flow': self,
+ 'old_state': old_state,
+ })
+
+ def __str__(self):
+ lines = ["Flow: %s" % (self.name)]
+ lines.append("%s" % (self.uuid))
+ lines.append("%s" % (len(self.parents)))
+ lines.append("%s" % (self.state))
+ return "; ".join(lines)
+
+ @abc.abstractmethod
+ def add(self, task):
+ """Adds a given task to this flow.
+
+ Returns the uuid that is associated with the task for later operations
+        before and after it is run.
+ """
+ raise NotImplementedError()
+
+ @decorators.locked
+ def add_many(self, tasks):
+ """Adds many tasks to this flow.
+
+ Returns a list of uuids (one for each task added).
+ """
+ uuids = []
+ for t in tasks:
+ uuids.append(self.add(t))
+ return uuids
+
+ def interrupt(self):
+ """Attempts to interrupt the current flow and any tasks that are
+ currently not running in the flow.
+
+ Returns how many tasks were interrupted (if any).
+ """
+ if self.state in self.UNINTERRUPTIBLE_STATES:
+ raise exc.InvalidStateException(("Can not interrupt when"
+ " in state %s") % (self.state))
+ # Note(harlowja): Do *not* acquire the lock here so that the flow may
+        # be interrupted while running. This does mean that the above check may
+ # not be valid but we can worry about that if it becomes an issue.
+ old_state = self.state
+ if old_state != states.INTERRUPTED:
+ self._state = states.INTERRUPTED
+ self.notifier.notify(self.state, details={
+ 'context': None,
+ 'flow': self,
+ 'old_state': old_state,
+ })
+ return 0
+
+ @decorators.locked
+ def reset(self):
+ """Fully resets the internal state of this flow, allowing for the flow
+        to be run again.
+
+ Note: Listeners are also reset.
+ """
+ if self.state not in self.RESETTABLE_STATES:
+ raise exc.InvalidStateException(("Can not reset when"
+ " in state %s") % (self.state))
+ self.notifier.reset()
+ self.task_notifier.reset()
+ self._change_state(None, states.PENDING)
+
+ @decorators.locked
+ def soft_reset(self):
+ """Partially resets the internal state of this flow, allowing for the
+        flow to be run again from an interrupted state only.
+ """
+ if self.state not in self.SOFT_RESETTABLE_STATES:
+ raise exc.InvalidStateException(("Can not soft reset when"
+ " in state %s") % (self.state))
+ self._change_state(None, states.PENDING)
+
+ @decorators.locked
+ def run(self, context, *args, **kwargs):
+ """Executes the workflow."""
+ if self.state not in self.RUNNABLE_STATES:
+ raise exc.InvalidStateException("Unable to run flow when "
+ "in state %s" % (self.state))
+
+ @decorators.locked
+ def rollback(self, context, cause):
+ """Performs rollback of this workflow and any attached parent workflows
+ if present.
+ """
+ pass
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import logging
+
+from cinder.openstack.common import excutils
+
+from cinder.taskflow import decorators
+from cinder.taskflow import exceptions as exc
+from cinder.taskflow import states
+from cinder.taskflow import utils
+
+from cinder.taskflow.patterns import base
+
+LOG = logging.getLogger(__name__)
+
+
+class Flow(base.Flow):
+    """A linear chain of tasks that can be applied in order as one unit and
+ rolled back as one unit using the reverse order that the tasks have
+ been applied in.
+
+ Note(harlowja): Each task in the chain must have requirements
+ which are satisfied by the previous task/s in the chain.
+ """
+
+ def __init__(self, name, parents=None, uuid=None):
+ super(Flow, self).__init__(name, parents, uuid)
+ # The tasks which have been applied will be collected here so that they
+ # can be reverted in the correct order on failure.
+ self._accumulator = utils.RollbackAccumulator()
+        # Task results are stored here. Lookup is by the uuid that was
+ # returned from the add function.
+ self.results = {}
+ # The previously left off iterator that can be used to resume from
+ # the last task (if interrupted and soft-reset).
+ self._leftoff_at = None
+ # All runners to run are collected here.
+ self._runners = []
+ self._connected = False
+ # The resumption strategy to use.
+ self.resumer = None
+
+ @decorators.locked
+ def add(self, task):
+ """Adds a given task to this flow."""
+ assert isinstance(task, collections.Callable)
+ r = utils.Runner(task)
+ r.runs_before = list(reversed(self._runners))
+ self._runners.append(r)
+ self._reset_internals()
+ return r.uuid
+
+ def _reset_internals(self):
+ self._connected = False
+ self._leftoff_at = None
+
+ def _associate_providers(self, runner):
+ # Ensure that some previous task provides this input.
+ who_provides = {}
+ task_requires = runner.requires
+ for r in task_requires:
+ provider = None
+ for before_me in runner.runs_before:
+ if r in before_me.provides:
+ provider = before_me
+ break
+ if provider:
+ who_provides[r] = provider
+        # Ensure that the preceding tasks provide all the needed input for
+        # this task to run correctly.
+ missing_requires = task_requires - set(who_provides.keys())
+ if missing_requires:
+ raise exc.MissingDependencies(runner, sorted(missing_requires))
+ runner.providers.update(who_provides)
+
+ def __str__(self):
+ lines = ["LinearFlow: %s" % (self.name)]
+ lines.append("%s" % (self.uuid))
+ lines.append("%s" % (len(self._runners)))
+ lines.append("%s" % (len(self.parents)))
+ lines.append("%s" % (self.state))
+ return "; ".join(lines)
+
+ @decorators.locked
+ def remove(self, uuid):
+ index_removed = -1
+ for (i, r) in enumerate(self._runners):
+ if r.uuid == uuid:
+ index_removed = i
+ break
+ if index_removed == -1:
+ raise ValueError("No runner found with uuid %s" % (uuid))
+ else:
+ removed = self._runners.pop(index_removed)
+ self._reset_internals()
+ # Go and remove it from any runner after the removed runner since
+ # those runners may have had an attachment to it.
+ for r in self._runners[index_removed:]:
+ try:
+ r.runs_before.remove(removed)
+ except (IndexError, ValueError):
+ pass
+
+ def __len__(self):
+ return len(self._runners)
+
+ def _connect(self):
+ if self._connected:
+ return self._runners
+ for r in self._runners:
+ r.providers = {}
+ for r in reversed(self._runners):
+ self._associate_providers(r)
+ self._connected = True
+ return self._runners
+
+ def _ordering(self):
+ return iter(self._connect())
+
+ @decorators.locked
+ def run(self, context, *args, **kwargs):
+ super(Flow, self).run(context, *args, **kwargs)
+
+ def resume_it():
+ if self._leftoff_at is not None:
+ return ([], self._leftoff_at)
+ if self.resumer:
+ (finished, leftover) = self.resumer.resume(self,
+ self._ordering())
+ else:
+ finished = []
+ leftover = self._ordering()
+ return (finished, leftover)
+
+ self._change_state(context, states.STARTED)
+ try:
+ those_finished, leftover = resume_it()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self._change_state(context, states.FAILURE)
+
+ def run_it(runner, failed=False, result=None, simulate_run=False):
+ try:
+ # Add the task to be rolled back *immediately* so that even if
+ # the task fails while producing results it will be given a
+ # chance to rollback.
+ rb = utils.RollbackTask(context, runner.task, result=None)
+ self._accumulator.add(rb)
+ self.task_notifier.notify(states.STARTED, details={
+ 'context': context,
+ 'flow': self,
+ 'runner': runner,
+ })
+ if not simulate_run:
+ result = runner(context, *args, **kwargs)
+ else:
+ if failed:
+ # TODO(harlowja): make this configurable??
+ # If we previously failed, we want to fail again at
+ # the same place.
+ if not result:
+ # If no exception or exception message was provided
+ # or captured from the previous run then we need to
+ # form one for this task.
+ result = "%s failed running." % (runner.task)
+ if isinstance(result, basestring):
+ result = exc.InvalidStateException(result)
+ if not isinstance(result, Exception):
+ LOG.warn("Can not raise a non-exception"
+ " object: %s", result)
+ result = exc.InvalidStateException()
+ raise result
+ # Adjust the task result in the accumulator before
+ # notifying others that the task has finished to
+ # avoid the case where a listener might throw an
+ # exception.
+ rb.result = result
+ runner.result = result
+ self.results[runner.uuid] = result
+ self.task_notifier.notify(states.SUCCESS, details={
+ 'context': context,
+ 'flow': self,
+ 'runner': runner,
+ })
+ except Exception as e:
+ runner.result = e
+ cause = utils.FlowFailure(runner, self, e)
+ with excutils.save_and_reraise_exception():
+ # Notify any listeners that the task has errored.
+ self.task_notifier.notify(states.FAILURE, details={
+ 'context': context,
+ 'flow': self,
+ 'runner': runner,
+ })
+ self.rollback(context, cause)
+
+ if len(those_finished):
+ self._change_state(context, states.RESUMING)
+ for (r, details) in those_finished:
+ # Fake running the task so that we trigger the same
+ # notifications and state changes (and rollback that
+ # would have happened in a normal flow).
+ failed = states.FAILURE in details.get('states', [])
+ result = details.get('result')
+ run_it(r, failed=failed, result=result, simulate_run=True)
+
+ self._leftoff_at = leftover
+ self._change_state(context, states.RUNNING)
+ if self.state == states.INTERRUPTED:
+ return
+
+ was_interrupted = False
+ for r in leftover:
+ r.reset()
+ run_it(r)
+ if self.state == states.INTERRUPTED:
+ was_interrupted = True
+ break
+
+ if not was_interrupted:
+ # Only gets here if everything went successfully.
+ self._change_state(context, states.SUCCESS)
+ self._leftoff_at = None
+
+ @decorators.locked
+ def reset(self):
+ super(Flow, self).reset()
+ self.results = {}
+ self.resumer = None
+ self._accumulator.reset()
+ self._reset_internals()
+
+ @decorators.locked
+ def rollback(self, context, cause):
+ # Performs basic task by task rollback by going through the reverse
+ # order that tasks have finished and asking said task to undo whatever
+ # it has done. If this flow has any parent flows then they will
+ # also be called to rollback any tasks said parents contain.
+ #
+ # Note(harlowja): if a flow can more simply revert a whole set of
+ # tasks via a simpler command then it can override this method to
+ # accomplish that.
+ #
+ # For example, if each task was creating a file in a directory, then
+ # it's easier to just remove the directory than to ask each task to
+ # delete its file individually.
+ self._change_state(context, states.REVERTING)
+ try:
+ self._accumulator.rollback(cause)
+ finally:
+ self._change_state(context, states.FAILURE)
+ # Rollback any parents flows if they exist...
+ for p in self.parents:
+ p.rollback(context, cause)
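+
+
+# An illustrative sketch (not used by cinder itself) showing how tasks chain
+# together in a linear flow; the task names below are invented purely for
+# demonstration.
+def _example_linear_flow(context):
+    @decorators.task(provides=['size'])
+    def pick_size(context):
+        return {'size': 2}
+
+    @decorators.task
+    def double_size(context, size):
+        LOG.debug("Doubled size is %s", size * 2)
+
+    flow = Flow('example-flow')
+    # The second task's 'size' requirement is satisfied by the first task's
+    # 'provides', which is how the linear chain is wired together.
+    flow.add(pick_size)
+    flow.add(double_size)
+    flow.run(context)
+    return flow.results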
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Job states.
+CLAIMED = 'CLAIMED'
+FAILURE = 'FAILURE'
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+SUCCESS = 'SUCCESS'
+UNCLAIMED = 'UNCLAIMED'
+
+# Flow states.
+FAILURE = FAILURE
+INTERRUPTED = 'INTERRUPTED'
+PENDING = 'PENDING'
+RESUMING = 'RESUMING'
+REVERTING = 'REVERTING'
+RUNNING = RUNNING
+STARTED = 'STARTED'
+SUCCESS = SUCCESS
+
+# Task states.
+FAILURE = FAILURE
+STARTED = STARTED
+SUCCESS = SUCCESS
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+from cinder.taskflow import utils
+
+
+class Task(object):
+ """An abstraction that defines a potential piece of work that can be
+ applied and can be reverted to undo the work as a single unit.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, name):
+ self.name = name
+        # An *immutable* set of input 'resource' names that must exist
+        # before this task can be applied.
+ self.requires = set()
+        # An *immutable* set of input 'resource' names that this task would
+        # like to exist before it is applied (but does not strongly depend
+        # on them existing).
+ self.optional = set()
+        # An *immutable* set of output 'resource' names this task produces,
+        # which other tasks may depend on this task providing.
+ self.provides = set()
+        # This identifies the version of the task to be run which
+ # can be useful in resuming older versions of tasks. Standard
+ # major, minor version semantics apply.
+ self.version = (1, 0)
+
+ def __str__(self):
+ return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
+
+ @abc.abstractmethod
+ def __call__(self, context, *args, **kwargs):
+ """Activate a given task which will perform some operation and return.
+
+ This method can be used to apply some given context and given set
+ of args and kwargs to accomplish some goal. Note that the result
+ that is returned needs to be serializable so that it can be passed
+ back into this task if reverting is triggered.
+ """
+ raise NotImplementedError()
+
+ def revert(self, context, result, cause):
+ """Revert this task using the given context, result that the apply
+ provided as well as any information which may have caused
+ said reversion.
+ """
+ pass
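+
+
+# An illustrative sketch (not part of cinder itself) of a minimal Task
+# subclass; the class name and empty result below are invented purely for
+# demonstration.
+class _ExampleNoopTask(Task):
+    """A task that changes nothing and therefore has nothing to undo."""
+
+    def __call__(self, context, *args, **kwargs):
+        # Return a serializable result so that it could be handed back to
+        # revert() if a later task fails.
+        return {}
+
+    def revert(self, context, result, cause):
+        # Nothing was changed by __call__, so there is nothing to undo.
+        pass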
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import contextlib
+import copy
+import logging
+import re
+import sys
+import threading
+import time
+import types
+
+from cinder.openstack.common import uuidutils
+
+from cinder.taskflow import decorators
+
+LOG = logging.getLogger(__name__)
+
+
+def get_attr(task, field, default=None):
+ if decorators.is_decorated(task):
+        # If it's a decorated functor then the attributes will be either
+ # in the underlying function of the instancemethod or the function
+ # itself.
+ task = decorators.extract(task)
+ return getattr(task, field, default)
+
+
+def join(itr, with_what=","):
+ pieces = [str(i) for i in itr]
+ return with_what.join(pieces)
+
+
+def get_many_attr(obj, *attrs):
+ many = []
+ for a in attrs:
+ many.append(get_attr(obj, a, None))
+ return many
+
+
+def get_task_version(task):
+    """Gets a task's *string* version, whether it is a task object/function."""
+ task_version = get_attr(task, 'version')
+ if isinstance(task_version, (list, tuple)):
+ task_version = join(task_version, with_what=".")
+ if task_version is not None and not isinstance(task_version, basestring):
+ task_version = str(task_version)
+ return task_version
+
+
+def get_task_name(task):
+    """Gets a task's *string* name, whether it is a task object/function."""
+ task_name = ""
+ if isinstance(task, (types.MethodType, types.FunctionType)):
+        # If it's a function, look for the attributes that should have been
+ # set using the task() decorator provided in the decorators file. If
+ # those have not been set, then we should at least have enough basic
+ # information (not a version) to form a useful task name.
+ task_name = get_attr(task, 'name')
+ if not task_name:
+ name_pieces = [a for a in get_many_attr(task,
+ '__module__',
+ '__name__')
+ if a is not None]
+ task_name = join(name_pieces, ".")
+ else:
+ task_name = str(task)
+ return task_name
+
+
+def is_version_compatible(version_1, version_2):
+    """Checks for major version compatibility of two *string* versions."""
+ if version_1 == version_2:
+ # Equivalent exactly, so skip the rest.
+ return True
+
+ def _convert_to_pieces(version):
+ try:
+ pieces = []
+ for p in version.split("."):
+ p = p.strip()
+ if not len(p):
+ pieces.append(0)
+ continue
+ # Clean off things like 1alpha, or 2b and just select the
+ # digit that starts that entry instead.
+ p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
+ if p_match:
+ p = p_match.group(1)
+ pieces.append(int(p))
+ except (AttributeError, TypeError, ValueError):
+ pieces = []
+ return pieces
+
+ version_1_pieces = _convert_to_pieces(version_1)
+ version_2_pieces = _convert_to_pieces(version_2)
+ if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
+ return False
+
+ # Ensure major version compatibility to start.
+ major1 = version_1_pieces[0]
+ major2 = version_2_pieces[0]
+ if major1 != major2:
+ return False
+ return True
+
+
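+# An illustrative sketch (not used elsewhere) of the major-version semantics
+# above: versions that share a major number are compatible, others are not.
+def _example_version_checks():
+    assert is_version_compatible("1.0", "1.2")
+    assert not is_version_compatible("1.0", "2.0")
+
+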
+def await(check_functor, timeout=None):
+ if timeout is not None:
+ end_time = time.time() + max(0, timeout)
+ else:
+ end_time = None
+ # Use the same/similar scheme that the python condition class uses.
+ delay = 0.0005
+ while not check_functor():
+ time.sleep(delay)
+ if end_time is not None:
+ remaining = end_time - time.time()
+ if remaining <= 0:
+ return False
+ delay = min(delay * 2, remaining, 0.05)
+ else:
+ delay = min(delay * 2, 0.05)
+ return True
+
+
+class LastFedIter(object):
+ """An iterator which yields back the first item and then yields back
+ results from the provided iterator.
+ """
+
+ def __init__(self, first, rest_itr):
+ self.first = first
+ self.rest_itr = rest_itr
+
+ def __iter__(self):
+ yield self.first
+ for i in self.rest_itr:
+ yield i
+
+
+class FlowFailure(object):
+ """When a task failure occurs the following object will be given to revert
+ and can be used to interrogate what caused the failure.
+ """
+
+ def __init__(self, runner, flow, exception):
+ self.runner = runner
+ self.flow = flow
+ self.exc = exception
+ self.exc_info = sys.exc_info()
+
+
+class RollbackTask(object):
+ """A helper task that on being called will call the underlying callable
+    task's revert method (if said method exists).
+ """
+
+ def __init__(self, context, task, result):
+ self.task = task
+ self.result = result
+ self.context = context
+
+ def __str__(self):
+ return str(self.task)
+
+ def __call__(self, cause):
+ if ((hasattr(self.task, "revert") and
+ isinstance(self.task.revert, collections.Callable))):
+ self.task.revert(self.context, self.result, cause)
+
+
+class Runner(object):
+ """A helper class that wraps a task and can find the needed inputs for
+ the task to run, as well as providing a uuid and other useful functionality
+ for users of the task.
+
+ TODO(harlowja): replace with the task details object or a subclass of
+ that???
+ """
+
+ def __init__(self, task, uuid=None):
+ assert isinstance(task, collections.Callable)
+ self.task = task
+ self.providers = {}
+ self.runs_before = []
+ self.result = None
+ if not uuid:
+ self._id = uuidutils.generate_uuid()
+ else:
+ self._id = str(uuid)
+
+ @property
+ def uuid(self):
+ return "r-%s" % (self._id)
+
+ @property
+ def requires(self):
+ return set(get_attr(self.task, 'requires', []))
+
+ @property
+ def provides(self):
+ return set(get_attr(self.task, 'provides', []))
+
+ @property
+ def optional(self):
+ return set(get_attr(self.task, 'optional', []))
+
+ @property
+ def version(self):
+ return get_task_version(self.task)
+
+ @property
+ def name(self):
+ return get_task_name(self.task)
+
+ def reset(self):
+ self.result = None
+
+ def __str__(self):
+ lines = ["Runner: %s" % (self.name)]
+ lines.append("%s" % (self.uuid))
+ lines.append("%s" % (self.version))
+ return "; ".join(lines)
+
+ def __call__(self, *args, **kwargs):
+ # Find all of our inputs first.
+ kwargs = dict(kwargs)
+ for (k, who_made) in self.providers.iteritems():
+ if who_made.result and k in who_made.result:
+ kwargs[k] = who_made.result[k]
+ else:
+ kwargs[k] = None
+ optional_keys = self.optional
+ optional_missing_keys = optional_keys - set(kwargs.keys())
+ if optional_missing_keys:
+ for k in optional_missing_keys:
+ for r in self.runs_before:
+ r_provides = r.provides
+ if k in r_provides and r.result and k in r.result:
+ kwargs[k] = r.result[k]
+ break
+ # And now finally run.
+ self.result = self.task(*args, **kwargs)
+ return self.result
+
+
+class TransitionNotifier(object):
+ """A utility helper class that can be used to subscribe to
+    notifications of events occurring as well as allow an entity to post said
+ notifications to subscribers.
+ """
+
+ RESERVED_KEYS = ('details',)
+ ANY = '*'
+
+ def __init__(self):
+ self._listeners = collections.defaultdict(list)
+
+ def reset(self):
+ self._listeners = collections.defaultdict(list)
+
+ def notify(self, state, details):
+ listeners = list(self._listeners.get(self.ANY, []))
+ for i in self._listeners[state]:
+ if i not in listeners:
+ listeners.append(i)
+ if not listeners:
+ return
+ for (callback, args, kwargs) in listeners:
+ if args is None:
+ args = []
+ if kwargs is None:
+ kwargs = {}
+ kwargs['details'] = details
+ try:
+ callback(state, *args, **kwargs)
+ except Exception:
+ LOG.exception(("Failure calling callback %s to notify about"
+ " state transition %s"), callback, state)
+
+ def register(self, state, callback, args=None, kwargs=None):
+ assert isinstance(callback, collections.Callable)
+        # Use distinct loop variable names so that the caller-provided
+        # args/kwargs (used further below) are not clobbered.
+        for (cb, _args, _kwargs) in self._listeners.get(state, []):
+ if cb is callback:
+ raise ValueError("Callback %s already registered" % (callback))
+ if kwargs:
+ for k in self.RESERVED_KEYS:
+ if k in kwargs:
+ raise KeyError(("Reserved key '%s' not allowed in "
+ "kwargs") % k)
+ kwargs = copy.copy(kwargs)
+ if args:
+ args = copy.copy(args)
+ self._listeners[state].append((callback, args, kwargs))
+
+ def deregister(self, state, callback):
+ if state not in self._listeners:
+ return
+ for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
+ if cb is callback:
+ self._listeners[state].pop(i)
+ break
+
+
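+# An illustrative sketch (not used elsewhere) of subscribing to transition
+# notifications; the callback and state used below are only for demonstration.
+def _example_notifier_usage():
+    notifier = TransitionNotifier()
+
+    def on_any_transition(state, details):
+        LOG.debug("Transitioned to %s with details %s", state, details)
+
+    # Callbacks registered under ANY ('*') receive every posted state; the
+    # 'details' dictionary is always passed in as a keyword argument.
+    notifier.register(notifier.ANY, on_any_transition)
+    notifier.notify('SUCCESS', details={})
+
+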
+class RollbackAccumulator(object):
+ """A utility class that can help in organizing 'undo' like code
+    so that said code can be rolled back on failure (automatically or
+    manually) by activating rollback callables that were inserted during
+    said code's progression.
+ """
+
+ def __init__(self):
+ self._rollbacks = []
+
+ def add(self, *callables):
+ self._rollbacks.extend(callables)
+
+ def reset(self):
+ self._rollbacks = []
+
+ def __len__(self):
+ return len(self._rollbacks)
+
+ def __iter__(self):
+ # Rollbacks happen in the reverse order that they were added.
+ return reversed(self._rollbacks)
+
+ def __enter__(self):
+ return self
+
+ def rollback(self, cause):
+ LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
+ for (i, f) in enumerate(self):
+ LOG.debug("Calling rollback %s: %s", i + 1, f)
+ try:
+ f(cause)
+ except Exception:
+ LOG.exception(("Failed rolling back %s: %s due "
+ "to inner exception."), i + 1, f)
+
+ def __exit__(self, type, value, tb):
+ if any((value, type, tb)):
+ self.rollback(value)
+
+
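+# An illustrative sketch (not used elsewhere) of accumulating rollbacks as
+# work progresses; the callable used below is invented for demonstration.
+def _example_rollback_usage():
+    accumulator = RollbackAccumulator()
+
+    def undo_step_one(cause):
+        LOG.debug("Undoing step one due to %s", cause)
+
+    # Register the undo right after the corresponding work succeeds; if a
+    # later step fails, rollback() replays the registered undos in reverse.
+    accumulator.add(undo_step_one)
+    accumulator.rollback("example failure")
+
+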
+class ReaderWriterLock(object):
+ """A simple reader-writer lock.
+
+ Several readers can hold the lock simultaneously, and only one writer.
+ Write locks have priority over reads to prevent write starvation.
+
+ Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
+ """
+
+ def __init__(self):
+ self.rwlock = 0
+ self.writers_waiting = 0
+ self.monitor = threading.Lock()
+ self.readers_ok = threading.Condition(self.monitor)
+ self.writers_ok = threading.Condition(self.monitor)
+
+ @contextlib.contextmanager
+ def acquire(self, read=True):
+ """Acquire a read or write lock in a context manager."""
+ try:
+ if read:
+ self.acquire_read()
+ else:
+ self.acquire_write()
+ yield self
+ finally:
+ self.release()
+
+ def acquire_read(self):
+ """Acquire a read lock.
+
+        Several threads can hold this type of lock.
+ It is exclusive with write locks.
+ """
+
+ self.monitor.acquire()
+ while self.rwlock < 0 or self.writers_waiting:
+ self.readers_ok.wait()
+ self.rwlock += 1
+ self.monitor.release()
+
+ def acquire_write(self):
+ """Acquire a write lock.
+
+ Only one thread can hold this lock, and only when no read locks
+ are also held.
+ """
+
+ self.monitor.acquire()
+ while self.rwlock != 0:
+ self.writers_waiting += 1
+ self.writers_ok.wait()
+ self.writers_waiting -= 1
+ self.rwlock = -1
+ self.monitor.release()
+
+ def release(self):
+ """Release a lock, whether read or write."""
+
+ self.monitor.acquire()
+ if self.rwlock < 0:
+ self.rwlock = 0
+ else:
+ self.rwlock -= 1
+ wake_writers = self.writers_waiting and self.rwlock == 0
+ wake_readers = self.writers_waiting == 0
+ self.monitor.release()
+ if wake_writers:
+ self.writers_ok.acquire()
+ self.writers_ok.notify()
+ self.writers_ok.release()
+ elif wake_readers:
+ self.readers_ok.acquire()
+ self.readers_ok.notifyAll()
+ self.readers_ok.release()
+
+
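+# An illustrative sketch (not used elsewhere) of the context-manager form of
+# the reader-writer lock; the shared dictionary below is only for
+# demonstration.
+def _example_rwlock_usage():
+    lock = ReaderWriterLock()
+    shared = {'value': 0}
+    # A writer needs exclusive access and takes priority over waiting readers.
+    with lock.acquire(read=False):
+        shared['value'] += 1
+    # Multiple readers may hold the lock at the same time.
+    with lock.acquire(read=True):
+        return shared['value']
+
+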
+class LazyPluggable(object):
+ """A pluggable backend loaded lazily based on some value."""
+
+ def __init__(self, pivot, **backends):
+ self.__backends = backends
+ self.__pivot = pivot
+ self.__backend = None
+
+ def __get_backend(self):
+ if not self.__backend:
+ backend_name = 'sqlalchemy'
+ backend = self.__backends[backend_name]
+ if isinstance(backend, tuple):
+ name = backend[0]
+ fromlist = backend[1]
+ else:
+ name = backend
+ fromlist = backend
+
+ self.__backend = __import__(name, None, None, fromlist)
+ return self.__backend
+
+ def __getattr__(self, key):
+ backend = self.__get_backend()
+ return getattr(backend, key)
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
+from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
- 'launched_at': '',
+ 'launched_at': 'DONTCARE',
'size': 0,
}
self.assertDictMatch(msg['payload'], expected)
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
- 'launched_at': '',
+ 'launched_at': 'DONTCARE',
'size': 0,
}
self.assertDictMatch(msg['payload'], expected)
def test_create_volume_from_unelevated_context(self):
"""Test context does't change after volume creation failure."""
- def fake_create_volume(context, volume_ref, snapshot_ref,
- sourcevol_ref, image_service, image_id,
- image_location):
+ def fake_create_volume(*args, **kwargs):
raise exception.CinderException('fake exception')
- def fake_reschedule_or_error(context, volume_id, exc_info,
- snapshot_id, image_id, request_spec,
- filter_properties):
+ def fake_reschedule_or_error(self, context, *args, **kwargs):
self.assertFalse(context.is_admin)
self.assertFalse('admin' in context.roles)
#compare context passed in with the context we saved
#create one copy of context for future comparison
self.saved_ctxt = ctxt.deepcopy()
- self.stubs.Set(self.volume, '_reschedule_or_error',
+ self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
fake_reschedule_or_error)
- self.stubs.Set(self.volume, '_create_volume',
- fake_create_volume)
+ self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
volume_src = self._create_volume()
self.assertRaises(exception.CinderException,
db.volume_update(self.context, src_vref['id'], {'status': 'error'})
raise exception.CinderException('fake exception')
- def fake_reschedule_or_error(context, volume_id, exc_info,
- snapshot_id, image_id, request_spec,
- filter_properties):
- pass
-
- self.stubs.Set(self.volume, '_reschedule_or_error',
- fake_reschedule_or_error)
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_error_create_cloned_volume)
volume_src = self._create_volume()
raise exception.ConfigNotFound(path=os.path.abspath(config_path))
+def as_int(obj, quiet=True):
+ # Try "2" -> 2
+ try:
+ return int(obj)
+ except (ValueError, TypeError):
+ pass
+ # Try "2.5" -> 2
+ try:
+ return int(float(obj))
+ except (ValueError, TypeError):
+ pass
+ # Eck, not sure what this is then.
+ if not quiet:
+ raise TypeError(_("Can not translate %s to integer.") % (obj))
+ return obj
+
+
+def check_exclusive_options(**kwargs):
+    """Checks that only one of the provided options is actually not None.
+
+    Iterates over all the kwargs passed in and checks that only one of said
+    arguments is not None; if more than one is not None then an exception
+    will be raised naming the arguments that were not None.
+ """
+
+ if not kwargs:
+ return
+
+ pretty_keys = kwargs.pop("pretty_keys", True)
+ exclusive_options = {}
+ for (k, v) in kwargs.iteritems():
+ if v is not None:
+ exclusive_options[k] = True
+
+ if len(exclusive_options) > 1:
+ # Change the format of the names from pythonic to
+ # something that is more readable.
+ #
+ # Ex: 'the_key' -> 'the key'
+ if pretty_keys:
+ names = [k.replace('_', ' ') for k in kwargs.keys()]
+ else:
+ names = kwargs.keys()
+ names = ", ".join(sorted(names))
+ msg = (_("May specify only one of %s") % (names))
+ raise exception.InvalidInput(reason=msg)
+
+
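+# An illustrative sketch (not called anywhere in cinder) of how
+# check_exclusive_options() is used; the option names below are examples only.
+def _example_exclusive_options():
+    # Only one option is not None, so this passes quietly.
+    check_exclusive_options(snapshot=None, image_id='image',
+                            source_volume=None)
+    try:
+        # Two options are not None at the same time, so InvalidInput is
+        # raised.
+        check_exclusive_options(snapshot='snap', image_id='image')
+    except exception.InvalidInput:
+        pass
+
+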
def fetchfile(url, target):
LOG.debug(_('Fetching %s') % url)
execute('curl', '--fail', url, '-o', target)
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import units
from cinder import utils
+from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_types
+from cinder.taskflow import states
+
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
default=True,
CONF.import_opt('storage_availability_zone', 'cinder.volume.manager')
LOG = logging.getLogger(__name__)
-GB = units.GiB
QUOTAS = quota.QUOTAS
self.availability_zone_names = ()
super(API, self).__init__(db_driver)
- def create(self, context, size, name, description, snapshot=None,
- image_id=None, volume_type=None, metadata=None,
- availability_zone=None, source_volume=None,
- scheduler_hints=None):
-
- exclusive_options = (snapshot, image_id, source_volume)
- exclusive_options_set = sum(1 for option in
- exclusive_options if option is not None)
- if exclusive_options_set > 1:
- msg = (_("May specify only one of snapshot, imageRef "
- "or source volume"))
- raise exception.InvalidInput(reason=msg)
-
- check_policy(context, 'create')
- if snapshot is not None:
- if snapshot['status'] != "available":
- msg = _("status must be available")
- raise exception.InvalidSnapshot(reason=msg)
- if not size:
- size = snapshot['volume_size']
- elif size < snapshot['volume_size']:
- msg = _("Volume size cannot be lesser than"
- " the Snapshot size")
- raise exception.InvalidInput(reason=msg)
- snapshot_id = snapshot['id']
- else:
- snapshot_id = None
-
- if source_volume is not None:
- if source_volume['status'] == "error":
- msg = _("Unable to clone volumes that are in an error state")
- raise exception.InvalidSourceVolume(reason=msg)
- if not size:
- size = source_volume['size']
- else:
- if size < source_volume['size']:
- msg = _("Clones currently must be "
- ">= original volume size.")
- raise exception.InvalidInput(reason=msg)
- source_volid = source_volume['id']
- else:
- source_volid = None
-
- def as_int(s):
- try:
- return int(s)
- except (ValueError, TypeError):
- return s
-
- # tolerate size as stringified int
- size = as_int(size)
-
- if not isinstance(size, int) or size <= 0:
- msg = (_("Volume size '%s' must be an integer and greater than 0")
- % size)
- raise exception.InvalidInput(reason=msg)
-
- if (image_id and not (source_volume or snapshot)):
- # check image existence
- image_meta = self.image_service.show(context, image_id)
- image_size_in_gb = (int(image_meta['size']) + GB - 1) / GB
- #check image size is not larger than volume size.
- if image_size_in_gb > size:
- msg = _('Size of specified image is larger than volume size.')
- raise exception.InvalidInput(reason=msg)
- # Check image minDisk requirement is met for the particular volume
- if size < image_meta.get('min_disk', 0):
- msg = _('Image minDisk size is larger than the volume size.')
- raise exception.InvalidInput(reason=msg)
-
- if availability_zone is None:
- if snapshot is not None:
- availability_zone = snapshot['volume']['availability_zone']
- elif source_volume is not None:
- availability_zone = source_volume['availability_zone']
- else:
- availability_zone = CONF.storage_availability_zone
- else:
- self._check_availabilty_zone(availability_zone)
-
- if CONF.cloned_volume_same_az:
- if (snapshot and
- snapshot['volume']['availability_zone'] !=
- availability_zone):
- msg = _("Volume must be in the same "
- "availability zone as the snapshot")
- raise exception.InvalidInput(reason=msg)
- elif source_volume and \
- source_volume['availability_zone'] != availability_zone:
- msg = _("Volume must be in the same "
- "availability zone as the source volume")
- raise exception.InvalidInput(reason=msg)
-
- if not volume_type and not source_volume:
- volume_type = volume_types.get_default_volume_type()
-
- if not volume_type and source_volume:
- volume_type_id = source_volume['volume_type_id']
- else:
- volume_type_id = volume_type.get('id')
-
- try:
- reserve_opts = {'volumes': 1, 'gigabytes': size}
- QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
- reservations = QUOTAS.reserve(context, **reserve_opts)
- except exception.OverQuota as e:
- overs = e.kwargs['overs']
- usages = e.kwargs['usages']
- quotas = e.kwargs['quotas']
-
- def _consumed(name):
- return (usages[name]['reserved'] + usages[name]['in_use'])
-
- for over in overs:
- if 'gigabytes' in over:
- msg = _("Quota exceeded for %(s_pid)s, tried to create "
- "%(s_size)sG volume (%(d_consumed)dG of "
- "%(d_quota)dG already consumed)")
- LOG.warn(msg % {'s_pid': context.project_id,
- 's_size': size,
- 'd_consumed': _consumed(over),
- 'd_quota': quotas[over]})
- raise exception.VolumeSizeExceedsAvailableQuota()
- elif 'volumes' in over:
- msg = _("Quota exceeded for %(s_pid)s, tried to create "
- "volume (%(d_consumed)d volumes"
- "already consumed)")
- LOG.warn(msg % {'s_pid': context.project_id,
- 'd_consumed': _consumed(over)})
- raise exception.VolumeLimitExceeded(allowed=quotas[over])
-
- self._check_metadata_properties(context, metadata)
- options = {'size': size,
- 'user_id': context.user_id,
- 'project_id': context.project_id,
- 'snapshot_id': snapshot_id,
- 'availability_zone': availability_zone,
- 'status': "creating",
- 'attach_status': "detached",
- 'display_name': name,
- 'display_description': description,
- 'volume_type_id': volume_type_id,
- 'metadata': metadata,
- 'source_volid': source_volid}
-
- try:
- volume = self.db.volume_create(context, options)
- QUOTAS.commit(context, reservations)
- except Exception:
- with excutils.save_and_reraise_exception():
- try:
- self.db.volume_destroy(context, volume['id'])
- finally:
- QUOTAS.rollback(context, reservations)
-
- request_spec = {'volume_properties': options,
- 'volume_type': volume_type,
- 'volume_id': volume['id'],
- 'snapshot_id': volume['snapshot_id'],
- 'image_id': image_id,
- 'source_volid': volume['source_volid']}
-
- if scheduler_hints:
- filter_properties = {'scheduler_hints': scheduler_hints}
- else:
- filter_properties = {}
-
- self._cast_create_volume(context, request_spec, filter_properties)
-
- return volume
-
- def _cast_create_volume(self, context, request_spec, filter_properties):
-
- # NOTE(Rongze Zhu): It is a simple solution for bug 1008866
- # If snapshot_id is set, make the call create volume directly to
- # the volume host where the snapshot resides instead of passing it
- # through the scheduler. So snapshot can be copy to new volume.
-
- source_volid = request_spec['source_volid']
- volume_id = request_spec['volume_id']
- snapshot_id = request_spec['snapshot_id']
- image_id = request_spec['image_id']
-
- if snapshot_id and CONF.snapshot_same_host:
- snapshot_ref = self.db.snapshot_get(context, snapshot_id)
- source_volume_ref = self.db.volume_get(context,
- snapshot_ref['volume_id'])
- now = timeutils.utcnow()
- values = {'host': source_volume_ref['host'], 'scheduled_at': now}
- volume_ref = self.db.volume_update(context, volume_id, values)
-
- # bypass scheduler and send request directly to volume
- self.volume_rpcapi.create_volume(
- context,
- volume_ref,
- volume_ref['host'],
- request_spec=request_spec,
- filter_properties=filter_properties,
- allow_reschedule=False,
- snapshot_id=snapshot_id,
- image_id=image_id)
- elif source_volid:
- source_volume_ref = self.db.volume_get(context,
- source_volid)
- now = timeutils.utcnow()
- values = {'host': source_volume_ref['host'], 'scheduled_at': now}
- volume_ref = self.db.volume_update(context, volume_id, values)
-
- # bypass scheduler and send request directly to volume
- self.volume_rpcapi.create_volume(
- context,
- volume_ref,
- volume_ref['host'],
- request_spec=request_spec,
- filter_properties=filter_properties,
- allow_reschedule=False,
- snapshot_id=snapshot_id,
- image_id=image_id,
- source_volid=source_volid)
- else:
- self.scheduler_rpcapi.create_volume(
- context,
- CONF.volume_topic,
- volume_id,
- snapshot_id,
- image_id,
- request_spec=request_spec,
- filter_properties=filter_properties)
-
- def _check_availabilty_zone(self, availability_zone):
+ def _valid_availabilty_zone(self, availability_zone):
#NOTE(bcwaldon): This approach to caching fails to handle the case
# that an availability zone is disabled/removed.
if availability_zone in self.availability_zone_names:
- return
+ return True
+ if CONF.storage_availability_zone == availability_zone:
+ return True
azs = self.list_availability_zones()
self.availability_zone_names = [az['name'] for az in azs]
-
- if availability_zone not in self.availability_zone_names:
- msg = _("Availability zone is invalid")
- LOG.warn(msg)
- raise exception.InvalidInput(reason=msg)
+ return availability_zone in self.availability_zone_names
def list_availability_zones(self):
"""Describe the known availability zones
return tuple(azs)
+ def create(self, context, size, name, description, snapshot=None,
+ image_id=None, volume_type=None, metadata=None,
+ availability_zone=None, source_volume=None,
+ scheduler_hints=None):
+
+ def check_volume_az_zone(availability_zone):
+ try:
+ return self._valid_availabilty_zone(availability_zone)
+ except exception.CinderException:
+ LOG.exception(_("Unable to query if %s is in the "
+ "availability zone set"), availability_zone)
+ return False
+
+ create_what = {
+ 'size': size,
+ 'name': name,
+ 'description': description,
+ 'snapshot': snapshot,
+ 'image_id': image_id,
+ 'volume_type': volume_type,
+ 'metadata': metadata,
+ 'availability_zone': availability_zone,
+ 'source_volume': source_volume,
+ 'scheduler_hints': scheduler_hints,
+ }
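+ # NOTE: get_api_flow will validate these values, reserve quota, create
+ # the database entry and cast the request onwards; the returned uuid
+ # identifies the task whose result carries the new volume reference.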
+ (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
+ self.volume_rpcapi,
+ self.db,
+ self.image_service,
+ check_volume_az_zone,
+ create_what)
+
+ assert flow, _('Create volume flow not retrieved')
+ flow.run(context)
+ if flow.state != states.SUCCESS:
+ raise exception.CinderException(_("Failed to successfully complete"
+ " create volume workflow"))
+
+ # Extract the volume information from the task uuid that was specified
+ # to produce said information.
+ volume = None
+ try:
+ volume = flow.results[uuid]['volume']
+ except KeyError:
+ pass
+
+ # If no task produced the volume result then something went wrong.
+ assert volume, _('Expected volume result not found')
+ return volume
+
@wrap_check_policy
def delete(self, context, volume, force=False):
if context.is_admin and context.project_id != volume['project_id']:
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+# Copyright (c) 2013 OpenStack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import traceback
+
+from oslo.config import cfg
+
+from cinder import exception
+from cinder.image import glance
+from cinder import policy
+from cinder import quota
+from cinder import units
+from cinder import utils
+
+from cinder.openstack.common import excutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.notifier import api as notifier
+from cinder.openstack.common import timeutils
+from cinder.volume import utils as volume_utils
+from cinder.volume import volume_types
+
+from cinder.taskflow import decorators
+from cinder.taskflow.patterns import linear_flow
+from cinder.taskflow import task
+
+LOG = logging.getLogger(__name__)
+
+ACTION = 'volume:create'
+CONF = cfg.CONF
+GB = units.GiB
+QUOTAS = quota.QUOTAS
+
+# A volume can only be created from a source volume or source snapshot
+# when that source is in one of these statuses; other statuses ('error'
+# being the common example) can not be used as a creation source.
+PROCEED_STATUS = ('available',)
+
+# When a volume errors out we can save a piece of the exception that
+# caused the failure, but we don't want to save the whole message since
+# that could be very large, so only keep up to this number of characters.
+REASON_LENGTH = 128
+
+# These attributes we will attempt to save for the volume if they exist
+# in the source image metadata.
+IMAGE_ATTRIBUTES = (
+ 'checksum',
+ 'container_format',
+ 'disk_format',
+ 'min_disk',
+ 'min_ram',
+ 'size',
+)
+
+
+def _make_pretty_name(method):
+ """Makes a pretty name for a function/method."""
+ meth_pieces = [method.__name__]
+ # If it's an instance method, attempt to tack on the class name
+ if hasattr(method, 'im_self') and method.im_self is not None:
+ try:
+ meth_pieces.insert(0, method.im_self.__class__.__name__)
+ except AttributeError:
+ pass
+ return ".".join(meth_pieces)
+
+
+def _find_result_spec(flow):
+ """Find the last task that produced a valid volume_spec and returns it."""
+ for there_result in flow.results.values():
+ if not there_result or 'volume_spec' not in there_result:
+ continue
+ if there_result['volume_spec']:
+ return there_result['volume_spec']
+ return None
+
+
+def _make_task_name(klass, addons=None):
+ """Makes a pretty name for a task class."""
+ components = [klass.__module__, klass.__name__]
+ if addons:
+ for a in addons:
+ components.append(str(a))
+ return "%s#%s" % (ACTION, ".".join(components))
+
+
+def _restore_source_status(context, db, volume_spec):
+ # NOTE(harlowja): Only if the type of the volume that was being created is
+ # the source volume type should we try to reset the source volume status
+ # back to its original value.
+ if not volume_spec or volume_spec.get('type') != 'source_vol':
+ return
+ source_volid = volume_spec['source_volid']
+ source_status = volume_spec['source_volstatus']
+ try:
+ LOG.debug(_('Restoring source %(source_volid)s status to %(status)s') %
+ {'status': source_status, 'source_volid': source_volid})
+ db.volume_update(context, source_volid, {'status': source_status})
+ except exception.CinderException:
+ # NOTE(harlowja): Don't let this cause further exceptions since this is
+ # a non-critical failure.
+ LOG.exception(_("Failed setting source volume %(source_volid)s back to"
+ " its initial %(source_status)s status") %
+ {'source_status': source_status,
+ 'source_volid': source_volid})
+
+
+def _error_out_volume(context, db, volume_id, reason=None):
+
+ def _clean_reason(reason):
+ if reason is None:
+ return '???'
+ reason = str(reason)
+ if len(reason) <= REASON_LENGTH:
+ return reason
+ else:
+ return reason[0:REASON_LENGTH] + '...'
+
+ update = {
+ 'status': 'error',
+ }
+ reason = _clean_reason(reason)
+ # TODO(harlowja): re-enable when we can support this in the database.
+ # if reason:
+ # status['details'] = reason
+ try:
+ LOG.debug(_('Updating volume: %(volume_id)s with %(update)s'
+ ' due to: %(reason)s') % {'volume_id': volume_id,
+ 'reason': reason,
+ 'update': update})
+ db.volume_update(context, volume_id, update)
+ except exception.CinderException:
+ # Don't let this cause further exceptions.
+ LOG.exception(_("Failed updating volume %(volume_id)s with"
+ " %(update)s") % {'volume_id': volume_id,
+ 'update': update})
+
+
+class CinderTask(task.Task):
+ """The root task class for all cinder tasks.
+
+ It automatically names the given task using the module and class that
+ implement the given task as the task name.
+
+ TODO(harlowja): this likely should be moved later to a common folder in
+ the future when more of cinder's flows are implemented.
+ """
+
+ def __init__(self, addons=None):
+ super(CinderTask, self).__init__(_make_task_name(self.__class__,
+ addons))
+
+
+class ValuesInjectTask(CinderTask):
+ """This injects a dict into the flow.
+
+ This injection is done so that the keys (and values) provided can be
+ depended on by tasks further down the line. Since taskflow is dependency
+ based this can be considered the bootstrapping task that provides an
+ initial set of values for other tasks to get started with. If this did
+ not exist then tasks would fail locating their dependent tasks and the
+ values said dependent tasks produce.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, inject_what):
+ super(ValuesInjectTask, self).__init__()
+ self.provides.update(inject_what.keys())
+ self._inject = inject_what
+
+ def __call__(self, context):
+ return dict(self._inject)
+
+
+class ExtractVolumeRequestTask(CinderTask):
+ """Processes an api request values into a validated set of values.
+
+ This tasks responsibility is to take in a set of inputs that will form
+ a potential volume request and validates those values against a set of
+ conditions and/or translates those values into a valid set and then returns
+ the validated/translated values for use by other tasks.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, image_service, az_check_functor=None):
+ super(ExtractVolumeRequestTask, self).__init__()
+ # This task will produce the following outputs (said outputs can be
+ # saved to durable storage in the future so that the flow can be
+ # reconstructed elsewhere and continued).
+ self.provides.update(['availability_zone', 'size', 'snapshot_id',
+ 'source_volid', 'volume_type', 'volume_type_id'])
+ # This task requires the following inputs to operate (provided
+ # automatically to __call__()). This is done so that the flow can
+ # be reconstructed elsewhere and continue running (in the future).
+ #
+ # It also is used to be able to link tasks that produce items to tasks
+ # that consume items (thus allowing the linking of the flow to be
+ # mostly automatic).
+ self.requires.update(['availability_zone', 'image_id', 'metadata',
+ 'size', 'snapshot', 'source_volume',
+ 'volume_type'])
+ self.image_service = image_service
+ self.az_check_functor = az_check_functor
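+ # When no availability zone checker is supplied, accept any zone.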
+ if not self.az_check_functor:
+ self.az_check_functor = lambda az: True
+
+ @staticmethod
+ def _extract_snapshot(snapshot):
+ """Extracts the snapshot id from the provided snapshot (if provided).
+
+ This function validates the input snapshot dict and checks that the
+ status of that snapshot is valid for creating a volume from.
+ """
+
+ snapshot_id = None
+ if snapshot is not None:
+ if snapshot['status'] not in PROCEED_STATUS:
+ msg = _("Originating snapshot status must be one"
+ " of %s values")
+ msg = msg % (", ".join(PROCEED_STATUS))
+ # TODO(harlowja): what happens if the status changes after this
+ # initial snapshot status check occurs??? Seems like someone
+ # could delete the snapshot after this check passes but before
+ # the volume is officially created?
+ raise exception.InvalidSnapshot(reason=msg)
+ snapshot_id = snapshot['id']
+ return snapshot_id
+
+ @staticmethod
+ def _extract_source_volume(source_volume):
+ """Extracts the volume id from the provided volume (if provided).
+
+ This function validates the input source_volume dict and checks that
+ the status of that source_volume is valid for creating a volume from.
+ """
+
+ source_volid = None
+ if source_volume is not None:
+ if source_volume['status'] not in PROCEED_STATUS:
+ msg = _("Unable to create a volume from an originating source"
+ " volume when its status is not one of %s"
+ " values")
+ msg = msg % (", ".join(PROCEED_STATUS))
+ # TODO(harlowja): what happens if the status changes after this
+ # initial volume status check occurs??? Seems like someone
+ # could delete the volume after this check passes but before
+ # the volume is officially created?
+ raise exception.InvalidVolume(reason=msg)
+ source_volid = source_volume['id']
+ return source_volid
+
+ @staticmethod
+ def _extract_size(size, source_volume, snapshot):
+ """Extracts and validates the volume size.
+
+ When a size is not provided this function will fill it in from the
+ source_volume or snapshot, then validates the size that is found and
+ returns said validated size.
+ """
+
+ def validate_snap_size(size):
+ if snapshot and size < snapshot['volume_size']:
+ msg = _("Volume size %(size)s cannot be lesser than"
+ " the snapshot size %(snap_size)s. "
+ "They must be >= original snapshot size.")
+ msg = msg % {'size': size,
+ 'snap_size': snapshot['volume_size']}
+ raise exception.InvalidInput(reason=msg)
+
+ def validate_source_size(size):
+ if source_volume and size < source_volume['size']:
+ msg = _("Clones currently disallowed when "
+ "%(size)s < %(source_size)s. "
+ "They must be >= original volume size.")
+ msg = msg % {'size': size,
+ 'source_size': source_volume['size']}
+ raise exception.InvalidInput(reason=msg)
+
+ def validate_int(size):
+ if not isinstance(size, int) or size <= 0:
+ msg = _("Volume size %(size)s must be an integer and"
+ " greater than 0") % {'size': size}
+ raise exception.InvalidInput(reason=msg)
+
+ # Figure out which validation functions we should be applying
+ # on the size value that we extract.
+ validator_functors = [validate_int]
+ if source_volume:
+ validator_functors.append(validate_source_size)
+ elif snapshot:
+ validator_functors.append(validate_snap_size)
+
+ # If the size is not provided then try to provide it.
+ if not size and source_volume:
+ size = source_volume['size']
+ elif not size and snapshot:
+ size = snapshot['volume_size']
+
+ size = utils.as_int(size)
+ LOG.debug("Validating volume %(size)s using %(functors)s" %
+ {'size': size,
+ 'functors': ", ".join([_make_pretty_name(func)
+ for func in validator_functors])})
+ for func in validator_functors:
+ func(size)
+ return size
+
+ def _check_image_metadata(self, context, image_id, size):
+ """Checks image existence and validates that the image metadata."""
+
+ # Check image existence
+ if not image_id:
+ return
+
+ # NOTE(harlowja): this should raise an error if the image does not
+ # exist, this is expected as it signals that the image_id is missing.
+ image_meta = self.image_service.show(context, image_id)
+
+ # Check image size is not larger than volume size.
+ image_size = utils.as_int(image_meta['size'], quiet=False)
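+ # Round the image size (in bytes) up to whole gigabytes so that a
+ # partially used gigabyte still requires a full one.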
+ image_size_in_gb = (image_size + GB - 1) / GB
+ if image_size_in_gb > size:
+ msg = _('Size of specified image %(image_size)s'
+ ' is larger than volume size %(volume_size)s.')
+ msg = msg % {'image_size': image_size_in_gb, 'volume_size': size}
+ raise exception.InvalidInput(reason=msg)
+
+ # Check image min_disk requirement is met for the particular volume
+ min_disk = image_meta.get('min_disk', 0)
+ if size < min_disk:
+ msg = _('Image minDisk size %(min_disk)s is larger'
+ ' than the volume size %(volume_size)s.')
+ msg = msg % {'min_disk': min_disk, 'volume_size': size}
+ raise exception.InvalidInput(reason=msg)
+
+ @staticmethod
+ def _check_metadata_properties(metadata=None):
+ """Checks that the volume metadata properties are valid."""
+
+ if not metadata:
+ metadata = {}
+
+ for (k, v) in metadata.iteritems():
+ if len(k) == 0:
+ msg = _("Metadata property key blank")
+ LOG.warn(msg)
+ raise exception.InvalidVolumeMetadata(reason=msg)
+ if len(k) > 255:
+ msg = _("Metadata property key %s greater than 255 "
+ "characters") % k
+ LOG.warn(msg)
+ raise exception.InvalidVolumeMetadataSize(reason=msg)
+ if len(v) > 255:
+ msg = _("Metadata property key %s value greater than"
+ " 255 characters") % k
+ LOG.warn(msg)
+ raise exception.InvalidVolumeMetadataSize(reason=msg)
+
+ def _extract_availability_zone(self, availability_zone, snapshot,
+ source_volume):
+ """Extracts and returns a validated availability zone.
+
+ This function will extract the availability zone (if not provided) from
+ the snapshot or source_volume, perform a set of validation checks on
+ the provided or extracted availability zone, and then return the
+ validated availability zone.
+ """
+
+ # Try to extract the availability zone from the corresponding snapshot
+ # or source volume if either is valid so that we can be in the same
+ # availability zone as the source.
+ if availability_zone is None:
+ if snapshot:
+ try:
+ availability_zone = snapshot['volume']['availability_zone']
+ except (TypeError, KeyError):
+ pass
+ if source_volume and availability_zone is None:
+ try:
+ availability_zone = source_volume['availability_zone']
+ except (TypeError, KeyError):
+ pass
+
+ if availability_zone is None:
+ availability_zone = CONF.storage_availability_zone
+ if not self.az_check_functor(availability_zone):
+ msg = _("Availability zone '%s' is invalid") % (availability_zone)
+ LOG.warn(msg)
+ raise exception.InvalidInput(reason=msg)
+
+ # If the configuration only allows cloning to the same availability
+ # zone then we need to enforce that.
+ if CONF.cloned_volume_same_az:
+ snap_az = None
+ try:
+ snap_az = snapshot['volume']['availability_zone']
+ except (TypeError, KeyError):
+ pass
+ if snap_az and snap_az != availability_zone:
+ msg = _("Volume must be in the same "
+ "availability zone as the snapshot")
+ raise exception.InvalidInput(reason=msg)
+ source_vol_az = None
+ try:
+ source_vol_az = source_volume['availability_zone']
+ except (TypeError, KeyError):
+ pass
+ if source_vol_az and source_vol_az != availability_zone:
+ msg = _("Volume must be in the same "
+ "availability zone as the source volume")
+ raise exception.InvalidInput(reason=msg)
+
+ return availability_zone
+
+ def __call__(self, context, size, snapshot, image_id, source_volume,
+ availability_zone, volume_type, metadata):
+
+ utils.check_exclusive_options(snapshot=snapshot,
+ imageRef=image_id,
+ source_volume=source_volume)
+ policy.enforce_action(context, ACTION)
+
+ # TODO(harlowja): what guarantee is there that the snapshot or source
+ # volume will remain available after we do this initial verification??
+ snapshot_id = self._extract_snapshot(snapshot)
+ source_volid = self._extract_source_volume(source_volume)
+ size = self._extract_size(size, source_volume, snapshot)
+
+ self._check_image_metadata(context, image_id, size)
+
+ availability_zone = self._extract_availability_zone(availability_zone,
+ snapshot,
+ source_volume)
+
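+ # Determine the volume type to use: fall back to the default type when
+ # neither a type nor a source volume was given, inherit the type id
+ # from the source volume when only a source was given, otherwise use
+ # the explicitly requested type.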
+ if not volume_type and not source_volume:
+ volume_type = volume_types.get_default_volume_type()
+ if not volume_type and source_volume:
+ volume_type_id = source_volume['volume_type_id']
+ else:
+ volume_type_id = volume_type.get('id')
+
+ self._check_metadata_properties(metadata)
+
+ return {
+ 'size': size,
+ 'snapshot_id': snapshot_id,
+ 'source_volid': source_volid,
+ 'availability_zone': availability_zone,
+ 'volume_type': volume_type,
+ 'volume_type_id': volume_type_id,
+ }
+
+
+class EntryCreateTask(CinderTask):
+ """Creates an entry for the given volume creation in the database.
+
+ Reversion strategy: remove the volume_id created from the database.
+ """
+
+ def __init__(self, db):
+ super(EntryCreateTask, self).__init__()
+ self.db = db
+ self.requires.update(['availability_zone', 'description', 'metadata',
+ 'name', 'reservations', 'size', 'snapshot_id',
+ 'source_volid', 'volume_type_id'])
+ self.provides.update(['volume_properties', 'volume_id'])
+
+ def __call__(self, context, **kwargs):
+ """Creates a database entry for the given inputs and returns details.
+
+ Accesses the database and creates a new entry for the to be created
+ volume using the given volume properties which are extracted from the
+ input kwargs (and associated requirements this task needs). These
+ requirements should be previously satisfied and validated by a
+ pre-cursor task.
+ """
+
+ volume_properties = {
+ 'size': kwargs.pop('size'),
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'status': 'creating',
+ 'attach_status': 'detached',
+ # Rename these to the internal name.
+ 'display_description': kwargs.pop('description'),
+ 'display_name': kwargs.pop('name'),
+ }
+
+ # Merge in the other required arguments which should provide the rest
+ # of the volume property fields (if applicable).
+ volume_properties.update(kwargs)
+ volume = self.db.volume_create(context, volume_properties)
+
+ return {
+ 'volume_id': volume['id'],
+ 'volume_properties': volume_properties,
+ # NOTE(harlowja): it appears like further usage of this volume
+ # result actually depends on it being a sqlalchemy object and not
+ # just a plain dictionary so that's why we are storing this here.
+ #
+ # In the future where this task results can be serialized and
+ # restored automatically for continued running we will need to
+ # resolve the serialization & recreation of this object since raw
+ # sqlalchemy objects can't be serialized.
+ 'volume': volume,
+ }
+
+ def revert(self, context, result, cause):
+ # We never produced a result and therefore can't destroy anything.
+ if not result:
+ return
+ vol_id = result['volume_id']
+ try:
+ self.db.volume_destroy(context, vol_id)
+ except exception.CinderException:
+ # We are already reverting, therefore we should silence this
+ # exception since a second exception being active will be bad.
+ #
+ # NOTE(harlowja): Being unable to destroy a volume is pretty
+ # bad though!!
+ LOG.exception(_("Failed destroying volume entry %s"), vol_id)
+
+
+class QuotaReserveTask(CinderTask):
+ """Reserves a single volume with the given size & the given volume type.
+
+ Reversion strategy: rollback the quota reservation.
+
+ Warning Warning: if the process that is running this reserve and commit
+ process fails (or is killed) before the quota is rolled back or
+ committed, it does appear like the quota will never be rolled back. This makes
+ software upgrades hard (inflight operations will need to be stopped or
+ allowed to complete before the upgrade can occur). *In the future* when
+ taskflow has persistence built-in this should be easier to correct via
+ an automated or manual process.
+ """
+
+ def __init__(self):
+ super(QuotaReserveTask, self).__init__()
+ self.requires.update(['size', 'volume_type_id'])
+ self.provides.update(['reservations'])
+
+ def __call__(self, context, size, volume_type_id):
+ try:
+ reserve_opts = {'volumes': 1, 'gigabytes': size}
+ QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
+ reservations = QUOTAS.reserve(context, **reserve_opts)
+ return {
+ 'reservations': reservations,
+ }
+ except exception.OverQuota as e:
+ overs = e.kwargs['overs']
+ quotas = e.kwargs['quotas']
+ usages = e.kwargs['usages']
+
+ def _consumed(name):
+ return (usages[name]['reserved'] + usages[name]['in_use'])
+
+ def _is_over(name):
+ for over in overs:
+ if name in over:
+ return True
+ return False
+
+ if _is_over('gigabytes'):
+ msg = _("Quota exceeded for %(s_pid)s, tried to create "
+ "%(s_size)sG volume (%(d_consumed)dG "
+ "of %(d_quota)dG already consumed)")
+ LOG.warn(msg % {'s_pid': context.project_id,
+ 's_size': size,
+ 'd_consumed': _consumed('gigabytes'),
+ 'd_quota': quotas['gigabytes']})
+ raise exception.VolumeSizeExceedsAvailableQuota()
+ elif _is_over('volumes'):
+ msg = _("Quota exceeded for %(s_pid)s, tried to create "
+ "volume (%(d_consumed)d volumes "
+ "already consumed)")
+ LOG.warn(msg % {'s_pid': context.project_id,
+ 'd_consumed': _consumed('volumes')})
+ allowed = quotas['volumes']
+ raise exception.VolumeLimitExceeded(allowed=allowed)
+ else:
+ # If nothing was reraised, ensure we reraise the initial error
+ raise
+
+ def revert(self, context, result, cause):
+ # We never produced a result and therefore can't destroy anything.
+ if not result:
+ return
+ # We actually produced an output that we can revert so lets attempt
+ # to use said output to rollback the reservation.
+ reservations = result['reservations']
+ try:
+ QUOTAS.rollback(context, reservations)
+ except exception.CinderException:
+ # We are already reverting, therefore we should silence this
+ # exception since a second exception being active will be bad.
+ LOG.exception(_("Failed rolling back quota for"
+ " %s reservations"), reservations)
+
+
+class QuotaCommitTask(CinderTask):
+ """Commits the reservation.
+
+ Reversion strategy: N/A (the rollback will be handled by the task that
+ did the initial reservation, see: QuotaReserveTask).
+
+ Warning Warning: if the process that is running this reserve and commit
+ process fails (or is killed) before the quota is rolled back or
+ committed, it does appear like the quota will never be rolled back. This makes
+ software upgrades hard (inflight operations will need to be stopped or
+ allowed to complete before the upgrade can occur). *In the future* when
+ taskflow has persistence built-in this should be easier to correct via
+ an automated or manual process.
+ """
+
+ def __init__(self):
+ super(QuotaCommitTask, self).__init__()
+ self.requires.update(['reservations'])
+
+ def __call__(self, context, reservations):
+ QUOTAS.commit(context, reservations)
+
+
+class VolumeCastTask(CinderTask):
+ """Performs a volume create cast to the scheduler or to the volume manager.
+
+ This will signal a transition of the api workflow to another child
+ and/or related workflow on another component.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
+ super(VolumeCastTask, self).__init__()
+ self.volume_rpcapi = volume_rpcapi
+ self.scheduler_rpcapi = scheduler_rpcapi
+ self.db = db
+ self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
+ 'source_volid', 'volume_id', 'volume_type',
+ 'volume_properties'])
+
+ def _cast_create_volume(self, context, request_spec, filter_properties):
+ # NOTE(Rongze Zhu): A simple solution for bug 1008866.
+ #
+ # If snapshot_id is set, make the create volume call directly to
+ # the volume host where the snapshot resides instead of passing it
+ # through the scheduler, so the snapshot can be copied to the new volume.
+ source_volid = request_spec['source_volid']
+ volume_id = request_spec['volume_id']
+ snapshot_id = request_spec['snapshot_id']
+ image_id = request_spec['image_id']
+ host = None
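+ # When creating from a snapshot (if snapshot_same_host is set) or from
+ # a source volume, the data already lives on a specific host, so that
+ # host is selected directly instead of deferring to the scheduler.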
+
+ if snapshot_id and CONF.snapshot_same_host:
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ source_volume_ref = self.db.volume_get(context,
+ snapshot_ref['volume_id'])
+ host = source_volume_ref['host']
+ elif source_volid:
+ source_volume_ref = self.db.volume_get(context, source_volid)
+ host = source_volume_ref['host']
+
+ if not host:
+ # Cast to the scheduler and let it handle whatever is needed
+ # to select the target host for this volume.
+ self.scheduler_rpcapi.create_volume(
+ context,
+ CONF.volume_topic,
+ volume_id,
+ snapshot_id,
+ image_id,
+ request_spec=request_spec,
+ filter_properties=filter_properties)
+ else:
+ # Bypass the scheduler and send the request directly to the volume
+ # manager.
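+ # Record the chosen host and scheduling time on the volume before
+ # casting the create request directly to that host.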
+ now = timeutils.utcnow()
+ values = {'host': host, 'scheduled_at': now}
+ volume_ref = self.db.volume_update(context, volume_id, values)
+ self.volume_rpcapi.create_volume(
+ context,
+ volume_ref,
+ volume_ref['host'],
+ request_spec=request_spec,
+ filter_properties=filter_properties,
+ allow_reschedule=False,
+ snapshot_id=snapshot_id,
+ image_id=image_id)
+
+ def __call__(self, context, **kwargs):
+ scheduler_hints = kwargs.pop('scheduler_hints', None)
+ request_spec = kwargs.copy()
+ filter_properties = {}
+ if scheduler_hints:
+ filter_properties['scheduler_hints'] = scheduler_hints
+ self._cast_create_volume(context, request_spec, filter_properties)
+
+
+class OnFailureChangeStatusTask(CinderTask):
+ """Helper task that sets a volume id to status error.
+
+ Reversion strategy: On failure of any flow that includes this task the
+ volume id that is associated with this task will have its status set
+ to error. If a volume specification is provided and the type of that
+ spec is a source volume, said source volume will have its status
+ updated as well.
+ """
+
+ def __init__(self, db):
+ super(OnFailureChangeStatusTask, self).__init__()
+ self.db = db
+ self.requires.update(['volume_id'])
+ self.optional.update(['volume_spec'])
+
+ def __call__(self, context, volume_id, volume_spec=None):
+ # Save these items since we only use them if a reversion is triggered.
+ return {
+ 'volume_id': volume_id,
+ 'volume_spec': volume_spec,
+ }
+
+ def revert(self, context, result, cause):
+ volume_spec = result.get('volume_spec')
+ if not volume_spec:
+ # Attempt to use it from a later task that *should* have populated
+ # this from the database. It does not need to be found since
+ # reverting will continue without it.
+ volume_spec = _find_result_spec(cause.flow)
+
+ # Restore the source volume status and set the volume to error status.
+ volume_id = result['volume_id']
+ _restore_source_status(context, self.db, volume_spec)
+ _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+ LOG.error(_("Volume %s: create failed"), volume_id)
+ exc_info = False
+ if all(cause.exc_info):
+ exc_info = cause.exc_info
+ LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
+
+class OnFailureRescheduleTask(CinderTask):
+ """Triggers a rescheduling request to be sent when reverting occurs.
+
+ Reversion strategy: Triggers the rescheduling mechanism whereby a cast gets
+ sent to the scheduler rpc api to allow for an attempt X of Y for scheduling
+ this volume elsewhere.
+ """
+
+ def __init__(self, reschedule_context, db, scheduler_rpcapi):
+ super(OnFailureRescheduleTask, self).__init__()
+ self.requires.update(['filter_properties', 'image_id', 'request_spec',
+ 'snapshot_id', 'volume_id'])
+ self.optional.update(['volume_spec'])
+ self.scheduler_rpcapi = scheduler_rpcapi
+ self.db = db
+ self.reschedule_context = reschedule_context
+ # These exception types will trigger the volume to be set into error
+ # status rather than being rescheduled.
+ self.no_reschedule_types = [
+ # The volume has already finished being created when the exports
+ # occur, rescheduling would be bad if it happened due to exports
+ # not succeeding.
+ exception.ExportFailure,
+ # Image copying happens after volume creation so rescheduling due
+ # to copy failure will mean the same volume will be created at
+ # another place when it still exists locally.
+ exception.ImageCopyFailure,
+ # Metadata updates happen after the volume has been created so if
+ # they fail, rescheduling will likely attempt to create the volume
+ # on another machine when it still exists locally.
+ exception.MetadataCopyFailure,
+ exception.MetadataCreateFailure,
+ exception.MetadataUpdateFailure,
+ # The volume/snapshot has been removed from the database, which
+ # can not be fixed by rescheduling.
+ exception.VolumeNotFound,
+ exception.SnapshotNotFound,
+ exception.VolumeTypeNotFound,
+ ]
+
+ def _is_reschedulable(self, cause):
+ (exc_type, value, tb) = cause.exc_info
+ if not exc_type and cause.exc:
+ exc_type = type(cause.exc)
+ if not exc_type:
+ return True
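+ # NOTE: this is an exact type comparison, so subclasses of the listed
+ # exception types are still treated as reschedulable.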
+ if exc_type in self.no_reschedule_types:
+ return False
+ return True
+
+ def __call__(self, context, *args, **kwargs):
+ # Save these items since we only use them if a reversion is triggered.
+ return kwargs.copy()
+
+ def _reschedule(self, context, cause, request_spec, filter_properties,
+ snapshot_id, image_id, volume_id, **kwargs):
+ """Actions that happen during the rescheduling attempt occur here."""
+
+ create_volume = self.scheduler_rpcapi.create_volume
+ if not filter_properties:
+ filter_properties = {}
+ if 'retry' not in filter_properties:
+ filter_properties['retry'] = {}
+
+ retry_info = filter_properties['retry']
+ num_attempts = retry_info.get('num_attempts', 0)
+ request_spec['volume_id'] = volume_id
+
+ LOG.debug(_("Volume %(volume_id)s: re-scheduling %(method)s "
+ "attempt %(num)d due to %(reason)s") %
+ {'volume_id': volume_id,
+ 'method': _make_pretty_name(create_volume),
+ 'num': num_attempts, 'reason': unicode(cause.exc)})
+
+ if all(cause.exc_info):
+ # Stringify to avoid circular ref problem in json serialization
+ retry_info['exc'] = traceback.format_exception(*cause.exc_info)
+
+ return create_volume(context, CONF.volume_topic, volume_id,
+ snapshot_id, image_id, request_spec,
+ filter_properties)
+
+ def _post_reschedule(self, context, volume_id):
+ """Actions that happen after the rescheduling attempt occur here."""
+
+ LOG.debug(_("Volume %s: re-scheduled"), volume_id)
+
+ def _pre_reschedule(self, context, volume_id):
+ """Actions that happen before the rescheduling attempt occur here."""
+
+ try:
+ # Reset the volume state.
+ #
+ # NOTE(harlowja): this is awkward to be done here, shouldn't
+ # this happen at the scheduler itself and not before it gets
+ # sent to the scheduler? (since what happens if it never gets
+ # there??). It's almost like we need a status of 'on-the-way-to
+ # scheduler' in the future.
+ update = {
+ 'status': 'creating',
+ 'scheduled_at': timeutils.utcnow(),
+ }
+ LOG.debug(_("Updating volume %(volume_id)s with %(update)s") %
+ {'update': update, 'volume_id': volume_id})
+ self.db.volume_update(context, volume_id, update)
+ except exception.CinderException:
+ # Don't let resetting the status cause the rescheduling to fail.
+ LOG.exception(_("Volume %s: resetting 'creating' status failed"),
+ volume_id)
+
+ def revert(self, context, result, cause):
+ volume_spec = result.get('volume_spec')
+ if not volume_spec:
+ # Find it from a prior task that populated this from the database.
+ volume_spec = _find_result_spec(cause.flow)
+ volume_id = result['volume_id']
+
+ # Use a different context when rescheduling.
+ if self.reschedule_context:
+ context = self.reschedule_context
+
+ # If we are not supposed to reschedule (or are unable to), then just
+ # restore the source volume status and set the volume to error status.
+ def do_error_revert():
+ LOG.debug(_("Failing volume %s creation by altering volume status"
+ " instead of rescheduling"), volume_id)
+ _restore_source_status(context, self.db, volume_spec)
+ _error_out_volume(context, self.db, volume_id, reason=cause.exc)
+ LOG.error(_("Volume %s: create failed"), volume_id)
+
+ # Check if we have a cause which can tell us not to reschedule.
+ if not self._is_reschedulable(cause):
+ do_error_revert()
+ else:
+ try:
+ self._pre_reschedule(context, volume_id)
+ self._reschedule(context, cause, **result)
+ self._post_reschedule(context, volume_id)
+ except exception.CinderException:
+ LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
+ # NOTE(harlowja): Do error volume status changing instead.
+ do_error_revert()
+ exc_info = False
+ if all(cause.exc_info):
+ exc_info = cause.exc_info
+ LOG.error(_('Unexpected build error:'), exc_info=exc_info)
+
+
+class NotifySchedulerFailureTask(CinderTask):
+ """Helper task that notifies some external service on failure.
+
+ Reversion strategy: On failure of any flow that includes this task the
+ request specification associated with that flow will be extracted and
+ sent as a payload to the notification service under the given method's
+ scheduler topic.
+ """
+
+ def __init__(self, method):
+ super(NotifySchedulerFailureTask, self).__init__()
+ self.requires.update(['request_spec', 'volume_id'])
+ self.method = method
+ self.topic = 'scheduler.%s' % self.method
+ self.publisher_id = notifier.publisher_id("scheduler")
+
+ def __call__(self, context, **kwargs):
+ # Save these items since we only use them if a reversion is triggered.
+ return kwargs.copy()
+
+ def revert(self, context, result, cause):
+ request_spec = result['request_spec']
+ volume_id = result['volume_id']
+ volume_properties = request_spec['volume_properties']
+ payload = {
+ 'request_spec': request_spec,
+ 'volume_properties': volume_properties,
+ 'volume_id': volume_id,
+ 'state': 'error',
+ 'method': self.method,
+ 'reason': unicode(cause.exc),
+ }
+ try:
+ notifier.notify(context, self.publisher_id, self.topic,
+ notifier.ERROR, payload)
+ except exception.CinderException:
+ LOG.exception(_("Failed notifying on %(topic)s "
+ "payload %(payload)s") % {'topic': self.topic,
+ 'payload': payload})
+
+
+class ExtractSchedulerSpecTask(CinderTask):
+ """Extracts a spec object from a partial and/or incomplete request spec.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, db):
+ super(ExtractSchedulerSpecTask, self).__init__()
+ self.db = db
+ self.requires.update(['image_id', 'request_spec', 'snapshot_id',
+ 'volume_id'])
+ self.provides.update(['request_spec'])
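+ # NOTE: request_spec is intentionally both required and provided; when
+ # the incoming spec is missing this task replaces it with a fully
+ # populated one.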
+
+ def _populate_request_spec(self, context, volume_id, snapshot_id,
+ image_id):
+ # Create the full request spec using the volume_id.
+ #
+ # NOTE(harlowja): this will fetch the volume from the database, if
+ # the volume has been deleted before we got here then this should fail.
+ #
+ # In the future we might want to have a lock on the volume_id so that
+ # the volume can not be deleted while it's still being created?
+ if not volume_id:
+ msg = _("No volume_id provided to populate a request_spec from")
+ raise exception.InvalidInput(reason=msg)
+ volume_ref = self.db.volume_get(context, volume_id)
+ volume_type_id = volume_ref.get('volume_type_id')
+ vol_type = self.db.volume_type_get(context, volume_type_id)
+ return {
+ 'volume_id': volume_id,
+ 'snapshot_id': snapshot_id,
+ 'image_id': image_id,
+ 'volume_properties': {
+ 'size': utils.as_int(volume_ref.get('size'), quiet=False),
+ 'availability_zone': volume_ref.get('availability_zone'),
+ 'volume_type_id': volume_type_id,
+ },
+ 'volume_type': list(dict(vol_type).iteritems()),
+ }
+
+ def __call__(self, context, request_spec, volume_id, snapshot_id,
+ image_id):
+ # For RPC version < 1.2 backward compatibility
+ if request_spec is None:
+ request_spec = self._populate_request_spec(context, volume_id,
+ snapshot_id, image_id)
+ return {
+ 'request_spec': request_spec,
+ }
+
+
+class ExtractVolumeSpecTask(CinderTask):
+ """Extracts a spec of a volume to be created into a common structure.
+
+ This task extracts and organizes the input requirements into a common
+ and easier to analyze structure for later tasks to use. It will also
+ attach the underlying database volume reference which can be used by
+ other tasks to look up further details about the volume to be created.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, db):
+ super(ExtractVolumeSpecTask, self).__init__()
+ self.db = db
+ self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
+ 'source_volid', 'volume_id'])
+ self.provides.update(['volume_spec', 'volume_ref'])
+
+ def __call__(self, context, volume_id, **kwargs):
+ get_remote_image_service = glance.get_remote_image_service
+
+ # NOTE(harlowja): this will fetch the volume from the database, if
+ # the volume has been deleted before we got here then this should fail.
+ #
+ # In the future we might want to have a lock on the volume_id so that
+ # the volume can not be deleted while it's still being created?
+ volume_ref = self.db.volume_get(context, volume_id)
+ volume_name = volume_ref['name']
+ volume_size = utils.as_int(volume_ref['size'], quiet=False)
+
+ # Create a dictionary that will represent the volume to be so that
+ # later tasks can easily switch between the different types and create
+ # the volume according to the volume types specifications (which are
+ # represented in this dictionary).
+ specs = {
+ 'status': volume_ref['status'],
+ 'type': 'raw', # This will have the type of the volume to be
+ # created, which should be one of [raw, snap,
+ # source_vol, image]
+ 'volume_id': volume_ref['id'],
+ 'volume_name': volume_name,
+ 'volume_size': volume_size,
+ }
+
+ if kwargs.get('snapshot_id'):
+ # We are making a snapshot based volume instead of a raw volume.
+ specs.update({
+ 'type': 'snap',
+ 'snapshot_id': kwargs['snapshot_id'],
+ })
+ elif kwargs.get('source_volid'):
+ # We are making a source based volume instead of a raw volume.
+ #
+ # NOTE(harlowja): This will likely fail if the source volume
+ # disappeared by the time this call occurred.
+ source_volid = kwargs['source_volid']
+ source_volume_ref = self.db.volume_get(context, source_volid)
+ specs.update({
+ 'source_volid': source_volid,
+ # This is captured in case we have to revert and we want to set
+ # back the source volume status to its original status. This
+ # may or may not be sketchy to do??
+ 'source_volstatus': source_volume_ref['status'],
+ 'type': 'source_vol',
+ })
+ elif kwargs.get('image_id'):
+ # We are making an image based volume instead of a raw volume.
+ image_href = kwargs['image_id']
+ image_service, image_id = get_remote_image_service(context,
+ image_href)
+ specs.update({
+ 'type': 'image',
+ 'image_id': image_id,
+ 'image_location': image_service.get_location(context,
+ image_id),
+ 'image_meta': image_service.show(context, image_id),
+ # Instead of refetching the image service later just save it.
+ #
+ # NOTE(harlowja): if we have to later recover this task's output
+ # on another 'node', this object won't be able to be
+ # serialized, so we will have to recreate this object on
+ # demand in the future.
+ 'image_service': image_service,
+ })
+
+ return {
+ 'volume_spec': specs,
+ # NOTE(harlowja): it appears like further usage of this volume_ref
+ # result actually depends on it being a sqlalchemy object and not
+ # just a plain dictionary so that's why we are storing this here.
+ #
+ # It was attempted to refetch it when needed in subsequent tasks,
+ # but that caused sqlalchemy errors to occur (volume already open
+ # or similar).
+ #
+ # In the future where this task could fail and be recovered from we
+ # will need to store the volume_spec and recreate the volume_ref
+ # on demand.
+ 'volume_ref': volume_ref,
+ }
+
+
+class NotifyVolumeActionTask(CinderTask):
+ """Performs a notification about the given volume when called.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, db, host, event_suffix):
+ super(NotifyVolumeActionTask, self).__init__(addons=[event_suffix])
+ self.requires.update(['volume_ref'])
+ self.db = db
+ self.event_suffix = event_suffix
+ self.host = host
+
+ def __call__(self, context, volume_ref):
+ volume_id = volume_ref['id']
+ try:
+ volume_utils.notify_about_volume_usage(context, volume_ref,
+ self.event_suffix,
+ host=self.host)
+ except exception.CinderException:
+ # If notification sending or volume database entry reading fails
+ # then we shouldn't error out the whole workflow since this is
+ # not always information that must be sent for volumes to operate.
+ LOG.exception(_("Failed notifying about the volume"
+ " action %(event)s for volume %(volume_id)s") %
+ {'event': self.event_suffix,
+ 'volume_id': volume_id})
+
+
+class CreateVolumeFromSpecTask(CinderTask):
+ """Creates a volume from a provided specification.
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, db, host, driver):
+ super(CreateVolumeFromSpecTask, self).__init__()
+ self.db = db
+ self.driver = driver
+ self.requires.update(['volume_spec', 'volume_ref'])
+ # This maps the different volume specification types into the methods
+ # that can create said volume type (aka this is a jump table).
+ self._create_func_mapping = {
+ 'raw': self._create_raw_volume,
+ 'snap': self._create_from_snapshot,
+ 'source_vol': self._create_from_source_volume,
+ 'image': self._create_from_image,
+ }
+ self.host = host
+
+ def _create_from_snapshot(self, context, volume_ref, snapshot_id,
+ **kwargs):
+ volume_id = volume_ref['id']
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ model_update = self.driver.create_volume_from_snapshot(volume_ref,
+ snapshot_ref)
+ # NOTE(harlowja): Subtasks would be useful here since after this
+ # point the volume has already been created and further failures
+ # will not destroy the volume (although they could in the future).
+ make_bootable = False
+ try:
+ originating_vref = self.db.volume_get(context,
+ snapshot_ref['volume_id'])
+ make_bootable = originating_vref.bootable
+ except exception.CinderException as ex:
+ LOG.exception(_("Failed fetching snapshot %(snapshot_id)s bootable"
+ " flag using the provided glance snapshot "
+ "%(snapshot_ref_id)s volume reference") %
+ {'snapshot_id': snapshot_id,
+ 'snapshot_ref_id': snapshot_ref['volume_id']})
+ raise exception.MetadataUpdateFailure(reason=ex)
+ if make_bootable:
+ self._enable_bootable_flag(context, volume_id)
+ try:
+ LOG.debug(_("Copying metadata from snapshot %(snap_volume_id)s"
+ " to %(volume_id)s") % {'snap_volume_id': snapshot_id,
+ 'volume_id': volume_id})
+ self.db.volume_glance_metadata_copy_to_volume(context, volume_id,
+ snapshot_id)
+ except exception.CinderException as ex:
+ LOG.exception(_("Failed updating volume %(volume_id)s metadata"
+ " using the provided glance snapshot "
+ "%(snapshot_id)s metadata") %
+ {'volume_id': volume_id, 'snapshot_id': snapshot_id})
+ raise exception.MetadataCopyFailure(reason=ex)
+ return model_update
+
+ def _enable_bootable_flag(self, context, volume_id):
+ try:
+ LOG.debug(_('Marking volume %s as bootable'), volume_id)
+ self.db.volume_update(context, volume_id, {'bootable': True})
+ except exception.CinderException as ex:
+ LOG.exception(_("Failed updating volume %(volume_id)s bootable"
+ " flag to true") % {'volume_id': volume_id})
+ raise exception.MetadataUpdateFailure(reason=ex)
+
+ def _create_from_source_volume(self, context, volume_ref,
+ source_volid, **kwargs):
+ # NOTE(harlowja): if the source volume has disappeared this will be our
+ # detection of that since this database call should fail.
+ #
+ # NOTE(harlowja): likely this is not the best place for this to happen
+ # and we should have proper locks on the source volume while actions
+ # that use the source volume are underway.
+ srcvol_ref = self.db.volume_get(context, source_volid)
+ model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref)
+ # NOTE(harlowja): Subtasks would be useful here since after this
+ # point the volume has already been created and further failures
+ # will not destroy the volume (although they could in the future).
+ if srcvol_ref.bootable:
+ self._enable_bootable_flag(context, volume_ref['id'])
+ try:
+ LOG.debug(_('Copying metadata from source volume %(source_volid)s'
+ ' to cloned volume %(clone_vol_id)s') % {
+ 'source_volid': source_volid,
+ 'clone_vol_id': volume_ref['id'],
+ })
+ self.db.volume_glance_metadata_copy_from_volume_to_volume(
+ context,
+ source_volid,
+ volume_ref['id'])
+ except exception.CinderException as ex:
+ LOG.exception(_("Failed updating cloned volume %(volume_id)s"
+ " metadata using the provided source volumes"
+ " %(source_volid)s metadata") %
+ {'volume_id': volume_ref['id'],
+ 'source_volid': source_volid})
+ raise exception.MetadataCopyFailure(reason=ex)
+ return model_update
+
+ def _copy_image_to_volume(self, context, volume_ref,
+ image_id, image_location, image_service):
+ """Downloads Glance image to the specified volume. """
+ copy_image_to_volume = self.driver.copy_image_to_volume
+ volume_id = volume_ref['id']
+ LOG.debug(_("Attempting download of %(image_id)s (%(image_location)s)"
+ " to volume %(volume_id)s") %
+ {'image_id': image_id, 'volume_id': volume_id,
+ 'image_location': image_location})
+ try:
+ copy_image_to_volume(context, volume_ref, image_service, image_id)
+ except exception.ProcessExecutionError as ex:
+ LOG.error(_("Failed to copy image %(image_id)s to volume: "
+ "%(volume_id)s, error: %(error)s") %
+ {'volume_id': volume_id,
+ 'error': ex.stderr, 'image_id': image_id})
+ raise exception.ImageCopyFailure(reason=ex.stderr)
+ except exception.CinderException as ex:
+ LOG.error(_("Failed to copy image %(image_id)s to "
+ "volume: %(volume_id)s, error: %(error)s") %
+ {'volume_id': volume_id, 'error': ex,
+ 'image_id': image_id})
+ raise exception.ImageCopyFailure(reason=ex)
+
+ LOG.debug(_("Downloaded image %(image_id)s (%(image_location)s)"
+ " to volume %(volume_id)s successfully") %
+ {'image_id': image_id, 'volume_id': volume_id,
+ 'image_location': image_location})
+
+ def _capture_volume_image_metadata(self, context, volume_id,
+ image_id, image_meta):
+ if not image_meta:
+ image_meta = {}
+
+ # Save some base attributes into the volume metadata
+ base_metadata = {
+ 'image_id': image_id,
+ }
+ name = image_meta.get('name', None)
+ if name:
+ base_metadata['image_name'] = name
+
+ # Save some more attributes into the volume metadata from the image
+ # metadata
+ for key in IMAGE_ATTRIBUTES:
+ if key not in image_meta:
+ continue
+ value = image_meta.get(key, None)
+ if value is not None:
+ base_metadata[key] = value
+
+ # Save all the image metadata properties into the volume metadata
+ property_metadata = {}
+ image_properties = image_meta.get('properties', {})
+ for (key, value) in image_properties.items():
+ if value is not None:
+ property_metadata[key] = value
+
+ # NOTE(harlowja): The best way for this to happen would be in bulk,
+ # but that doesn't seem to exist (yet), so we go through one by one
+ # which means we can have partial create/update failure.
+ volume_metadata = dict(property_metadata)
+ volume_metadata.update(base_metadata)
+ LOG.debug(_("Creating volume glance metadata for volume %(volume_id)s"
+ " backed by image %(image_id)s with: %(vol_metadata)s") %
+ {'volume_id': volume_id, 'image_id': image_id,
+ 'vol_metadata': volume_metadata})
+ for (key, value) in volume_metadata.items():
+ try:
+ self.db.volume_glance_metadata_create(context, volume_id,
+ key, value)
+ except exception.GlanceMetadataExists:
+ pass
+
+ def _create_from_image(self, context, volume_ref,
+ image_location, image_id, image_meta,
+ image_service, **kwargs):
+ LOG.debug(_("Cloning %(volume_id)s from image %(image_id)s "
+ " at location %(image_location)s") %
+ {'volume_id': volume_ref['id'],
+ 'image_location': image_location, 'image_id': image_id})
+ # Create the volume from an image.
+ #
+ # NOTE (singn): two params need to be returned: a dict containing
+ # provider_location for the cloned volume and the clone status.
+ model_update, cloned = self.driver.clone_image(volume_ref,
+ image_location)
+ make_bootable = False
+ if not cloned:
+ # TODO(harlowja): what needs to be rolled back in the clone if this
+ # volume create fails?? Likely this should be a subflow or broken
+ # out task in the future. That will bring up the question of how
+ # do we make said subflow/task which is only triggered in the
+ # clone image 'path' resumable and revertable in the correct
+ # manner.
+ #
+ # Create the volume and then download the image onto the volume.
+ model_update = self.driver.create_volume(volume_ref)
+ updates = dict(model_update or dict(), status='downloading')
+ try:
+ volume_ref = self.db.volume_update(context,
+ volume_ref['id'], updates)
+ except exception.CinderException:
+ LOG.exception(_("Failed updating volume %(volume_id)s with "
+ "%(updates)s") %
+ {'volume_id': volume_ref['id'],
+ 'updates': updates})
+ self._copy_image_to_volume(context, volume_ref,
+ image_id, image_location, image_service)
+ make_bootable = True
+ if make_bootable:
+ self._enable_bootable_flag(context, volume_ref['id'])
+ try:
+ self._capture_volume_image_metadata(context, volume_ref['id'],
+ image_id, image_meta)
+ except exception.CinderException as ex:
+ LOG.exception(_("Failed updating volume %(volume_id)s metadata"
+ " using the provided image metadata"
+ " %(image_meta)s from image %(image_id)s") %
+ {'volume_id': volume_ref['id'],
+ 'image_meta': image_meta, 'image_id': image_id})
+ raise exception.MetadataUpdateFailure(reason=ex)
+ return model_update
+
+ def _create_raw_volume(self, context, volume_ref, **kwargs):
+ return self.driver.create_volume(volume_ref)
+
+ def __call__(self, context, volume_ref, volume_spec):
+ create_type = volume_spec.pop('type', None)
+ create_functor = self._create_func_mapping.get(create_type)
+ if not create_functor:
+ raise exception.VolumeTypeNotFound(volume_type_id=create_type)
+
+ volume_spec = dict(volume_spec)
+ volume_id = volume_spec.pop('volume_id', None)
+ if not volume_id:
+ volume_id = volume_ref['id']
+ LOG.info(_("Volume %(volume_id)s: being created using %(functor)s "
+ "with specification: %(volume_spec)s") %
+ {'volume_spec': volume_spec, 'volume_id': volume_id,
+ 'functor': _make_pretty_name(create_functor)})
+
+ # NOTE(vish): so we don't have to get volume from db again before
+ # passing it to the driver.
+ volume_ref['host'] = self.host
+
+ # Call the given functor to make the volume.
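+ # The remaining volume_spec entries become keyword arguments; each
+ # creation method picks out the ones it needs and ignores the rest via
+ # its **kwargs.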
+ model_update = create_functor(context, volume_ref=volume_ref,
+ **volume_spec)
+
+ # Persist any model information provided on creation.
+ try:
+ if model_update:
+ volume_ref = self.db.volume_update(context, volume_ref['id'],
+ model_update)
+ except exception.CinderException as ex:
+ # If somehow the update failed we want to ensure that the
+ # failure is logged (but not try rescheduling since the volume at
+ # this point has been created).
+ if model_update:
+ LOG.exception(_("Failed updating model of volume %(volume_id)s"
+ " with creation provided model %(model)s") %
+ {'volume_id': volume_id, 'model': model_update})
+ raise exception.ExportFailure(reason=ex)
+
+ # Persist any driver exported model information.
+ model_update = None
+ try:
+ LOG.debug(_("Volume %s: creating export"), volume_ref['id'])
+ model_update = self.driver.create_export(context, volume_ref)
+ if model_update:
+ self.db.volume_update(context, volume_ref['id'], model_update)
+ except exception.CinderException as ex:
+ # If somehow the read *or* create export failed we want to ensure
+ # that the failure is logged (but not try rescheduling since
+ # the volume at this point has been created).
+ #
+ # NOTE(harlowja): Notice that since model_update starts out empty,
+ # the only way it can be non-empty here is if the driver provided a
+ # model_update and updating the database with that model_update
+ # failed.
+ if model_update:
+ LOG.exception(_("Failed updating model of volume %(volume_id)s"
+ " with driver provided model %(model)s") %
+ {'volume_id': volume_id, 'model': model_update})
+ raise exception.ExportFailure(reason=ex)
+
+
+class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
+ """On successful volume creation this will perform final volume actions.
+
+ When a volume is created successfully it is expected that MQ notifications
+ and database updates will occur to 'signal' to others that the volume is
+ now ready for usage. This task does those notifications and updates in a
+ reliable manner (not re-raising exceptions if said actions can not be
+ triggered).
+
+ Reversion strategy: N/A
+ """
+
+ def __init__(self, db, host, event_suffix):
+ super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
+ self.requires.update(['volume_spec'])
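+ # Maps special in-progress statuses to their final value; anything not
+ # listed becomes 'available' once creation finishes.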
+ self.status_translation = {
+ 'migration_target_creating': 'migration_target',
+ }
+
+ def __call__(self, context, volume_ref, volume_spec):
+ volume_id = volume_ref['id']
+ new_status = self.status_translation.get(volume_spec.get('status'),
+ 'available')
+ update = {
+ 'status': new_status,
+ 'launched_at': timeutils.utcnow(),
+ }
+ try:
+ # TODO(harlowja): is it acceptable to only log if this fails??
+ # or are there other side-effects that this will cause if the
+ # status isn't updated correctly (aka it will likely be stuck in
+ # 'building' if this fails)??
+ volume_ref = self.db.volume_update(context, volume_id, update)
+ # Now use the parent to notify.
+ super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
+ except exception.CinderException:
+ LOG.exception(_("Failed updating volume %(volume_id)s with "
+ "%(update)s") % {'volume_id': volume_id,
+ 'update': update})
+ # Even if the update fails, the volume is ready.
+ msg = _("Volume %(volume_name)s (%(volume_id)s): created successfully")
+ LOG.info(msg % {
+ 'volume_name': volume_spec['volume_name'],
+ 'volume_id': volume_id,
+ })
+
+
+def _attach_debug_listeners(flow):
+ """Sets up a nice set of debug listeners for the flow.
+
+ These listeners will log when tasks/flows are transitioning from state to
+ state so that said states can be seen in the debug log output which is very
+ useful for figuring out where problems are occurring.
+ """
+
+ def flow_log_change(state, details):
+ LOG.debug(_("%(flow)s has moved into state %(state)s from state"
+ " %(old_state)s") % {'state': state,
+ 'old_state': details.get('old_state'),
+ 'flow': details['flow']})
+
+ def task_log_change(state, details):
+ LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
+ " result: %(result)s") % {'state': state,
+ 'flow': details['flow'],
+ 'runner': details['runner'],
+ 'result': details.get('result')})
+
+ # Register * for all state changes (and not selective state changes to be
+ # called upon) since seeing all the changes is more useful.
+ flow.notifier.register('*', flow_log_change)
+ flow.task_notifier.register('*', task_log_change)
+ return flow
+
+
+def get_api_flow(scheduler_rpcapi, volume_rpcapi, db,
+ image_service,
+ az_check_functor,
+ create_what):
+ """Constructs and returns the api entrypoint flow.
+
+ This flow will do the following:
+
+ 1. Inject keys & values for dependent tasks.
+ 2. Extracts and validates the input keys & values.
+ 3. Reserves the quota (reverts quota on any failures).
+ 4. Creates the database entry.
+ 5. Commits the quota.
+ 6. Casts to volume manager or scheduler for further processing.
+ """
+
+ flow_name = ACTION.replace(":", "_") + "_api"
+ api_flow = linear_flow.Flow(flow_name)
+
+ # This injects the initial starting flow values into the workflow so that
+ # the dependency order of the tasks provides/requires can be correctly
+ # determined.
+ api_flow.add(ValuesInjectTask(create_what))
+ api_flow.add(ExtractVolumeRequestTask(image_service,
+ az_check_functor))
+ api_flow.add(QuotaReserveTask())
+ v_uuid = api_flow.add(EntryCreateTask(db))
+ api_flow.add(QuotaCommitTask())
+
+ # If something fails after the quota has been committed, make sure the
+ # database status is set to failure before any prior tasks are reverted.
+ api_flow.add(OnFailureChangeStatusTask(db))
+
+ # This will cast it out to either the scheduler or volume manager via
+ # the rpc apis provided.
+ api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
+
+ # NOTE(harlowja): this will return the flow as well as the uuid of the
+ # task which will produce the 'volume' database reference (since said
+ # reference is returned to other callers in the api for further usage).
+ return (_attach_debug_listeners(api_flow), v_uuid)
+
+
+def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
+ volume_id=None, snapshot_id=None, image_id=None):
+ """Constructs and returns the scheduler entrypoint flow.
+
+ This flow will do the following:
+
+ 1. Injects keys & values for dependent tasks.
+ 2. Extracts a scheduler specification from the provided inputs.
+ 3. Attaches two tasks that are activated only on *failure* (one to update
+ the db status and one to notify on the MQ of the failure that occurred).
+ 4. Uses the provided driver to select a host and continue processing of
+ the volume request.
+ """
+
+ flow_name = ACTION.replace(":", "_") + "_scheduler"
+ scheduler_flow = linear_flow.Flow(flow_name)
+
+ # This injects the initial starting flow values into the workflow so that
+ # the dependency order of the tasks provides/requires can be correctly
+ # determined.
+ scheduler_flow.add(ValuesInjectTask({
+ 'request_spec': request_spec,
+ 'filter_properties': filter_properties,
+ 'volume_id': volume_id,
+ 'snapshot_id': snapshot_id,
+ 'image_id': image_id,
+ }))
+
+ # This will extract and clean the spec from the starting values.
+ scheduler_flow.add(ExtractSchedulerSpecTask(db))
+
+ # The decorator application here ensures that the method gets the right
+ # 'requires' attributes automatically by examining the underlying
+ # function's arguments.
+ @decorators.task
+ def schedule_create_volume(context, request_spec, filter_properties):
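+ """Task body that asks the driver to schedule the volume creation.
+
+ On NoValidHost the volume is marked as errored and a failure
+ notification is emitted without re-raising; any other exception gets
+ the same handling but is re-raised.
+ """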
+
+ def _log_failure(cause):
+ LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
+ {'cause': cause})
+
+ def _notify_failure(cause):
+ """When scheduling fails send out a event that it failed."""
+ topic = "scheduler.create_volume"
+ payload = {
+ 'request_spec': request_spec,
+ 'volume_properties': request_spec.get('volume_properties', {}),
+ 'volume_id': volume_id,
+ 'state': 'error',
+ 'method': 'create_volume',
+ 'reason': cause,
+ }
+ try:
+ publisher_id = notifier.publisher_id("scheduler")
+ notifier.notify(context, publisher_id, topic, notifier.ERROR,
+ payload)
+ except exception.CinderException:
+ LOG.exception(_("Failed notifying on %(topic)s "
+ "payload %(payload)s") % {'topic': topic,
+ 'payload': payload})
+
+ try:
+ driver.schedule_create_volume(context, request_spec,
+ filter_properties)
+ except exception.NoValidHost as e:
+ # No valid host was found: notify on the scheduler queue, log that this
+ # happened, set the volume status to error, and *do not* re-raise the
+ # error (since there is no point).
+ _notify_failure(e)
+ _log_failure(e)
+ _error_out_volume(context, db, volume_id, reason=e)
+ except Exception as e:
+ # Some other error happened: notify on the scheduler queue, log that it
+ # happened, set the volume status to error, and *do* re-raise the error.
+ with excutils.save_and_reraise_exception():
+ _notify_failure(e)
+ _log_failure(e)
+ _error_out_volume(context, db, volume_id, reason=e)
+
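+ # NOTE: @decorators.task above wraps the plain function into a task object
+ # so it can be added to the flow just like the task classes used earlier.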
+ scheduler_flow.add(schedule_create_volume)
+
+ return _attach_debug_listeners(scheduler_flow)
+
+
+def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
+ request_spec=None, filter_properties=None,
+ allow_reschedule=True,
+ snapshot_id=None, image_id=None, source_volid=None,
+ reschedule_context=None):
+ """Constructs and returns the manager entrypoint flow.
+
+ This flow will do the following:
+
+ 1. Determines if rescheduling is enabled (ahead of time).
+ 2. Injects keys & values for dependent tasks.
+ 3. Selects one of two tasks that are activated only on *failure* (one that
+ updates the db status & notifies, or one that updates the db status &
+ notifies & *reschedules*).
+ 4. Extracts a volume specification from the provided inputs.
+ 5. Notifies that volume creation has started.
+ 6. Creates a volume from the extracted volume specification.
+ 7. Attaches an on-success *only* task that notifies that volume creation
+ has ended and performs further database status updates.
+ """
+
+ flow_name = ACTION.replace(":", "_") + "_manager"
+ volume_flow = linear_flow.Flow(flow_name)
+
+ # Determine if we are allowed to reschedule since this affects how
+ # failures will be handled.
+ if not filter_properties:
+ filter_properties = {}
+ if not request_spec and allow_reschedule:
+ LOG.debug(_("No request spec, will not reschedule"))
+ allow_reschedule = False
+ if not filter_properties.get('retry', None) and allow_reschedule:
+ LOG.debug(_("No retry filter property or associated "
+ "retry info, will not reschedule"))
+ allow_reschedule = False
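+ # At this point rescheduling remains enabled only when the caller allowed
+ # it and both a request spec and retry filter properties were provided.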
+
+ # This injects the initial starting flow values into the workflow so that
+ # the dependency order of the tasks provides/requires can be correctly
+ # determined.
+ volume_flow.add(ValuesInjectTask({
+ 'filter_properties': filter_properties,
+ 'image_id': image_id,
+ 'request_spec': request_spec,
+ 'snapshot_id': snapshot_id,
+ 'source_volid': source_volid,
+ 'volume_id': volume_id,
+ }))
+
+ # We can check ahead of time whether we should reschedule on failure,
+ # instead of trying to determine this later; certain values are needed to
+ # reschedule and without them we should simply avoid rescheduling.
+ if not allow_reschedule:
+ # On failure ensure that we just set the volume status to error.
+ LOG.debug(_("Retry info not present, will not reschedule"))
+ volume_flow.add(OnFailureChangeStatusTask(db))
+ else:
+ volume_flow.add(OnFailureRescheduleTask(reschedule_context,
+ db, scheduler_rpcapi))
+
+ volume_flow.add(ExtractVolumeSpecTask(db))
+ volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
+ volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
+ volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
+
+ return _attach_debug_listeners(volume_flow)
from cinder import quota
from cinder import utils
from cinder.volume.configuration import Configuration
+from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
+from cinder.taskflow import states
+
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
# collect and publish service capabilities
self.publish_service_capabilities(ctxt)
- def _create_volume(self, context, volume_ref, snapshot_ref,
- srcvol_ref, image_service, image_id, image_location):
- cloned = None
- model_update = False
-
- if all(x is None for x in(snapshot_ref, image_id, srcvol_ref)):
- model_update = self.driver.create_volume(volume_ref)
- elif snapshot_ref is not None:
- model_update = self.driver.create_volume_from_snapshot(
- volume_ref,
- snapshot_ref)
-
- originating_vref = self.db.volume_get(context,
- snapshot_ref['volume_id'])
- if originating_vref.bootable:
- self.db.volume_update(context,
- volume_ref['id'],
- {'bootable': True})
- elif srcvol_ref is not None:
- model_update = self.driver.create_cloned_volume(volume_ref,
- srcvol_ref)
- if srcvol_ref.bootable:
- self.db.volume_update(context,
- volume_ref['id'],
- {'bootable': True})
- else:
- # create the volume from an image
- # NOTE (singn): two params need to be returned
- # dict containing provider_location for cloned volume
- # and clone status
- model_update, cloned = self.driver.clone_image(
- volume_ref, image_location)
- if not cloned:
- model_update = self.driver.create_volume(volume_ref)
-
- updates = dict(model_update or dict(), status='downloading')
- volume_ref = self.db.volume_update(context,
- volume_ref['id'],
- updates)
-
- # TODO(jdg): Wrap this in a try block and update status
- # appropriately if the download image fails
- self._copy_image_to_volume(context,
- volume_ref,
- image_service,
- image_id)
- self.db.volume_update(context,
- volume_ref['id'],
- {'bootable': True})
- return model_update, cloned
-
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
"""Creates and exports the volume."""
- context_saved = context.deepcopy()
- context = context.elevated()
- if filter_properties is None:
- filter_properties = {}
- volume_ref = self.db.volume_get(context, volume_id)
- self._notify_about_volume_usage(context, volume_ref, "create.start")
-
- # NOTE(vish): so we don't have to get volume from db again
- # before passing it to the driver.
- volume_ref['host'] = self.host
- if volume_ref['status'] == 'migration_target_creating':
- status = 'migration_target'
- else:
- status = 'available'
- model_update = False
- image_meta = None
- cloned = False
-
- try:
- LOG.debug(_("volume %(vol_name)s: creating lv of"
- " size %(vol_size)sG"),
- {'vol_name': volume_ref['name'],
- 'vol_size': volume_ref['size']})
- snapshot_ref = None
- sourcevol_ref = None
- image_service = None
- image_location = None
- image_meta = None
-
- if snapshot_id is not None:
- LOG.info(_("volume %s: creating from snapshot"),
- volume_ref['name'])
- snapshot_ref = self.db.snapshot_get(context, snapshot_id)
- elif source_volid is not None:
- LOG.info(_("volume %s: creating from existing volume"),
- volume_ref['name'])
- sourcevol_ref = self.db.volume_get(context, source_volid)
- elif image_id is not None:
- LOG.info(_("volume %s: creating from image"),
- volume_ref['name'])
- # create the volume from an image
- image_service, image_id = \
- glance.get_remote_image_service(context,
- image_id)
- image_location = image_service.get_location(context, image_id)
- image_meta = image_service.show(context, image_id)
- else:
- LOG.info(_("volume %s: creating"), volume_ref['name'])
-
- try:
- model_update, cloned = self._create_volume(context,
- volume_ref,
- snapshot_ref,
- sourcevol_ref,
- image_service,
- image_id,
- image_location)
- except exception.ImageCopyFailure as ex:
- LOG.error(_('Setting volume: %s status to error '
- 'after failed image copy.'), volume_ref['id'])
- self.db.volume_update(context,
- volume_ref['id'],
- {'status': 'error'})
- return
- except Exception:
- exc_info = sys.exc_info()
- # restore source volume status before reschedule
- # FIXME(zhiteng) do all the clean-up before reschedule
- if sourcevol_ref is not None:
- self.db.volume_update(context, sourcevol_ref['id'],
- {'status': sourcevol_ref['status']})
- rescheduled = False
- # try to re-schedule volume:
- if allow_reschedule:
- rescheduled = self._reschedule_or_error(context_saved,
- volume_id,
- exc_info,
- snapshot_id,
- image_id,
- request_spec,
- filter_properties)
-
- if rescheduled:
- LOG.error(_('Unexpected Error: '), exc_info=exc_info)
- msg = (_('Creating %(volume_id)s %(snapshot_id)s '
- '%(image_id)s was rescheduled due to '
- '%(reason)s')
- % {'volume_id': volume_id,
- 'snapshot_id': snapshot_id,
- 'image_id': image_id,
- 'reason': unicode(exc_info[1])})
- raise exception.CinderException(msg)
- else:
- # not re-scheduling
- raise exc_info[0], exc_info[1], exc_info[2]
+ flow = create_volume.get_manager_flow(
+ self.db,
+ self.driver,
+ self.scheduler_rpcapi,
+ self.host,
+ volume_id,
+ request_spec=request_spec,
+ filter_properties=filter_properties,
+ allow_reschedule=allow_reschedule,
+ snapshot_id=snapshot_id,
+ image_id=image_id,
+ source_volid=source_volid,
+ reschedule_context=context.deepcopy())
+
+ assert flow, _('Manager volume flow not retrieved')
+
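+ # Run the flow in-line and verify afterwards that it finished successfully.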
+ flow.run(context.elevated())
+ if flow.state != states.SUCCESS:
+ raise exception.CinderException(_("Failed to successfully complete"
+ " manager volume workflow"))
- if model_update:
- volume_ref = self.db.volume_update(
- context, volume_ref['id'], model_update)
- if sourcevol_ref is not None:
- self.db.volume_glance_metadata_copy_from_volume_to_volume(
- context,
- source_volid,
- volume_id)
-
- LOG.debug(_("volume %s: creating export"), volume_ref['name'])
- model_update = self.driver.create_export(context, volume_ref)
- if model_update:
- self.db.volume_update(context, volume_ref['id'], model_update)
- except Exception:
- with excutils.save_and_reraise_exception():
- volume_ref['status'] = 'error'
- self.db.volume_update(context,
- volume_ref['id'],
- {'status': volume_ref['status']})
- LOG.error(_("volume %s: create failed"), volume_ref['name'])
- self._notify_about_volume_usage(context, volume_ref,
- "create.end")
-
- if snapshot_id:
- # Copy any Glance metadata from the original volume
- self.db.volume_glance_metadata_copy_to_volume(context,
- volume_ref['id'],
- snapshot_id)
-
- if image_id and image_meta:
- # Copy all of the Glance image properties to the
- # volume_glance_metadata table for future reference.
- self.db.volume_glance_metadata_create(context,
- volume_ref['id'],
- 'image_id', image_id)
- name = image_meta.get('name', None)
- if name:
- self.db.volume_glance_metadata_create(context,
- volume_ref['id'],
- 'image_name', name)
- # Save some more attributes into the volume metadata
- IMAGE_ATTRIBUTES = ['size', 'disk_format',
- 'container_format', 'checksum',
- 'min_disk', 'min_ram']
- for key in IMAGE_ATTRIBUTES:
- value = image_meta.get(key, None)
- if value is not None:
- self.db.volume_glance_metadata_create(context,
- volume_ref['id'],
- key, value)
- image_properties = image_meta.get('properties', {})
- for key, value in image_properties.items():
- self.db.volume_glance_metadata_create(context,
- volume_ref['id'],
- key, value)
-
- now = timeutils.utcnow()
- volume_ref['status'] = status
- self.db.volume_update(context,
- volume_ref['id'],
- {'status': volume_ref['status'],
- 'launched_at': now})
- LOG.info(_("volume %s: created successfully"), volume_ref['name'])
self._reset_stats()
-
- self._notify_about_volume_usage(context, volume_ref, "create.end")
- return volume_ref['id']
-
- def _reschedule_or_error(self, context, volume_id, exc_info,
- snapshot_id, image_id, request_spec,
- filter_properties):
- """Try to re-schedule the request."""
- rescheduled = False
- try:
- method_args = (CONF.volume_topic, volume_id, snapshot_id,
- image_id, request_spec, filter_properties)
-
- rescheduled = self._reschedule(context, request_spec,
- filter_properties, volume_id,
- self.scheduler_rpcapi.create_volume,
- method_args,
- exc_info)
- except Exception:
- rescheduled = False
- LOG.exception(_("volume %s: Error trying to reschedule create"),
- volume_id)
-
- return rescheduled
-
- def _reschedule(self, context, request_spec, filter_properties,
- volume_id, scheduler_method, method_args,
- exc_info=None):
- """Attempt to re-schedule a volume operation."""
-
- retry = filter_properties.get('retry', None)
- if not retry:
- # no retry information, do not reschedule.
- LOG.debug(_("Retry info not present, will not reschedule"))
- return
-
- if not request_spec:
- LOG.debug(_("No request spec, will not reschedule"))
- return
-
- request_spec['volume_id'] = volume_id
-
- LOG.debug(_("volume %(volume_id)s: re-scheduling %(method)s "
- "attempt %(num)d") %
- {'volume_id': volume_id,
- 'method': scheduler_method.func_name,
- 'num': retry['num_attempts']})
-
- # reset the volume state:
- now = timeutils.utcnow()
- self.db.volume_update(context, volume_id,
- {'status': 'creating',
- 'scheduled_at': now})
-
- if exc_info:
- # stringify to avoid circular ref problem in json serialization:
- retry['exc'] = traceback.format_exception(*exc_info)
-
- scheduler_method(context, *method_args)
- return True
+ return volume_id
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
volume['name'] not in volume['provider_location']):
self.driver.ensure_export(context, volume)
- def _copy_image_to_volume(self, context, volume, image_service, image_id):
- """Downloads Glance image to the specified volume."""
- volume_id = volume['id']
- try:
- self.driver.copy_image_to_volume(context, volume,
- image_service,
- image_id)
- except exception.ProcessExecutionError as ex:
- LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
- "error: %(error)s") % {'volume_id': volume_id,
- 'error': ex.stderr})
- raise exception.ImageCopyFailure(reason=ex.stderr)
- except Exception as ex:
- LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
- "error: %(error)s") % {'volume_id': volume_id,
- 'error': ex})
- raise exception.ImageCopyFailure(reason=ex)
-
- LOG.info(_("Downloaded image %(image_id)s to %(volume_id)s "
- "successfully.") % {'image_id': image_id,
- 'volume_id': volume_id})
-
def copy_volume_to_image(self, context, volume_id, image_meta):
"""Uploads the specified volume to Glance.
--- /dev/null
+[DEFAULT]
+
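+# NOTE: this file mirrors the openstack-common.conf convention; it is
+# presumably read by a copy/update tool that vendors the listed taskflow
+# primitives into the cinder.taskflow package used by the new flows.
+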
+# The list of primitives to copy from taskflow
+primitives=flow.linear_flow,task
+
+# The base module to hold the copy of taskflow
+base=cinder