review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Fix allocated_capacity tracking when rescheduling
author    Michal Dulko <michal.dulko@intel.com>
Thu, 12 Feb 2015 08:39:08 +0000 (09:39 +0100)
committer Michal Dulko <michal.dulko@intel.com>
Fri, 6 Mar 2015 09:36:15 +0000 (10:36 +0100)
This aims to fix allocated_capacity tracking in the manager in case of
rescheduling. The idea is to inject information about whether the volume
was rescheduled into the exception passed up, so the manager can decide
whether incrementing allocated_capacity is needed. A finally block in
the manager then makes sure that allocated_capacity is incremented for
any failure that hasn't triggered rescheduling. Two unit tests checking
that the tracking is correct are also added.

Closes-Bug: 1408763

Change-Id: Icd6b04ac60c17cbda225d3be8bcc4435ea0cd7a8
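
Below is a minimal, self-contained sketch of the pattern the commit message
describes, using simplified stand-in names (run_create_flow, VolumeCreateError,
create_volume, stats) rather than the actual Cinder/TaskFlow code: the flow's
revert path hangs a 'rescheduled' flag on the exception, the manager reads it
in an except clause, re-raises, and a finally block decides whether to count
allocated capacity.

# Illustrative sketch only -- simplified names, no TaskFlow, not Cinder code.


class VolumeCreateError(Exception):
    pass


def run_create_flow(reschedule_succeeds):
    """Stand-in for the create-volume flow; its revert marks the exception."""
    error = VolumeCreateError("backend failed")
    # The flow's revert path annotates the exception before it propagates.
    error.rescheduled = reschedule_succeeds
    raise error


def create_volume(reschedule_succeeds, stats):
    rescheduled = False
    try:
        run_create_flow(reschedule_succeeds)
    except VolumeCreateError as e:
        # Read the injected flag, then re-raise unchanged.
        rescheduled = getattr(e, 'rescheduled', False)
        raise
    finally:
        # Runs even while the re-raised exception is propagating.
        if not rescheduled:
            stats['allocated_capacity_gb'] = (
                stats.get('allocated_capacity_gb', 0) + 1)


stats = {}
try:
    create_volume(reschedule_succeeds=True, stats=stats)
except VolumeCreateError:
    pass
assert stats == {}  # rescheduled elsewhere, so nothing counted here

try:
    create_volume(reschedule_succeeds=False, stats=stats)
except VolumeCreateError:
    pass
assert stats == {'allocated_capacity_gb': 1}  # no rescheduling, so counted
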

cinder/tests/test_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py

index fa21ee74686cc53197f52b1f6ad70a92b113c90a..1989bd2243bf6ea8032983f18d7359d3b1b2c902 100644 (file)
@@ -356,6 +356,25 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.assertEqual(volume.status, "error")
         db.volume_destroy(context.get_admin_context(), volume_id)
 
+    def test_create_driver_not_initialized_rescheduling(self):
+        self.volume.driver._initialized = False
+
+        volume = tests_utils.create_volume(
+            self.context,
+            availability_zone=CONF.storage_availability_zone,
+            **self.volume_params)
+
+        volume_id = volume['id']
+        self.assertRaises(exception.DriverNotInitialized,
+                          self.volume.create_volume,
+                          self.context, volume_id,
+                          {'volume_properties': self.volume_params})
+        # NOTE(dulek): Volume should be rescheduled as we passed request_spec;
+        # assert that it wasn't counted in allocated_capacity tracking.
+        self.assertEqual(self.volume.stats['pools'], {})
+
+        db.volume_destroy(context.get_admin_context(), volume_id)
+
     @mock.patch.object(QUOTAS, 'rollback')
     @mock.patch.object(QUOTAS, 'commit')
     @mock.patch.object(QUOTAS, 'reserve')
@@ -2598,14 +2617,17 @@ class VolumeTestCase(BaseVolumeTestCase):
                                               **self.volume_params)['id']
         # creating volume testdata
         try:
+            request_spec = {'volume_properties': self.volume_params}
             self.volume.create_volume(self.context,
                                       volume_id,
+                                      request_spec,
                                       image_id=image_id)
         finally:
             # cleanup
             os.unlink(dst_path)
             volume = db.volume_get(self.context, volume_id)
-            return volume
+
+        return volume
 
     def test_create_volume_from_image_cloned_status_available(self):
         """Test create volume from image via cloning.
@@ -2661,6 +2683,25 @@ class VolumeTestCase(BaseVolumeTestCase):
         db.volume_destroy(self.context, volume_id)
         os.unlink(dst_path)
 
+    def test_create_volume_from_image_copy_exception_rescheduling(self):
+        """Test create volume with ImageCopyFailure
+
+        This exception should not trigger rescheduling, so allocated_capacity
+        should be incremented; we assert that here.
+        """
+        def fake_copy_image_to_volume(context, volume, image_service,
+                                      image_id):
+            raise exception.ImageCopyFailure()
+
+        self.stubs.Set(self.volume.driver, 'copy_image_to_volume',
+                       fake_copy_image_to_volume)
+        self.assertRaises(exception.ImageCopyFailure,
+                          self._create_volume_from_image)
+        # NOTE(dulek): Rescheduling should not occur, so let's assert that
+        # allocated_capacity is incremented.
+        self.assertDictEqual(self.volume.stats['pools'],
+                             {'_pool0': {'allocated_capacity_gb': 1}})
+
     def test_create_volume_from_exact_sized_image(self):
         """Verify that an image which is exactly the same size as the
         volume, will work correctly.
index 6db79057d0192e4914b813bc5bfee26d2376f1da..59b1fceb6983b6d04a3a7b719d55ab15c8336190 100644 (file)
@@ -134,6 +134,7 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
             update = {
                 'status': 'creating',
                 'scheduled_at': timeutils.utcnow(),
+                'host': None
             }
             LOG.debug("Updating volume %(volume_id)s with %(update)s." %
                       {'update': update, 'volume_id': volume_id})
@@ -145,6 +146,13 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
                           volume_id)
 
     def revert(self, context, result, flow_failures, **kwargs):
+        # NOTE(dulek): Revert is occurring and the manager needs to know if
+        # rescheduling happened. We're injecting this information into the
+        # exception that will be caught there. This is ugly; we need TaskFlow
+        # to support a better way of returning data from a reverted flow.
+        cause = list(flow_failures.values())[0]
+        cause.exception.rescheduled = False
+
         # Check if we have a cause which can tell us not to reschedule.
         for failure in flow_failures.values():
             if failure.check(*self.no_reschedule_types):
@@ -155,10 +163,11 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
         if self.reschedule_context:
             context = self.reschedule_context
             try:
-                cause = list(flow_failures.values())[0]
                 self._pre_reschedule(context, volume_id)
                 self._reschedule(context, cause, **kwargs)
                 self._post_reschedule(context, volume_id)
+                # Inject information that we rescheduled
+                cause.exception.rescheduled = True
             except exception.CinderException:
                 LOG.exception(_LE("Volume %s: rescheduling failed"), volume_id)
 
@@ -180,7 +189,6 @@ class ExtractVolumeRefTask(flow_utils.CinderTask):
         # In the future we might want to have a lock on the volume_id so that
         # the volume can not be deleted while its still being created?
         volume_ref = self.db.volume_get(context, volume_id)
-
         return volume_ref
 
     def revert(self, context, volume_id, result, **kwargs):
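
As a rough, self-contained illustration of the revert() behaviour added above,
the sketch below imitates only the tiny surface of taskflow's Failure object
that the task touches (.exception and .check()); FakeFailure, BackendError and
the revert() helper are hypothetical stand-ins, not Cinder or TaskFlow code.

# Rough sketch of the revert()-side injection, with a hypothetical FakeFailure
# standing in for taskflow's Failure (only .exception and .check() imitated).


class ImageCopyFailure(Exception):
    pass


class BackendError(Exception):
    pass


class FakeFailure(object):
    def __init__(self, exc):
        self.exception = exc

    def check(self, *exc_types):
        return isinstance(self.exception, exc_types)


def revert(flow_failures, no_reschedule_types=(ImageCopyFailure,),
           reschedule_works=True):
    cause = list(flow_failures.values())[0]
    # Pessimistically mark as not rescheduled before deciding anything.
    cause.exception.rescheduled = False

    # Some failure types must never be rescheduled.
    for failure in flow_failures.values():
        if failure.check(*no_reschedule_types):
            return

    if reschedule_works:
        # ... reschedule the volume to another host ...
        cause.exception.rescheduled = True


# Rescheduleable failure: the flag flips to True.
failures = {'task': FakeFailure(BackendError())}
revert(failures)
assert failures['task'].exception.rescheduled is True

# ImageCopyFailure is in no_reschedule_types: the flag stays False.
failures = {'task': FakeFailure(ImageCopyFailure())}
revert(failures)
assert failures['task'].exception.rescheduled is False
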
index e009f71f18147530c9e4b651c5cab8bc8acdd4d8..8c230c9ff218fa848721ee35432eef44cc55bbbe 100644 (file)
@@ -47,6 +47,7 @@ from oslo_utils import importutils
 from oslo_utils import timeutils
 from oslo_utils import uuidutils
 from osprofiler import profiler
+from taskflow import exceptions as tfe
 
 from cinder import compute
 from cinder import context
@@ -420,15 +421,30 @@ class VolumeManager(manager.SchedulerDependentManager):
         def _run_flow_locked():
             _run_flow()
 
-        if locked_action is None:
-            _run_flow()
-        else:
-            _run_flow_locked()
+        # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
+        # decide if allocated_capacity should be incremented.
+        rescheduled = False
 
-        # Fetch created volume from storage
-        vol_ref = flow_engine.storage.fetch('volume')
-        # Update volume stats
-        self._update_allocated_capacity(vol_ref)
+        try:
+            if locked_action is None:
+                _run_flow()
+            else:
+                _run_flow_locked()
+        except exception.CinderException as e:
+            if hasattr(e, 'rescheduled'):
+                rescheduled = e.rescheduled
+            raise
+        finally:
+            try:
+                vol_ref = flow_engine.storage.fetch('volume_ref')
+            except tfe.NotFound as e:
+                # Flow was reverted, fetching volume_ref from the DB.
+                vol_ref = self.db.volume_get(context, volume_id)
+
+            if not rescheduled:
+                # NOTE(dulek): Volume wasn't rescheduled so we need to update
+                # volume stats as these are decremented on delete.
+                self._update_allocated_capacity(vol_ref)
 
         return vol_ref['id']
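
For the manager-side handling above, the essential points are that the finally
block runs even while the re-raised exception propagates, and that a reverted
flow no longer has 'volume_ref' in the engine's storage, so the volume must be
re-read from the database. A condensed, self-contained sketch of just that
control flow follows; FakeStorage, NotFound and finish_create are illustrative
stand-ins for flow_engine.storage, taskflow.exceptions.NotFound and the code in
the finally block, not the real APIs.

# Condensed sketch of the manager-side control flow. FakeStorage and NotFound
# are illustrative stand-ins, not the real TaskFlow engine storage classes.


class NotFound(Exception):
    pass


class FakeStorage(object):
    def __init__(self, items):
        self._items = items

    def fetch(self, name):
        try:
            return self._items[name]
        except KeyError:
            raise NotFound(name)


def finish_create(storage, db_volume, rescheduled, stats):
    try:
        vol_ref = storage.fetch('volume_ref')
    except NotFound:
        # Flow was reverted; fall back to the DB copy of the volume.
        vol_ref = db_volume

    if not rescheduled:
        # Only count capacity that will later be decremented on delete.
        stats['allocated_capacity_gb'] = (
            stats.get('allocated_capacity_gb', 0) + vol_ref['size'])
    return vol_ref


# Reverted flow, no rescheduling: capacity is still tracked from the DB record.
stats = {}
vol = finish_create(FakeStorage({}), {'id': 'vol-1', 'size': 1},
                    rescheduled=False, stats=stats)
assert vol['id'] == 'vol-1' and stats == {'allocated_capacity_gb': 1}

# Reverted flow that was rescheduled elsewhere: nothing is counted here.
stats = {}
finish_create(FakeStorage({}), {'id': 'vol-2', 'size': 1},
              rescheduled=True, stats=stats)
assert stats == {}
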