review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Adds lock for create from vol/snap to avoid race conditions
author     Edward Hope-Morley <edward.hope-morley@canonical.com>
           Thu, 14 Nov 2013 19:00:00 +0000 (19:00 +0000)
committer  Edward Hope-Morley <edward.hope-morley@canonical.com>
           Thu, 5 Dec 2013 10:08:46 +0000 (10:08 +0000)
This patch protects the create-from-volume/snapshot operation
with a lock file so that the source volume/snapshot cannot be
deleted while it is still being used by the create operation.

Currently, if a volume/snapshot is deleted while a volume is
being created from it, the delete may complete during the
create operation, leaving the new volume in an error or stuck
state. This lock ensures that:

(a) if a create of VolA from snap/volB is in progress, any
    delete requests for snap/volB will wait until the create
    is complete.

(b) if a delete of snap/volA is in progress, any create from
    snap/volA will wait until the delete of snap/volA is complete.

Co-authored-by: Takashi Natsume <natsume.takashi@lab.ntt.co.jp>
Closes-Bug: 1251334
Change-Id: Ie4bc0af789ab232593f55aa2f6b34345eb9b9929

cinder/tests/test_volume.py
cinder/volume/manager.py
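
The locking scheme introduced here is easiest to see in a small, self-contained
Python sketch (illustrative only, not Cinder code): a delete takes a lock named
"<id>-delete_volume" / "<id>-delete_snapshot", and a create that consumes that
volume/snapshot takes the same lock name, so the two operations serialize
instead of racing. The real implementation in the cinder/volume/manager.py hunk
below uses utils.synchronized(..., external=True), i.e. file-based locks that
work across processes; the helper names in this sketch are invented for the
example.

import threading

_registry_lock = threading.Lock()
_locks = {}


def _get_lock(name):
    # one lock object per name, created on first use
    with _registry_lock:
        return _locks.setdefault(name, threading.Lock())


def synchronized(name):
    # toy stand-in for cinder's utils.synchronized(name, external=True)
    def wrap(f):
        def inner(*args, **kwargs):
            with _get_lock(name):
                return f(*args, **kwargs)
        return inner
    return wrap


def delete_volume(volume_id):
    # the manager decorates delete_volume so the whole operation runs
    # while holding "<volume_id>-delete_volume"
    @synchronized("%s-delete_volume" % volume_id)
    def _do_delete():
        print("deleting %s" % volume_id)
    _do_delete()


def create_volume(volume_id, source_volid):
    # a create-from-volume runs its flow under the *same* lock name as a
    # delete of its source, so the two operations cannot interleave
    @synchronized("%s-delete_volume" % source_volid)
    def _run_flow():
        print("creating %s from %s" % (volume_id, source_volid))
    _run_flow()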

index 06721dc1d3444342e97f093aefe864eae52b2a8d..daad212073fb9a945b803a7f0766b91d7be6cbcd 100644
@@ -46,6 +46,7 @@ from cinder.openstack.common.notifier import test_notifier
 from cinder.openstack.common import rpc
 import cinder.policy
 from cinder import quota
+from cinder.taskflow.patterns import linear_flow
 from cinder import test
 from cinder.tests.brick.fake_lvm import FakeBrickLVM
 from cinder.tests import conf_fixture
@@ -62,6 +63,8 @@ from cinder.volume.flows import create_volume
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volutils
 
+import eventlet
+
 QUOTAS = quota.QUOTAS
 
 CONF = cfg.CONF
@@ -110,6 +113,8 @@ class BaseVolumeTestCase(test.TestCase):
         self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
         self.stubs.Set(os.path, 'exists', lambda x: True)
         self.volume.driver.set_initialized()
+        # keep an ordered record of what we execute
+        self.called = []
 
     def tearDown(self):
         try:
@@ -443,6 +448,234 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.delete_snapshot(self.context, snapshot_id)
         self.volume.delete_volume(self.context, volume_src['id'])
 
+    def _mock_synchronized(self, name, *s_args, **s_kwargs):
+        def inner_sync1(f):
+            def inner_sync2(*args, **kwargs):
+                self.called.append('lock-%s' % (name))
+                ret = f(*args, **kwargs)
+                self.called.append('unlock-%s' % (name))
+                return ret
+            return inner_sync2
+        return inner_sync1
+
+    def test_create_volume_from_snapshot_check_locks(self):
+        # mock the synchroniser so we can record events
+        self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+
+        self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
+                       lambda *args, **kwargs: None)
+
+        orig_flow = linear_flow.Flow.run
+
+        def mock_flow_run(*args, **kwargs):
+            # ensure the lock has been taken
+            self.assertEqual(len(self.called), 1)
+            # now proceed with the flow.
+            ret = orig_flow(*args, **kwargs)
+            return ret
+
+        # create source volume
+        src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+        src_vol_id = src_vol['id']
+
+        # no lock
+        self.volume.create_volume(self.context, src_vol_id)
+
+        snap_id = self._create_snapshot(src_vol_id)['id']
+        # no lock
+        self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+
+        dst_vol = tests_utils.create_volume(self.context,
+                                            snapshot_id=snap_id,
+                                            **self.volume_params)
+        dst_vol_id = dst_vol['id']
+        admin_ctxt = context.get_admin_context()
+
+        # mock the flow runner so we can do some checks
+        self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+
+        # locked
+        self.volume.create_volume(self.context, volume_id=dst_vol_id,
+                                  snapshot_id=snap_id)
+        self.assertEqual(len(self.called), 2)
+        self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
+        self.assertEqual(snap_id,
+                         db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)
+
+        # locked
+        self.volume.delete_volume(self.context, dst_vol_id)
+        self.assertEqual(len(self.called), 4)
+
+        # locked
+        self.volume.delete_snapshot(self.context, snap_id)
+        self.assertEqual(len(self.called), 6)
+
+        # locked
+        self.volume.delete_volume(self.context, src_vol_id)
+        self.assertEqual(len(self.called), 8)
+
+        self.assertEqual(self.called,
+                         ['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
+                          'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
+                          'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+                          'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+                          'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
+                          'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
+                          'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+
+    def test_create_volume_from_volume_check_locks(self):
+        # mock the synchroniser so we can record events
+        self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+
+        orig_flow = linear_flow.Flow.run
+
+        def mock_flow_run(*args, **kwargs):
+            # ensure the lock has been taken
+            self.assertEqual(len(self.called), 1)
+            # now proceed with the flow.
+            ret = orig_flow(*args, **kwargs)
+            return ret
+
+        # create source volume
+        src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+        src_vol_id = src_vol['id']
+
+        # no lock
+        self.volume.create_volume(self.context, src_vol_id)
+
+        dst_vol = tests_utils.create_volume(self.context,
+                                            source_volid=src_vol_id,
+                                            **self.volume_params)
+        dst_vol_id = dst_vol['id']
+        admin_ctxt = context.get_admin_context()
+
+        # mock the flow runner so we can do some checks
+        self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+
+        # locked
+        self.volume.create_volume(self.context, volume_id=dst_vol_id,
+                                  source_volid=src_vol_id)
+        self.assertEqual(len(self.called), 2)
+        self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
+        self.assertEqual(src_vol_id,
+                         db.volume_get(admin_ctxt, dst_vol_id).source_volid)
+
+        # locked
+        self.volume.delete_volume(self.context, dst_vol_id)
+        self.assertEqual(len(self.called), 4)
+
+        # locked
+        self.volume.delete_volume(self.context, src_vol_id)
+        self.assertEqual(len(self.called), 6)
+
+        self.assertEqual(self.called,
+                         ['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
+                          'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+                          'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+                          'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+
+    def test_create_volume_from_volume_delete_lock_taken(self):
+        # create source volume
+        src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+        src_vol_id = src_vol['id']
+
+        # no lock
+        self.volume.create_volume(self.context, src_vol_id)
+
+        dst_vol = tests_utils.create_volume(self.context,
+                                            source_volid=src_vol_id,
+                                            **self.volume_params)
+        dst_vol_id = dst_vol['id']
+        admin_ctxt = context.get_admin_context()
+
+        orig_elevated = self.context.elevated
+
+        ctxt_deepcopy = self.context.deepcopy()
+        gthreads = []
+
+        def mock_elevated(*args, **kwargs):
+            # unset mock so it is only called once
+            self.stubs.Set(self.context, 'elevated', orig_elevated)
+
+            # we expect this to block and then fail
+            t = eventlet.spawn(self.volume.create_volume,
+                               ctxt_deepcopy,
+                               volume_id=dst_vol_id, source_volid=src_vol_id)
+            gthreads.append(t)
+
+            return orig_elevated(*args, **kwargs)
+
+        # mock a call made early in the delete operation, within the lock,
+        # so that the create spawned from it is expected to block.
+        self.stubs.Set(self.context, 'elevated', mock_elevated)
+
+        # locked
+        self.volume.delete_volume(self.context, src_vol_id)
+
+        # we expect the volume create to fail with VolumeNotFound since the
+        # source volume was deleted while the create was blocked on the lock.
+        # Note that the volume is still in the db since it was created by the
+        # test prior to calling manager.create_volume.
+        self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)
+
+    def test_create_volume_from_snapshot_delete_lock_taken(self):
+        # create source volume
+        src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+        src_vol_id = src_vol['id']
+
+        # no lock
+        self.volume.create_volume(self.context, src_vol_id)
+
+        # create snapshot
+        snap_id = self._create_snapshot(src_vol_id)['id']
+        # no lock
+        self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+
+        # create vol from snapshot...
+        dst_vol = tests_utils.create_volume(self.context,
+                                            snapshot_id=snap_id,
+                                            **self.volume_params)
+        dst_vol_id = dst_vol['id']
+        admin_ctxt = context.get_admin_context()
+
+        orig_elevated = self.context.elevated
+
+        ctxt_deepcopy = self.context.deepcopy()
+        gthreads = []
+
+        def mock_elevated(*args, **kwargs):
+            # unset mock so it is only called once
+            self.stubs.Set(self.context, 'elevated', orig_elevated)
+
+            # We expect this to block and then fail
+            t = eventlet.spawn(self.volume.create_volume, ctxt_deepcopy,
+                               volume_id=dst_vol_id, snapshot_id=snap_id)
+            gthreads.append(t)
+
+            return orig_elevated(*args, **kwargs)
+
+        # mock a call made early in the delete operation, within the lock,
+        # so that the create spawned from it is expected to block.
+        self.stubs.Set(self.context, 'elevated', mock_elevated)
+
+        # locked
+        self.volume.delete_snapshot(self.context, snap_id)
+
+        # we expect the volume create to fail with SnapshotNotFound since the
+        # snapshot was deleted while the create was blocked on the lock. Note
+        # that the volume is still in the db since it was created by the test
+        # prior to calling manager.create_volume.
+        self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait)
+
+        # locked
+        self.volume.delete_volume(self.context, src_vol_id)
+        # make sure it is gone
+        self.assertRaises(exception.VolumeNotFound, db.volume_get,
+                          self.context, src_vol_id)
+
     def test_create_volume_from_snapshot_with_encryption(self):
         """Test volume can be created from a snapshot of
         an encrypted volume.
index 38ccd5be24f7fe1ba21cca0da1ed7e58a6c5c459..a0f4e071d5a558d3a9cb7e65029ae91632a48d02 100644
@@ -138,6 +138,49 @@ MAPPING = {
     'cinder.volume.drivers.huawei.HuaweiVolumeDriver'}
 
 
+def locked_volume_operation(f):
+    """Lock decorator for volume operations.
+
+    Takes a named lock prior to executing the operation. The lock is named with
+    the operation executed and the id of the volume. This lock can then be used
+    by other operations to avoid operation conflicts on shared volumes.
+
+    Example use:
+
+    If a volume operation uses this decorator, it will block until the named
+    lock is free. This is used to protect concurrent operations on the same
+    volume e.g. delete VolA while create volume VolB from VolA is in progress.
+    """
+    def lvo_inner1(inst, context, volume_id, **kwargs):
+        @utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
+        def lvo_inner2(*_args, **_kwargs):
+            return f(*_args, **_kwargs)
+        return lvo_inner2(inst, context, volume_id, **kwargs)
+    return lvo_inner1
+
+
+def locked_snapshot_operation(f):
+    """Lock decorator for snapshot operations.
+
+    Takes a named lock prior to executing the operation. The lock is named with
+    the operation executed and the id of the snapshot. This lock can then be
+    used by other operations to avoid operation conflicts on shared snapshots.
+
+    Example use:
+
+    If a snapshot operation uses this decorator, it will block until the named
+    lock is free. This is used to protect concurrent operations on the same
+    snapshot e.g. delete SnapA while create volume VolA from SnapA is in
+    progress.
+    """
+    def lso_inner1(inst, context, snapshot_id, **kwargs):
+        @utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True)
+        def lso_inner2(*_args, **_kwargs):
+            return f(*_args, **_kwargs)
+        return lso_inner2(inst, context, snapshot_id, **kwargs)
+    return lso_inner1
+
+
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
@@ -264,15 +307,36 @@ class VolumeManager(manager.SchedulerDependentManager):
 
         assert flow, _('Manager volume flow not retrieved')
 
-        flow.run(context.elevated())
-        if flow.state != states.SUCCESS:
-            raise exception.CinderException(_("Failed to successfully complete"
-                                              " manager volume workflow"))
+        if snapshot_id is not None:
+            # Make sure the snapshot is not deleted until we are done with it.
+            locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
+        elif source_volid is not None:
+            # Make sure the volume is not deleted until we are done with it.
+            locked_action = "%s-%s" % (source_volid, 'delete_volume')
+        else:
+            locked_action = None
+
+        def _run_flow():
+            flow.run(context.elevated())
+            if flow.state != states.SUCCESS:
+                msg = _("Failed to successfully complete manager volume "
+                        "workflow")
+                raise exception.CinderException(msg)
+
+        @utils.synchronized(locked_action, external=True)
+        def _run_flow_locked():
+            _run_flow()
+
+        if locked_action is None:
+            _run_flow()
+        else:
+            _run_flow_locked()
 
         self._reset_stats()
         return volume_id
 
     @utils.require_driver_initialized
+    @locked_volume_operation
     def delete_volume(self, context, volume_id):
         """Deletes and unexports volume."""
         context = context.elevated()
@@ -401,6 +465,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         return snapshot_id
 
     @utils.require_driver_initialized
+    @locked_snapshot_operation
     def delete_snapshot(self, context, snapshot_id):
         """Deletes and unexports snapshot."""
         caller_context = context
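
As a closing illustration, the sketch below (again not Cinder code) demonstrates
guarantee (b) from the commit message: while a delete of the source volume
holds its lock, a create-from-volume started in parallel cannot run until the
delete finishes. A plain threading.Lock stands in for the external file lock
named "<src_vol_id>-delete_volume"; names and timings are illustrative only.
The eventlet-based tests above (test_create_volume_from_volume_delete_lock_taken
and its snapshot counterpart) exercise the same interleaving against the real
manager.

import threading
import time

# stands in for the external lock named "<src_vol_id>-delete_volume"
lock = threading.Lock()
events = []  # ordered record of what ran, like self.called in the tests


def delete_volume():
    with lock:
        events.append("delete started")
        time.sleep(0.2)          # pretend the backend delete takes a while
        events.append("delete finished")


def create_volume_from_source():
    with lock:                   # same lock name as the delete
        events.append("create ran")


deleter = threading.Thread(target=delete_volume)
deleter.start()
time.sleep(0.05)                 # let the delete grab the lock first
creator = threading.Thread(target=create_volume_from_source)
creator.start()
deleter.join()
creator.join()

# the create could not interleave with the delete
assert events == ["delete started", "delete finished", "create ran"]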