]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Refactor API create_volume flow
authorMichal Dulko <michal.dulko@intel.com>
Tue, 16 Jun 2015 06:21:21 +0000 (08:21 +0200)
committerMichal Dulko <michal.dulko@intel.com>
Tue, 16 Jun 2015 06:21:21 +0000 (08:21 +0200)
This commit refactors the code to be more compliant with DRY rule.
I've also changed some comments to be more readable and correct.

Partial-Implements: blueprint taskflow-refactoring
Change-Id: I0345c07ac490b6bd738c0fdac0b621bb9156f899

cinder/volume/api.py
cinder/volume/flows/api/create_volume.py

index e62144f8c3f09794d94690d7a052875f7abd1a85..4afd9bbef4759de3381417e22aafbb7a02286890 100644 (file)
@@ -277,18 +277,14 @@ class API(base.Base):
             'multiattach': multiattach,
         }
         try:
-            if cgsnapshot:
-                flow_engine = create_volume.get_flow_no_rpc(self.db,
-                                                            self.image_service,
-                                                            availability_zones,
-                                                            create_what)
-            else:
-                flow_engine = create_volume.get_flow(self.scheduler_rpcapi,
-                                                     self.volume_rpcapi,
-                                                     self.db,
-                                                     self.image_service,
-                                                     availability_zones,
-                                                     create_what)
+            sched_rpcapi = self.scheduler_rpcapi if not cgsnapshot else None
+            volume_rpcapi = self.volume_rpcapi if not cgsnapshot else None
+            flow_engine = create_volume.get_flow(self.db,
+                                                 self.image_service,
+                                                 availability_zones,
+                                                 create_what,
+                                                 sched_rpcapi,
+                                                 volume_rpcapi)
         except Exception:
             msg = _('Failed to create api volume flow.')
             LOG.exception(msg)
index 1d2746238793177d537554406ca79f4f6bd1381b..aa0305f6fca4b78f87023596fe88cbbc0fa7a6b7 100644 (file)
@@ -72,117 +72,64 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         self.availability_zones = availability_zones
 
     @staticmethod
