From: Michal Dulko Date: Tue, 16 Jun 2015 06:21:21 +0000 (+0200) Subject: Refactor API create_volume flow X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=def25810d4f5fb5ca37e66e34cddfb37f560712d;p=openstack-build%2Fcinder-build.git Refactor API create_volume flow This commit refactors the code to be more compliant with DRY rule. I've also changed some comments to be more readable and correct. Partial-Implements: blueprint taskflow-refactoring Change-Id: I0345c07ac490b6bd738c0fdac0b621bb9156f899 --- diff --git a/cinder/volume/api.py b/cinder/volume/api.py index e62144f8c..4afd9bbef 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -277,18 +277,14 @@ class API(base.Base): 'multiattach': multiattach, } try: - if cgsnapshot: - flow_engine = create_volume.get_flow_no_rpc(self.db, - self.image_service, - availability_zones, - create_what) - else: - flow_engine = create_volume.get_flow(self.scheduler_rpcapi, - self.volume_rpcapi, - self.db, - self.image_service, - availability_zones, - create_what) + sched_rpcapi = self.scheduler_rpcapi if not cgsnapshot else None + volume_rpcapi = self.volume_rpcapi if not cgsnapshot else None + flow_engine = create_volume.get_flow(self.db, + self.image_service, + availability_zones, + create_what, + sched_rpcapi, + volume_rpcapi) except Exception: msg = _('Failed to create api volume flow.') LOG.exception(msg) diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index 1d2746238..aa0305f6f 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -72,117 +72,64 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): self.availability_zones = availability_zones @staticmethod - def _extract_consistencygroup(consistencygroup): - """Extracts the consistencygroup id from the provided consistencygroup. 
- - This function validates the input consistencygroup dict and checks that - the status of that consistencygroup is valid for creating a volume in. - """ - - consistencygroup_id = None - if consistencygroup is not None: - if consistencygroup['status'] not in CG_PROCEED_STATUS: - msg = _("Originating consistencygroup status must be one" - " of '%s' values") - msg = msg % (", ".join(CG_PROCEED_STATUS)) - raise exception.InvalidConsistencyGroup(reason=msg) - consistencygroup_id = consistencygroup['id'] - return consistencygroup_id - - @staticmethod - def _extract_cgsnapshot(cgsnapshot): - """Extracts the cgsnapshot id from the provided cgsnapshot. - - This function validates the input cgsnapshot dict and checks that - the status of that cgsnapshot is valid for creating a cg from. - """ - - cgsnapshot_id = None - if cgsnapshot: - if cgsnapshot['status'] not in CGSNAPSHOT_PROCEED_STATUS: - msg = _("Originating CGSNAPSHOT status must be one" - " of '%s' values") - msg = msg % (", ".join(CGSNAPSHOT_PROCEED_STATUS)) - raise exception.InvalidCgSnapshot(reason=msg) - cgsnapshot_id = cgsnapshot['id'] - return cgsnapshot_id - - @staticmethod - def _extract_snapshot(snapshot): - """Extracts the snapshot id from the provided snapshot (if provided). - - This function validates the input snapshot dict and checks that the - status of that snapshot is valid for creating a volume from. - """ - - snapshot_id = None - if snapshot is not None: - if snapshot['status'] not in SNAPSHOT_PROCEED_STATUS: - msg = _("Originating snapshot status must be one" - " of %s values") - msg = msg % (", ".join(SNAPSHOT_PROCEED_STATUS)) - # TODO(harlowja): what happens if the status changes after this - # initial snapshot status check occurs??? Seems like someone - # could delete the snapshot after this check passes but before - # the volume is officially created? 
-                raise exception.InvalidSnapshot(reason=msg)
-            snapshot_id = snapshot['id']
-        return snapshot_id
-
-    @staticmethod
-    def _extract_source_volume(source_volume):
-        """Extracts the volume id from the provided volume (if provided).
-
-        This function validates the input source_volume dict and checks that
-        the status of that source_volume is valid for creating a volume from.
-        """
-
-        source_volid = None
-        if source_volume is not None:
-            if source_volume['status'] not in SRC_VOL_PROCEED_STATUS:
-                msg = _("Unable to create a volume from an originating source"
-                        " volume when its status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
-                # TODO(harlowja): what happens if the status changes after this
-                # initial volume status check occurs??? Seems like someone
-                # could delete the volume after this check passes but before
-                # the volume is officially created?
-                raise exception.InvalidVolume(reason=msg)
-            source_volid = source_volume['id']
-        return source_volid
-
-    @staticmethod
-    def _extract_source_replica(source_replica):
-        """Extracts the volume id from the provided replica (if provided).
-
-        This function validates the input replica_volume dict and checks that
-        the status of that replica_volume is valid for creating a volume from.
+    def _extract_resource(resource, allowed_vals, exc, resource_name,
+                          props=('status',)):
+        """Extracts the resource id from the provided resource.
+
+        This method validates the input resource dict and checks that the
+        properties whose names are passed in the `props` argument match the
+        corresponding lists in the `allowed_vals` argument. In case of a
+        mismatch an exception of type `exc` is raised.
+
+        :param resource: Resource dict.
+        :param allowed_vals: Tuple of allowed values lists.
+        :param exc: Exception type to raise.
+        :param resource_name: Name of resource, used in the exception message.
+        :param props: Tuple of resource property names to validate.
+        :return: Id of a resource.
         """
