]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Refactoring of manager's create_volume flow
authorMichal Dulko <michal.dulko@intel.com>
Tue, 21 Jul 2015 10:30:56 +0000 (12:30 +0200)
committerMichal Dulko <michal.dulko@intel.com>
Fri, 24 Jul 2015 09:19:09 +0000 (11:19 +0200)
This commit resolves a TODO item (bulk metadata create), rephrases
some comments and makes use of new TaskFlow functionality to save
results of revert command to make decisions in c-vol manager instead
of doing an ugly workaround by injecting the information into exception
raised.

Partial-Implements: blueprint taskflow-refactoring
Depends-On: I82ebd0102aa5f50d98d9d6b48b13cd659d6dbcec
Change-Id: Ia4739c87ca83ae725e16256e3f2c596c5e6d935b

cinder/db/api.py
cinder/db/sqlalchemy/api.py
cinder/flow_utils.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py

index 3c5779d81433dc5eb529fff0f7ccdcf4a597f9c1..c7c55b8197ecb919b06e77cc5030a03389c10a13 100644 (file)
@@ -639,6 +639,12 @@ def volume_glance_metadata_create(context, volume_id, key, value):
                                               value)
 
 
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Add Glance metadata for specified volume (multiple pairs)."""
+    return IMPL.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   metadata)
+
+
 def volume_glance_metadata_get_all(context):
     """Return the glance metadata for all volumes."""
     return IMPL.volume_glance_metadata_get_all(context)
index 02208a54447c6ef3fc518764d3e131e1f1c7282f..87bcb76abf20721eebc796c84cf7689612410b8c 100644 (file)
@@ -3154,6 +3154,34 @@ def volume_glance_metadata_create(context, volume_id, key, value):
     return
 
 
+@require_context
+@require_volume_exists
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Update the Glance metadata for a volume by adding new key:value pairs.
+
+    This API does not support changing the value of a key once it has been
+    created.
+    """
+
+    session = get_session()
+    with session.begin():
+        for (key, value) in metadata.items():
+            rows = session.query(models.VolumeGlanceMetadata).\
+                filter_by(volume_id=volume_id).\
+                filter_by(key=key).\
+                filter_by(deleted=False).all()
+
+            if len(rows) > 0:
+                raise exception.GlanceMetadataExists(key=key,
+                                                     volume_id=volume_id)
+
+            vol_glance_metadata = models.VolumeGlanceMetadata()
+            vol_glance_metadata.volume_id = volume_id
+            vol_glance_metadata.key = key
+            vol_glance_metadata.value = six.text_type(value)
+            session.add(vol_glance_metadata)
+
+
 @require_context
 @require_snapshot_exists
 def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id):
index 3bca9c820e61bc273216acfd8de370e85a15b44a..711f5d3017eb7208c8ab0732c181ac4032ce22c9 100644 (file)
@@ -40,9 +40,11 @@ class CinderTask(task.Task):
     """
 
     def __init__(self, addons=None, **kwargs):
-        super(CinderTask, self).__init__(_make_task_name(self.__class__,
-                                                         addons),
-                                         **kwargs)
+        super(CinderTask, self).__init__(self.make_name(addons), **kwargs)
+
+    @classmethod
+    def make_name(cls, addons=None):
+        return _make_task_name(cls, addons)
 
 
 class DynamicLogListener(logging_listener.DynamicLoggingListener):
index 0f775a639ca94d179d6632fa2db20fa15d2b1172..7b9c6d4f7a99b0ac44b4cb46ee3f725fb942c5ec 100644 (file)
@@ -49,6 +49,8 @@ IMAGE_ATTRIBUTES = (
 class OnFailureRescheduleTask(flow_utils.CinderTask):
     """Triggers a rescheduling request to be sent when reverting occurs.
 
+    If rescheduling doesn't occur this task errors out the volume.
+
     Reversion strategy: Triggers the rescheduling mechanism whereby a cast gets
     sent to the scheduler rpc api to allow for an attempt X of Y for scheduling
     this volume elsewhere.
@@ -88,6 +90,31 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
     def execute(self, **kwargs):
         pass
 