-    def _extract_consistencygroup(consistencygroup):
-        """Extracts the consistencygroup id from the provided consistencygroup.
-
-        This function validates the input consistencygroup dict and checks that
-        the status of that consistencygroup is valid for creating a volume in.
-        """
-
-        consistencygroup_id = None
-        if consistencygroup is not None:
-            if consistencygroup['status'] not in CG_PROCEED_STATUS:
-                msg = _("Originating consistencygroup status must be one"
-                        " of '%s' values")
-                msg = msg % (", ".join(CG_PROCEED_STATUS))
-                raise exception.InvalidConsistencyGroup(reason=msg)
-            consistencygroup_id = consistencygroup['id']
-        return consistencygroup_id
-
-    @staticmethod
-    def _extract_cgsnapshot(cgsnapshot):
-        """Extracts the cgsnapshot id from the provided cgsnapshot.
-
-        This function validates the input cgsnapshot dict and checks that
-        the status of that cgsnapshot is valid for creating a cg from.
-        """
-
-        cgsnapshot_id = None
-        if cgsnapshot:
-            if cgsnapshot['status'] not in CGSNAPSHOT_PROCEED_STATUS:
-                msg = _("Originating CGSNAPSHOT status must be one"
-                        " of '%s' values")
-                msg = msg % (", ".join(CGSNAPSHOT_PROCEED_STATUS))
-                raise exception.InvalidCgSnapshot(reason=msg)
-            cgsnapshot_id = cgsnapshot['id']
-        return cgsnapshot_id
-
-    @staticmethod
-    def _extract_snapshot(snapshot):
-        """Extracts the snapshot id from the provided snapshot (if provided).
-
-        This function validates the input snapshot dict and checks that the
-        status of that snapshot is valid for creating a volume from.
-        """
-
-        snapshot_id = None
-        if snapshot is not None:
-            if snapshot['status'] not in SNAPSHOT_PROCEED_STATUS:
-                msg = _("Originating snapshot status must be one"
-                        " of %s values")
-                msg = msg % (", ".join(SNAPSHOT_PROCEED_STATUS))
-                # TODO(harlowja): what happens if the status changes after this
-                # initial snapshot status check occurs??? Seems like someone
-                # could delete the snapshot after this check passes but before
-                # the volume is officially created?
-                raise exception.InvalidSnapshot(reason=msg)
-            snapshot_id = snapshot['id']
-        return snapshot_id
-
-    @staticmethod
-    def _extract_source_volume(source_volume):
-        """Extracts the volume id from the provided volume (if provided).
-
-        This function validates the input source_volume dict and checks that
-        the status of that source_volume is valid for creating a volume from.
-        """
-
-        source_volid = None
-        if source_volume is not None:
-            if source_volume['status'] not in SRC_VOL_PROCEED_STATUS:
-                msg = _("Unable to create a volume from an originating source"
-                        " volume when its status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
-                # TODO(harlowja): what happens if the status changes after this
-                # initial volume status check occurs??? Seems like someone
-                # could delete the volume after this check passes but before
-                # the volume is officially created?
-                raise exception.InvalidVolume(reason=msg)
-            source_volid = source_volume['id']
-        return source_volid
-
-    @staticmethod
-    def _extract_source_replica(source_replica):
-        """Extracts the volume id from the provided replica (if provided).
-
-        This function validates the input replica_volume dict and checks that
-        the status of that replica_volume is valid for creating a volume from.
+    def _extract_resource(resource, allowed_vals, exc, resource_name,
+                          props=('status',)):
+        """Extracts the resource id from the provided resource.
+
+        This method validates the input resource dict and checks that the
+        properties which names are passed in `props` argument match
+        corresponding lists in `allowed` argument. In case of mismatch
+        exception of type exc is raised.
+
+        :param resource: Resource dict.
+        :param allowed_vals: Tuple of allowed values lists.
+        :param exc: Exception type to raise.
+        :param resource_name: Name of resource - used to construct log message.
+        :param props: Tuple of resource properties names to validate.
+        :return: Id of a resource.
         """
 
-        source_replicaid = None
-        if source_replica is not None:
-            if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
-                msg = _("Unable to create a volume from an originating source"
-                        " volume when its status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
-                # TODO(harlowja): what happens if the status changes after this
-                # initial volume status check occurs??? Seems like someone
-                # could delete the volume after this check passes but before
-                # the volume is officially created?
-                raise exception.InvalidVolume(reason=msg)
-            replication_status = source_replica['replication_status']
-            if replication_status not in REPLICA_PROCEED_STATUS:
-                msg = _("Unable to create a volume from a replica"
-                        " when replication status is not one of %s"
-                        " values")
-                msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
-                # TODO(ronenkat): what happens if the replication status
-                # changes after this initial volume status check occurs???
-                raise exception.InvalidVolume(reason=msg)
-            source_replicaid = source_replica['id']
-        return source_replicaid
+        resource_id = None
+        if resource:
+            for prop, allowed_states in zip(props, allowed_vals):
+                if resource[prop] not in allowed_states:
+                    msg = _("Originating %(res)s %(prop)s must be one of"
+                            "'%(vals)s' values")
+                    msg = msg % {'res': resource_name,
+                                 'prop': prop,
+                                 'vals': ', '.join(allowed_states)}
+                    # TODO(harlowja): what happens if the status changes after
+                    # this initial resource status check occurs??? Seems like
+                    # someone could delete the resource after this check passes
+                    # but before the volume is officially created?
+                    raise exc(reason=msg)
+                resource_id = resource['id']
+        return resource_id
+
+    def _extract_consistencygroup(self, consistencygroup):
+        return self._extract_resource(consistencygroup, (CG_PROCEED_STATUS,),
+                                      exception.InvalidConsistencyGroup,
+                                      'consistencygroup')
+
+    def _extract_cgsnapshot(self, cgsnapshot):
+        return self._extract_resource(cgsnapshot, (CGSNAPSHOT_PROCEED_STATUS,),
+                                      exception.InvalidCgSnapshot,
+                                      'CGSNAPSHOT')
+
+    def _extract_snapshot(self, snapshot):
+        return self._extract_resource(snapshot, (SNAPSHOT_PROCEED_STATUS,),
+                                      exception.InvalidSnapshot, 'snapshot')
+
+    def _extract_source_volume(self, source_volume):
+        return self._extract_resource(source_volume, (SRC_VOL_PROCEED_STATUS,),
+                                      exception.InvalidVolume, 'source volume')
+
+    def _extract_source_replica(self, source_replica):
+        return self._extract_resource(source_replica, (SRC_VOL_PROCEED_STATUS,
+                                                       REPLICA_PROCEED_STATUS),
+                                      exception.InvalidVolume,
+                                      'replica', ('status',
+                                                  'replication_status'))
 
     @staticmethod
     def _extract_size(size, source_volume, snapshot):
@@ -328,6 +275,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
             else:
                 # For backwards compatibility use the storage_availability_zone
                 availability_zone = CONF.storage_availability_zone
+
         if availability_zone not in self.availability_zones:
             msg = _("Availability zone '%s' is invalid") % (availability_zone)
             LOG.warning(msg)
@@ -379,27 +327,22 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         return encryption_key_id
 
     def _get_volume_type_id(self, volume_type, source_volume, snapshot):
-        volume_type_id = None
         if not volume_type and source_volume:
-            volume_type_id = source_volume['volume_type_id']
+            return source_volume['volume_type_id']
         elif snapshot is not None:
             if volume_type:
                 current_volume_type_id = volume_type.get('id')
-                if (current_volume_type_id !=
-                        snapshot['volume_type_id']):
+                if current_volume_type_id != snapshot['volume_type_id']:
                     msg = _LW("Volume type will be changed to "
                               "be the same as the source volume.")
                     LOG.warning(msg)
-            volume_type_id = snapshot['volume_type_id']
+            return snapshot['volume_type_id']
         else:
-            volume_type_id = volume_type.get('id')
-
-        return volume_type_id
+            return volume_type.get('id')
 
     def execute(self, context, size, snapshot, image_id, source_volume,
-                availability_zone, volume_type, metadata,
-                key_manager, source_replica,
-                consistencygroup, cgsnapshot):
+                availability_zone, volume_type, metadata, key_manager,
+                source_replica, consistencygroup, cgsnapshot):
 
         utils.check_exclusive_options(snapshot=snapshot,
                                       imageRef=image_id,
@@ -425,7 +368,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         # their volume type matches the source volume is too convoluted. We
         # should copy encryption metadata from the encrypted volume type to the
         # volume upon creation and propagate that information to each snapshot.
-        # This strategy avoid any dependency upon the encrypted volume type.
+        # This strategy avoids any dependency upon the encrypted volume type.
         def_vol_type = volume_types.get_default_volume_type()
         if not volume_type and not source_volume and not snapshot:
             volume_type = def_vol_type
@@ -532,14 +475,15 @@ class EntryCreateTask(flow_utils.CinderTask):
         }
 
     def revert(self, context, result, optional_args, **kwargs):
-        # We never produced a result and therefore can't destroy anything.
         if isinstance(result, ft.Failure):
+            # We never produced a result and therefore can't destroy anything.
             return
 
         if optional_args['is_quota_committed']:
-            # Committed quota doesn't rollback as the volume has already been
-            # created at this point, and the quota has already been absorbed.
+            # If quota got commited we shouldn't rollback as the volume has
+            # already been created and the quota has already been absorbed.
             return
+
         vol_id = result['volume_id']
         try:
             self.db.volume_destroy(context.elevated(), vol_id)
@@ -585,7 +529,7 @@ class QuotaReserveTask(flow_utils.CinderTask):
             usages = e.kwargs['usages']
 
             def _consumed(name):
-                return (usages[name]['reserved'] + usages[name]['in_use'])
+                return usages[name]['reserved'] + usages[name]['in_use']
 
             def _is_over(name):
                 for over in overs:
@@ -666,6 +610,7 @@ class QuotaCommitTask(flow_utils.CinderTask):
         # We never produced a result and therefore can't destroy anything.
         if isinstance(result, ft.Failure):
             return
+
         volume = result['volume_properties']
         try:
             reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
@@ -686,10 +631,11 @@ class QuotaCommitTask(flow_utils.CinderTask):
 class VolumeCastTask(flow_utils.CinderTask):
     """Performs a volume create cast to the scheduler or to the volume manager.
 
-    This which will signal a transition of the api workflow to another child
-    and/or related workflow on another component.
+    This will signal a transition of the api workflow to another child and/or
+    related workflow on another component.
 
-    Reversion strategy: N/A
+    Reversion strategy: rollback source volume status and error out newly
+    created volume.
     """
 
     def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
@@ -709,23 +655,23 @@ class VolumeCastTask(flow_utils.CinderTask):
         volume_id = request_spec['volume_id']
         snapshot_id = request_spec['snapshot_id']
         image_id = request_spec['image_id']
-        group_id = request_spec['consistencygroup_id']
+        cgroup_id = request_spec['consistencygroup_id']
         host = None
         cgsnapshot_id = request_spec['cgsnapshot_id']
 
-        if group_id:
-            group = self.db.consistencygroup_get(context, group_id)
-            if group:
-                host = group.get('host', None)
+        if cgroup_id:
+            cgroup = self.db.consistencygroup_get(context, cgroup_id)
+            if cgroup:
+                host = cgroup.get('host', None)
         elif snapshot_id and CONF.snapshot_same_host:
             # NOTE(Rongze Zhu): A simple solution for bug 1008866.
             #
-            # If snapshot_id is set, make the call create volume directly to
-            # the volume host where the snapshot resides instead of passing it
-            # through the scheduler. So snapshot can be copy to new volume.
+            # If snapshot_id is set and CONF.snapshot_same_host is True, make
+            # the call create volume directly to the volume host where the
+            # snapshot resides instead of passing it through the scheduler, so
+            # snapshot can be copied to the new volume.
             snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
-            source_volume_ref = self.db.volume_get(context,
-                                                   snapshot.volume_id)
+            source_volume_ref = self.db.volume_get(context, snapshot.volume_id)
             host = source_volume_ref['host']
         elif source_volid:
             source_volume_ref = self.db.volume_get(context, source_volid)
@@ -763,7 +709,7 @@ class VolumeCastTask(flow_utils.CinderTask):
                     image_id=image_id,
                     source_volid=source_volid,
                     source_replicaid=source_replicaid,
-                    consistencygroup_id=group_id)
+                    consistencygroup_id=cgroup_id)
 
     def execute(self, context, **kwargs):
         scheduler_hints = kwargs.pop('scheduler_hints', None)
@@ -788,9 +734,8 @@ class VolumeCastTask(flow_utils.CinderTask):
         LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
 
 
-def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
-             image_service_api, availability_zones,
-             create_what):
+def get_flow(db_api, image_service_api, availability_zones, create_what,
+             scheduler_rpcapi=None, volume_rpcapi=None):
     """Constructs and returns the api entrypoint flow.
 
     This flow will do the following:
@@ -816,39 +761,10 @@ def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
                  EntryCreateTask(db_api),
                  QuotaCommitTask())
 
