self.availability_zones = availability_zones
@staticmethod
- def _extract_consistencygroup(consistencygroup):
- """Extracts the consistencygroup id from the provided consistencygroup.
-
- This function validates the input consistencygroup dict and checks that
- the status of that consistencygroup is valid for creating a volume in.
- """
-
- consistencygroup_id = None
- if consistencygroup is not None:
- if consistencygroup['status'] not in CG_PROCEED_STATUS:
- msg = _("Originating consistencygroup status must be one"
- " of '%s' values")
- msg = msg % (", ".join(CG_PROCEED_STATUS))
- raise exception.InvalidConsistencyGroup(reason=msg)
- consistencygroup_id = consistencygroup['id']
- return consistencygroup_id
-
- @staticmethod
- def _extract_cgsnapshot(cgsnapshot):
- """Extracts the cgsnapshot id from the provided cgsnapshot.
-
- This function validates the input cgsnapshot dict and checks that
- the status of that cgsnapshot is valid for creating a cg from.
- """
-
- cgsnapshot_id = None
- if cgsnapshot:
- if cgsnapshot['status'] not in CGSNAPSHOT_PROCEED_STATUS:
- msg = _("Originating CGSNAPSHOT status must be one"
- " of '%s' values")
- msg = msg % (", ".join(CGSNAPSHOT_PROCEED_STATUS))
- raise exception.InvalidCgSnapshot(reason=msg)
- cgsnapshot_id = cgsnapshot['id']
- return cgsnapshot_id
-
- @staticmethod
- def _extract_snapshot(snapshot):
- """Extracts the snapshot id from the provided snapshot (if provided).
-
- This function validates the input snapshot dict and checks that the
- status of that snapshot is valid for creating a volume from.
- """
-
- snapshot_id = None
- if snapshot is not None:
- if snapshot['status'] not in SNAPSHOT_PROCEED_STATUS:
- msg = _("Originating snapshot status must be one"
- " of %s values")
- msg = msg % (", ".join(SNAPSHOT_PROCEED_STATUS))
- # TODO(harlowja): what happens if the status changes after this
- # initial snapshot status check occurs??? Seems like someone
- # could delete the snapshot after this check passes but before
- # the volume is officially created?
- raise exception.InvalidSnapshot(reason=msg)
- snapshot_id = snapshot['id']
- return snapshot_id
-
- @staticmethod
- def _extract_source_volume(source_volume):
- """Extracts the volume id from the provided volume (if provided).
-
- This function validates the input source_volume dict and checks that
- the status of that source_volume is valid for creating a volume from.
- """
-
- source_volid = None
- if source_volume is not None:
- if source_volume['status'] not in SRC_VOL_PROCEED_STATUS:
- msg = _("Unable to create a volume from an originating source"
- " volume when its status is not one of %s"
- " values")
- msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
- # TODO(harlowja): what happens if the status changes after this
- # initial volume status check occurs??? Seems like someone
- # could delete the volume after this check passes but before
- # the volume is officially created?
- raise exception.InvalidVolume(reason=msg)
- source_volid = source_volume['id']
- return source_volid
-
- @staticmethod
- def _extract_source_replica(source_replica):
- """Extracts the volume id from the provided replica (if provided).
-
- This function validates the input replica_volume dict and checks that
- the status of that replica_volume is valid for creating a volume from.
+ def _extract_resource(resource, allowed_vals, exc, resource_name,
+ props=('status',)):
+ """Extracts the resource id from the provided resource.
+
+ This method validates the input resource dict and checks that the
+ properties which names are passed in `props` argument match
+ corresponding lists in `allowed` argument. In case of mismatch
+ exception of type exc is raised.
+
+ :param resource: Resource dict.
+ :param allowed_vals: Tuple of allowed values lists.
+ :param exc: Exception type to raise.
+ :param resource_name: Name of resource - used to construct log message.
+ :param props: Tuple of resource properties names to validate.
+ :return: Id of a resource.
"""
- source_replicaid = None
- if source_replica is not None:
- if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
- msg = _("Unable to create a volume from an originating source"
- " volume when its status is not one of %s"
- " values")
- msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
- # TODO(harlowja): what happens if the status changes after this
- # initial volume status check occurs??? Seems like someone
- # could delete the volume after this check passes but before
- # the volume is officially created?
- raise exception.InvalidVolume(reason=msg)
- replication_status = source_replica['replication_status']
- if replication_status not in REPLICA_PROCEED_STATUS:
- msg = _("Unable to create a volume from a replica"
- " when replication status is not one of %s"
- " values")
- msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
- # TODO(ronenkat): what happens if the replication status
- # changes after this initial volume status check occurs???
- raise exception.InvalidVolume(reason=msg)
- source_replicaid = source_replica['id']
- return source_replicaid
+ resource_id = None
+ if resource:
+ for prop, allowed_states in zip(props, allowed_vals):
+ if resource[prop] not in allowed_states:
+ msg = _("Originating %(res)s %(prop)s must be one of"
+ "'%(vals)s' values")
+ msg = msg % {'res': resource_name,
+ 'prop': prop,
+ 'vals': ', '.join(allowed_states)}
+ # TODO(harlowja): what happens if the status changes after
+ # this initial resource status check occurs??? Seems like
+ # someone could delete the resource after this check passes
+ # but before the volume is officially created?
+ raise exc(reason=msg)
+ resource_id = resource['id']
+ return resource_id
+
+ def _extract_consistencygroup(self, consistencygroup):
+ return self._extract_resource(consistencygroup, (CG_PROCEED_STATUS,),
+ exception.InvalidConsistencyGroup,
+ 'consistencygroup')
+
+ def _extract_cgsnapshot(self, cgsnapshot):
+ return self._extract_resource(cgsnapshot, (CGSNAPSHOT_PROCEED_STATUS,),
+ exception.InvalidCgSnapshot,
+ 'CGSNAPSHOT')
+
+ def _extract_snapshot(self, snapshot):
+ return self._extract_resource(snapshot, (SNAPSHOT_PROCEED_STATUS,),
+ exception.InvalidSnapshot, 'snapshot')
+
+ def _extract_source_volume(self, source_volume):
+ return self._extract_resource(source_volume, (SRC_VOL_PROCEED_STATUS,),
+ exception.InvalidVolume, 'source volume')
+
+ def _extract_source_replica(self, source_replica):
+ return self._extract_resource(source_replica, (SRC_VOL_PROCEED_STATUS,
+ REPLICA_PROCEED_STATUS),
+ exception.InvalidVolume,
+ 'replica', ('status',
+ 'replication_status'))
@staticmethod
def _extract_size(size, source_volume, snapshot):
else:
# For backwards compatibility use the storage_availability_zone
availability_zone = CONF.storage_availability_zone
+
if availability_zone not in self.availability_zones:
msg = _("Availability zone '%s' is invalid") % (availability_zone)
LOG.warning(msg)
return encryption_key_id
def _get_volume_type_id(self, volume_type, source_volume, snapshot):
- volume_type_id = None
if not volume_type and source_volume:
- volume_type_id = source_volume['volume_type_id']
+ return source_volume['volume_type_id']
elif snapshot is not None:
if volume_type:
current_volume_type_id = volume_type.get('id')
- if (current_volume_type_id !=
- snapshot['volume_type_id']):
+ if current_volume_type_id != snapshot['volume_type_id']:
msg = _LW("Volume type will be changed to "
"be the same as the source volume.")
LOG.warning(msg)
- volume_type_id = snapshot['volume_type_id']
+ return snapshot['volume_type_id']
else:
- volume_type_id = volume_type.get('id')
-
- return volume_type_id
+ return volume_type.get('id')
def execute(self, context, size, snapshot, image_id, source_volume,
- availability_zone, volume_type, metadata,
- key_manager, source_replica,
- consistencygroup, cgsnapshot):
+ availability_zone, volume_type, metadata, key_manager,
+ source_replica, consistencygroup, cgsnapshot):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
# their volume type matches the source volume is too convoluted. We
# should copy encryption metadata from the encrypted volume type to the
# volume upon creation and propagate that information to each snapshot.
- # This strategy avoid any dependency upon the encrypted volume type.
+ # This strategy avoids any dependency upon the encrypted volume type.
def_vol_type = volume_types.get_default_volume_type()
if not volume_type and not source_volume and not snapshot:
volume_type = def_vol_type
}
def revert(self, context, result, optional_args, **kwargs):
- # We never produced a result and therefore can't destroy anything.
if isinstance(result, ft.Failure):
+ # We never produced a result and therefore can't destroy anything.
return
if optional_args['is_quota_committed']:
- # Committed quota doesn't rollback as the volume has already been
- # created at this point, and the quota has already been absorbed.
+ # If quota got commited we shouldn't rollback as the volume has
+ # already been created and the quota has already been absorbed.
return
+
vol_id = result['volume_id']
try:
self.db.volume_destroy(context.elevated(), vol_id)
usages = e.kwargs['usages']
def _consumed(name):
- return (usages[name]['reserved'] + usages[name]['in_use'])
+ return usages[name]['reserved'] + usages[name]['in_use']
def _is_over(name):
for over in overs:
# We never produced a result and therefore can't destroy anything.
if isinstance(result, ft.Failure):
return
+
volume = result['volume_properties']
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
class VolumeCastTask(flow_utils.CinderTask):
"""Performs a volume create cast to the scheduler or to the volume manager.
- This which will signal a transition of the api workflow to another child
- and/or related workflow on another component.
+ This will signal a transition of the api workflow to another child and/or
+ related workflow on another component.
- Reversion strategy: N/A
+ Reversion strategy: rollback source volume status and error out newly
+ created volume.
"""
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
volume_id = request_spec['volume_id']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
- group_id = request_spec['consistencygroup_id']
+ cgroup_id = request_spec['consistencygroup_id']
host = None
cgsnapshot_id = request_spec['cgsnapshot_id']
- if group_id:
- group = self.db.consistencygroup_get(context, group_id)
- if group:
- host = group.get('host', None)
+ if cgroup_id:
+ cgroup = self.db.consistencygroup_get(context, cgroup_id)
+ if cgroup:
+ host = cgroup.get('host', None)
elif snapshot_id and CONF.snapshot_same_host:
# NOTE(Rongze Zhu): A simple solution for bug 1008866.
#
- # If snapshot_id is set, make the call create volume directly to
- # the volume host where the snapshot resides instead of passing it
- # through the scheduler. So snapshot can be copy to new volume.
+ # If snapshot_id is set and CONF.snapshot_same_host is True, make
+ # the call create volume directly to the volume host where the
+ # snapshot resides instead of passing it through the scheduler, so
+ # snapshot can be copied to the new volume.
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
- source_volume_ref = self.db.volume_get(context,
- snapshot.volume_id)
+ source_volume_ref = self.db.volume_get(context, snapshot.volume_id)
host = source_volume_ref['host']
elif source_volid:
source_volume_ref = self.db.volume_get(context, source_volid)
image_id=image_id,
source_volid=source_volid,
source_replicaid=source_replicaid,
- consistencygroup_id=group_id)
+ consistencygroup_id=cgroup_id)
def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)
LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
-def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
- image_service_api, availability_zones,
- create_what):
+def get_flow(db_api, image_service_api, availability_zones, create_what,
+ scheduler_rpcapi=None, volume_rpcapi=None):
"""Constructs and returns the api entrypoint flow.
This flow will do the following:
EntryCreateTask(db_api),
QuotaCommitTask())
- # This will cast it out to either the scheduler or volume manager via
- # the rpc apis provided.
- api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
-
- # Now load (but do not run) the flow using the provided initial data.
- return taskflow.engines.load(api_flow, store=create_what)
-
-
-def get_flow_no_rpc(db_api, image_service_api, availability_zones,
- create_what):
- """Constructs and returns the api entrypoint flow.
-
- This flow will do the following:
-
- 1. Inject keys & values for dependent tasks.
- 2. Extracts and validates the input keys & values.
- 3. Reserves the quota (reverts quota on any failures).
- 4. Creates the database entry.
- 5. Commits the quota.
- """
-
- flow_name = ACTION.replace(":", "_") + "_api"
- api_flow = linear_flow.Flow(flow_name)
-
- api_flow.add(ExtractVolumeRequestTask(
- image_service_api,
- availability_zones,
- rebind={'size': 'raw_size',
- 'availability_zone': 'raw_availability_zone',
- 'volume_type': 'raw_volume_type'}))
- api_flow.add(QuotaReserveTask(),
- EntryCreateTask(db_api),
- QuotaCommitTask())
+ if scheduler_rpcapi and volume_rpcapi:
+ # This will cast it out to either the scheduler or volume manager via
+ # the rpc apis provided.
+ api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(api_flow, store=create_what)