+    def _pre_reschedule(self, context, volume_id):
+        """Actions that happen before the rescheduling attempt occur here."""
+
+        try:
+            # Update volume's timestamp and host.
+            #
+            # NOTE(harlowja): this is awkward to be done here, shouldn't
+            # this happen at the scheduler itself and not before it gets
+            # sent to the scheduler? (since what happens if it never gets
+            # there??). It's almost like we need a status of 'on-the-way-to
+            # scheduler' in the future.
+            # We don't need to update the volume's status to creating, since
+            # we haven't changed it to error.
+            update = {
+                'scheduled_at': timeutils.utcnow(),
+                'host': None,
+            }
+            LOG.debug("Updating volume %(volume_id)s with %(update)s.",
+                      {'update': update, 'volume_id': volume_id})
+            self.db.volume_update(context, volume_id, update)
+        except exception.CinderException:
+            # Don't let updating the state cause the rescheduling to fail.
+            LOG.exception(_LE("Volume %s: update volume state failed."),
+                          volume_id)
+
     def _reschedule(self, context, cause, request_spec, filter_properties,
                     volume_id):
         """Actions that happen during the rescheduling attempt occur here."""
@@ -122,47 +149,18 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
 
         LOG.debug("Volume %s: re-scheduled", volume_id)
 
-    def _pre_reschedule(self, context, volume_id):
-        """Actions that happen before the rescheduling attempt occur here."""
-
-        try:
-            # Update volume's timestamp and host.
-            #
-            # NOTE(harlowja): this is awkward to be done here, shouldn't
-            # this happen at the scheduler itself and not before it gets
-            # sent to the scheduler? (since what happens if it never gets
-            # there??). It's almost like we need a status of 'on-the-way-to
-            # scheduler' in the future.
-            # We don't need to update the volume's status to creating, since
-            # we haven't changed it to error.
-            update = {
-                'scheduled_at': timeutils.utcnow(),
-                'host': None
-            }
-            LOG.debug("Updating volume %(volume_id)s with %(update)s.",
-                      {'update': update, 'volume_id': volume_id})
-            self.db.volume_update(context, volume_id, update)
-        except exception.CinderException:
-            # Don't let updating the state cause the rescheduling to fail.
-            LOG.exception(_LE("Volume %s: update volume state failed."),
-                          volume_id)
-
-    def revert(self, context, result, flow_failures, **kwargs):
-        volume_id = kwargs['volume_id']
+    def revert(self, context, result, flow_failures, volume_id, **kwargs):
+        # NOTE(dulek): Revert is occurring and manager need to know if
+        # rescheduling happened. We're returning boolean flag that will
+        # indicate that. It which will be available in flow engine store
+        # through get_revert_result method.
 
         # If do not want to be rescheduled, just set the volume's status to
         # error and return.
         if not self.do_reschedule:
             common.error_out_volume(context, self.db, volume_id)
             LOG.error(_LE("Volume %s: create failed"), volume_id)
-            return
-
-        # NOTE(dulek): Revert is occurring and manager need to know if
-        # rescheduling happened. We're injecting this information into
-        # exception that will be caught there. This is ugly and we need
-        # TaskFlow to support better way of returning data from reverted flow.
-        cause = list(flow_failures.values())[0]
-        cause.exception.rescheduled = False
+            return False
 
         # Check if we have a cause which can tell us not to reschedule and
         # set the volume's status to error.
@@ -170,20 +168,22 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
             if failure.check(*self.no_reschedule_types):
                 common.error_out_volume(context, self.db, volume_id)
                 LOG.error(_LE("Volume %s: create failed"), volume_id)
-                return
+                return False
 
         # Use a different context when rescheduling.
         if self.reschedule_context:
+            cause = list(flow_failures.values())[0]
             context = self.reschedule_context
             try:
                 self._pre_reschedule(context, volume_id)
-                self._reschedule(context, cause, **kwargs)
+                self._reschedule(context, cause, volume_id=volume_id, **kwargs)
                 self._post_reschedule(context, volume_id)
-                # Inject information that we rescheduled
-                cause.exception.rescheduled = True
+                return True
             except exception.CinderException:
                 LOG.exception(_LE("Volume %s: rescheduling failed"), volume_id)
 
+        return False
+
 
 class ExtractVolumeRefTask(flow_utils.CinderTask):
     """Extracts volume reference for given volume id."""
@@ -206,11 +206,11 @@ class ExtractVolumeRefTask(flow_utils.CinderTask):
         return volume_ref
 
     def revert(self, context, volume_id, result, **kwargs):