-    # This will cast it out to either the scheduler or volume manager via
-    # the rpc apis provided.
-    api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
-
-    # Now load (but do not run) the flow using the provided initial data.
-    return taskflow.engines.load(api_flow, store=create_what)
-
-
-def get_flow_no_rpc(db_api, image_service_api, availability_zones,
-                    create_what):
-    """Constructs and returns the api entrypoint flow.
-
-    This flow will do the following:
-
-    1. Inject keys & values for dependent tasks.
-    2. Extracts and validates the input keys & values.
-    3. Reserves the quota (reverts quota on any failures).
-    4. Creates the database entry.
-    5. Commits the quota.
-    """
-
-    flow_name = ACTION.replace(":", "_") + "_api"
-    api_flow = linear_flow.Flow(flow_name)
-
-    api_flow.add(ExtractVolumeRequestTask(
-        image_service_api,
-        availability_zones,
-        rebind={'size': 'raw_size',
-                'availability_zone': 'raw_availability_zone',
-                'volume_type': 'raw_volume_type'}))
-    api_flow.add(QuotaReserveTask(),
-                 EntryCreateTask(db_api),
-                 QuotaCommitTask())
+    if scheduler_rpcapi and volume_rpcapi:
+        # This will cast it out to either the scheduler or volume manager via
+        # the rpc apis provided.
+        api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
 
     # Now load (but do not run) the flow using the provided initial data.
     return taskflow.engines.load(api_flow, store=create_what)