-        source_replicaid = None
-        if source_replica is not None:
-            if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
-                msg = _("Unable to create a volume from an originating source"
-                        " volume when its status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
-                # TODO(harlowja): what happens if the status changes after this
-                # initial volume status check occurs??? Seems like someone
-                # could delete the volume after this check passes but before
-                # the volume is officially created?
-                raise exception.InvalidVolume(reason=msg)
-            replication_status = source_replica['replication_status']
-            if replication_status not in REPLICA_PROCEED_STATUS:
-                msg = _("Unable to create a volume from a replica"
-                        " when replication status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
-                # TODO(ronenkat): what happens if the replication status
-                # changes after this initial volume status check occurs???
-                raise exception.InvalidVolume(reason=msg)
-            source_replicaid = source_replica['id']
-        return source_replicaid
+        resource_id = None
+        if resource:
+            for prop, allowed_states in zip(props, allowed_vals):
+                if resource[prop] not in allowed_states:
+                    msg = _("Originating %(res)s %(prop)s must be one of"
+                            " '%(vals)s' values")
+                    msg = msg % {'res': resource_name,
+                                 'prop': prop,
+                                 'vals': ', '.join(allowed_states)}
+                    # TODO(harlowja): what happens if the status changes after
+                    # this initial resource status check occurs??? Seems like
+                    # someone could delete the resource after this check passes
+                    # but before the volume is officially created?
+ raise exc(reason=msg) + resource_id = resource['id'] + return resource_id + + def _extract_consistencygroup(self, consistencygroup): + return self._extract_resource(consistencygroup, (CG_PROCEED_STATUS,), + exception.InvalidConsistencyGroup, + 'consistencygroup') + + def _extract_cgsnapshot(self, cgsnapshot): + return self._extract_resource(cgsnapshot, (CGSNAPSHOT_PROCEED_STATUS,), + exception.InvalidCgSnapshot, + 'CGSNAPSHOT') + + def _extract_snapshot(self, snapshot): + return self._extract_resource(snapshot, (SNAPSHOT_PROCEED_STATUS,), + exception.InvalidSnapshot, 'snapshot') + + def _extract_source_volume(self, source_volume): + return self._extract_resource(source_volume, (SRC_VOL_PROCEED_STATUS,), + exception.InvalidVolume, 'source volume') + + def _extract_source_replica(self, source_replica): + return self._extract_resource(source_replica, (SRC_VOL_PROCEED_STATUS, + REPLICA_PROCEED_STATUS), + exception.InvalidVolume, + 'replica', ('status', + 'replication_status')) @staticmethod def _extract_size(size, source_volume, snapshot): @@ -328,6 +275,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): else: # For backwards compatibility use the storage_availability_zone availability_zone = CONF.storage_availability_zone + if availability_zone not in self.availability_zones: msg = _("Availability zone '%s' is invalid") % (availability_zone) LOG.warning(msg) @@ -379,27 +327,22 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): return encryption_key_id def _get_volume_type_id(self, volume_type, source_volume, snapshot): - volume_type_id = None if not volume_type and source_volume: - volume_type_id = source_volume['volume_type_id'] + return source_volume['volume_type_id'] elif snapshot is not None: if volume_type: current_volume_type_id = volume_type.get('id') - if (current_volume_type_id != - snapshot['volume_type_id']): + if current_volume_type_id != snapshot['volume_type_id']: msg = _LW("Volume type will be changed to " "be the same as the source 
volume.") LOG.warning(msg) - volume_type_id = snapshot['volume_type_id'] + return snapshot['volume_type_id'] else: - volume_type_id = volume_type.get('id') - - return volume_type_id + return volume_type.get('id') def execute(self, context, size, snapshot, image_id, source_volume, - availability_zone, volume_type, metadata, - key_manager, source_replica, - consistencygroup, cgsnapshot): + availability_zone, volume_type, metadata, key_manager, + source_replica, consistencygroup, cgsnapshot): utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id, @@ -425,7 +368,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): # their volume type matches the source volume is too convoluted. We # should copy encryption metadata from the encrypted volume type to the # volume upon creation and propagate that information to each snapshot. - # This strategy avoid any dependency upon the encrypted volume type. + # This strategy avoids any dependency upon the encrypted volume type. def_vol_type = volume_types.get_default_volume_type() if not volume_type and not source_volume and not snapshot: volume_type = def_vol_type @@ -532,14 +475,15 @@ class EntryCreateTask(flow_utils.CinderTask): } def revert(self, context, result, optional_args, **kwargs): - # We never produced a result and therefore can't destroy anything. if isinstance(result, ft.Failure): + # We never produced a result and therefore can't destroy anything. return if optional_args['is_quota_committed']: - # Committed quota doesn't rollback as the volume has already been - # created at this point, and the quota has already been absorbed. + # If quota got commited we shouldn't rollback as the volume has + # already been created and the quota has already been absorbed. 
return + vol_id = result['volume_id'] try: self.db.volume_destroy(context.elevated(), vol_id) @@ -585,7 +529,7 @@ class QuotaReserveTask(flow_utils.CinderTask): usages = e.kwargs['usages'] def _consumed(name): - return (usages[name]['reserved'] + usages[name]['in_use']) + return usages[name]['reserved'] + usages[name]['in_use'] def _is_over(name): for over in overs: @@ -666,6 +610,7 @@ class QuotaCommitTask(flow_utils.CinderTask): # We never produced a result and therefore can't destroy anything. if isinstance(result, ft.Failure): return + volume = result['volume_properties'] try: reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']} @@ -686,10 +631,11 @@ class QuotaCommitTask(flow_utils.CinderTask): class VolumeCastTask(flow_utils.CinderTask): """Performs a volume create cast to the scheduler or to the volume manager. - This which will signal a transition of the api workflow to another child - and/or related workflow on another component. + This will signal a transition of the api workflow to another child and/or + related workflow on another component. - Reversion strategy: N/A + Reversion strategy: rollback source volume status and error out newly + created volume. """ def __init__(self, scheduler_rpcapi, volume_rpcapi, db): @@ -709,23 +655,23 @@ class VolumeCastTask(flow_utils.CinderTask): volume_id = request_spec['volume_id'] snapshot_id = request_spec['snapshot_id'] image_id = request_spec['image_id'] - group_id = request_spec['consistencygroup_id'] + cgroup_id = request_spec['consistencygroup_id'] host = None cgsnapshot_id = request_spec['cgsnapshot_id'] - if group_id: - group = self.db.consistencygroup_get(context, group_id) - if group: - host = group.get('host', None) + if cgroup_id: + cgroup = self.db.consistencygroup_get(context, cgroup_id) + if cgroup: + host = cgroup.get('host', None) elif snapshot_id and CONF.snapshot_same_host: # NOTE(Rongze Zhu): A simple solution for bug 1008866. 
# - # If snapshot_id is set, make the call create volume directly to - # the volume host where the snapshot resides instead of passing it - # through the scheduler. So snapshot can be copy to new volume. + # If snapshot_id is set and CONF.snapshot_same_host is True, make + # the call create volume directly to the volume host where the + # snapshot resides instead of passing it through the scheduler, so + # snapshot can be copied to the new volume. snapshot = objects.Snapshot.get_by_id(context, snapshot_id) - source_volume_ref = self.db.volume_get(context, - snapshot.volume_id) + source_volume_ref = self.db.volume_get(context, snapshot.volume_id) host = source_volume_ref['host'] elif source_volid: source_volume_ref = self.db.volume_get(context, source_volid) @@ -763,7 +709,7 @@ class VolumeCastTask(flow_utils.CinderTask): image_id=image_id, source_volid=source_volid, source_replicaid=source_replicaid, - consistencygroup_id=group_id) + consistencygroup_id=cgroup_id) def execute(self, context, **kwargs): scheduler_hints = kwargs.pop('scheduler_hints', None) @@ -788,9 +734,8 @@ class VolumeCastTask(flow_utils.CinderTask): LOG.error(_LE('Unexpected build error:'), exc_info=exc_info) -def get_flow(scheduler_rpcapi, volume_rpcapi, db_api, - image_service_api, availability_zones, - create_what): +def get_flow(db_api, image_service_api, availability_zones, create_what, + scheduler_rpcapi=None, volume_rpcapi=None): """Constructs and returns the api entrypoint flow. This flow will do the following: @@ -816,39 +761,10 @@ def get_flow(scheduler_rpcapi, volume_rpcapi, db_api, EntryCreateTask(db_api), QuotaCommitTask()) - # This will cast it out to either the scheduler or volume manager via - # the rpc apis provided. - api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api)) - - # Now load (but do not run) the flow using the provided initial data. 
- return taskflow.engines.load(api_flow, store=create_what) - - -def get_flow_no_rpc(db_api, image_service_api, availability_zones, - create_what): - """Constructs and returns the api entrypoint flow. - - This flow will do the following: - - 1. Inject keys & values for dependent tasks. - 2. Extracts and validates the input keys & values. - 3. Reserves the quota (reverts quota on any failures). - 4. Creates the database entry. - 5. Commits the quota. - """ - - flow_name = ACTION.replace(":", "_") + "_api" - api_flow = linear_flow.Flow(flow_name) - - api_flow.add(ExtractVolumeRequestTask( - image_service_api, - availability_zones, - rebind={'size': 'raw_size', - 'availability_zone': 'raw_availability_zone', - 'volume_type': 'raw_volume_type'})) - api_flow.add(QuotaReserveTask(), - EntryCreateTask(db_api), - QuotaCommitTask()) + if scheduler_rpcapi and volume_rpcapi: + # This will cast it out to either the scheduler or volume manager via + # the rpc apis provided. + api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api)) # Now load (but do not run) the flow using the provided initial data. return taskflow.engines.load(api_flow, store=create_what)