-        if isinstance(result, ft.Failure):
+        if isinstance(result, ft.Failure) or not self.set_error:
             return
-        if self.set_error:
-            common.error_out_volume(context, self.db, volume_id)
-            LOG.error(_LE("Volume %s: create failed"), volume_id)
+
+        common.error_out_volume(context, self.db, volume_id)
+        LOG.error(_LE("Volume %s: create failed"), volume_id)
 
 
 class ExtractVolumeSpecTask(flow_utils.CinderTask):
@@ -561,21 +561,14 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
             if value is not None:
                 property_metadata[key] = value
 
-        # NOTE(harlowja): The best way for this to happen would be in bulk,
-        # but that doesn't seem to exist (yet), so we go through one by one
-        # which means we can have partial create/update failure.
         volume_metadata = dict(property_metadata)
         volume_metadata.update(base_metadata)
         LOG.debug("Creating volume glance metadata for volume %(volume_id)s"
                   " backed by image %(image_id)s with: %(vol_metadata)s.",
                   {'volume_id': volume_id, 'image_id': image_id,
                    'vol_metadata': volume_metadata})
-        for (key, value) in volume_metadata.items():
-            try:
-                self.db.volume_glance_metadata_create(context, volume_id,
-                                                      key, value)
-            except exception.GlanceMetadataExists:
-                pass
+        self.db.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   volume_metadata)
 
     def _create_from_image(self, context, volume_ref,
                            image_location, image_id, image_meta,
@@ -710,7 +703,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
             # TODO(harlowja): is it acceptable to only log if this fails??
             # or are there other side-effects that this will cause if the
             # status isn't updated correctly (aka it will likely be stuck in
-            # 'building' if this fails)??
+            # 'creating' if this fails)??
             volume_ref = self.db.volume_update(context, volume_id, update)
             # Now use the parent to notify.
             super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
@@ -737,7 +730,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     3. Selects 1 of 2 activated only on *failure* tasks (one to update the db
        status & notify or one to update the db status & notify & *reschedule*).
     4. Extracts a volume specification from the provided inputs.
-    5. Notifies that the volume has start to be created.
+    5. Notifies that the volume has started to be created.
     6. Creates a volume from the extracted volume specification.
     7. Attaches a on-success *only* task that notifies that the volume creation
        has ended and performs further database status updates.
@@ -761,7 +754,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     retry = filter_properties.get('retry', None)
 
     # Always add OnFailureRescheduleTask and we handle the change of volume's
-    # status when revert task flow. Meanwhile, no need to revert process of
+    # status when reverting the flow. Meanwhile, no need to revert process of
     # ExtractVolumeRefTask.
     do_reschedule = allow_reschedule and request_spec and retry
     volume_flow.add(OnFailureRescheduleTask(reschedule_context, db,
index b9c6e21b3dc33b5207e20e759d57c59fcafe0093..a231e2f6304dee353244b83ed5fcf48d803d7dd0 100644 (file)
@@ -466,24 +466,31 @@ class VolumeManager(manager.SchedulerDependentManager):
         # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
         # decide if allocated_capacity should be incremented.
         rescheduled = False
+        vol_ref = None
 
         try:
             if locked_action is None:
                 _run_flow()
             else:
                 _run_flow_locked()
-        except Exception as e:
-            if hasattr(e, 'rescheduled'):
-                rescheduled = e.rescheduled
-            raise
         finally:
             try:
                 vol_ref = flow_engine.storage.fetch('volume_ref')
-            except tfe.NotFound as e:
-                # Flow was reverted, fetching volume_ref from the DB.
-                vol_ref = self.db.volume_get(context, volume_id)
+            except tfe.NotFound:
+                # If there's no vol_ref, then flow is reverted. Lets check out
+                # if rescheduling occurred.
+                try:
+                    rescheduled = flow_engine.storage.get_revert_result(
+                        create_volume.OnFailureRescheduleTask.make_name(
+                            [create_volume.ACTION]))
+                except tfe.NotFound:
+                    pass
 
             if not rescheduled:
+                if not vol_ref:
+                    # Flow was reverted and not rescheduled, fetching
+                    # volume_ref from the DB, because it will be needed.
+                    vol_ref = self.db.volume_get(context, volume_id)
                 # NOTE(dulek): Volume wasn't rescheduled so we need to update
                 # volume stats as these are decremented on delete.
                 self._update_allocated_capacity(vol_ref)