From: Edward Hope-Morley Date: Thu, 14 Nov 2013 19:00:00 +0000 (+0000) Subject: Adds lock for create from vol/snap to avoid race conditions X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=4f6e5fcc252799e2b9207b6ef2b58b52a7a93563;p=openstack-build%2Fcinder-build.git Adds lock for create from vol/snap to avoid race conditions This patch protects create from volume/snapshot by using a lockfile to protect the operation from concurrent deletes of the volume/snapshot used in the create operation. Currently, if a volume/snapshot is deleted while a volume is being created from it that delete may complete during the create operation thus leaving the new volume in error or stuck state. This lock will ensure that: (a) if a create of VolA from snap/volB is in progress, any delete requests for snap/volB will wait until the create is complete. (b) if a delete of snap/volA is in progress, any create from snap/volA will wait until snap/volA delete is complete. Co-authored-by: Takashi Natsume Closes-Bug: 1251334 Change-Id: Ie4bc0af789ab232593f55aa2f6b34345eb9b9929 --- diff --git a/cinder/tests/test_volume.py b/cinder/tests/test_volume.py index 06721dc1d..daad21207 100644 --- a/cinder/tests/test_volume.py +++ b/cinder/tests/test_volume.py @@ -46,6 +46,7 @@ from cinder.openstack.common.notifier import test_notifier from cinder.openstack.common import rpc import cinder.policy from cinder import quota +from cinder.taskflow.patterns import linear_flow from cinder import test from cinder.tests.brick.fake_lvm import FakeBrickLVM from cinder.tests import conf_fixture @@ -62,6 +63,8 @@ from cinder.volume.flows import create_volume from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volutils +import eventlet + QUOTAS = quota.QUOTAS CONF = cfg.CONF @@ -110,6 +113,8 @@ class BaseVolumeTestCase(test.TestCase): self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True) self.stubs.Set(os.path, 'exists', lambda x: True) self.volume.driver.set_initialized() + # keep ordered record of what we execute + self.called = [] def tearDown(self): try: @@ -443,6 +448,234 @@ class VolumeTestCase(BaseVolumeTestCase): self.volume.delete_snapshot(self.context, snapshot_id) self.volume.delete_volume(self.context, volume_src['id']) + def _mock_synchronized(self, name, *s_args, **s_kwargs): + def inner_sync1(f): + def inner_sync2(*args, **kwargs): + self.called.append('lock-%s' % (name)) + ret = f(*args, **kwargs) + self.called.append('unlock-%s' % (name)) + return ret + return inner_sync2 + return inner_sync1 + + def test_create_volume_from_snapshot_check_locks(self): + # mock the synchroniser so we can record events + self.stubs.Set(utils, 'synchronized', self._mock_synchronized) + + self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot', + lambda *args, **kwargs: None) + + orig_flow = linear_flow.Flow.run + + def mock_flow_run(*args, **kwargs): + # ensure the lock has been taken + self.assertEqual(len(self.called), 1) + # now proceed with the flow. + ret = orig_flow(*args, **kwargs) + return ret + + # create source volume + src_vol = tests_utils.create_volume(self.context, **self.volume_params) + src_vol_id = src_vol['id'] + + # no lock + self.volume.create_volume(self.context, src_vol_id) + + snap_id = self._create_snapshot(src_vol_id)['id'] + # no lock + self.volume.create_snapshot(self.context, src_vol_id, snap_id) + + dst_vol = tests_utils.create_volume(self.context, + snapshot_id=snap_id, + **self.volume_params) + dst_vol_id = dst_vol['id'] + admin_ctxt = context.get_admin_context() + + # mock the flow runner so we can do some checks + self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run) + + # locked + self.volume.create_volume(self.context, volume_id=dst_vol_id, + snapshot_id=snap_id) + self.assertEqual(len(self.called), 2) + self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id) + self.assertEqual(snap_id, + db.volume_get(admin_ctxt, dst_vol_id).snapshot_id) + + # locked + self.volume.delete_volume(self.context, dst_vol_id) + self.assertEqual(len(self.called), 4) + + # locked + self.volume.delete_snapshot(self.context, snap_id) + self.assertEqual(len(self.called), 6) + + # locked + self.volume.delete_volume(self.context, src_vol_id) + self.assertEqual(len(self.called), 8) + + self.assertEqual(self.called, + ['lock-%s' % ('%s-delete_snapshot' % (snap_id)), + 'unlock-%s' % ('%s-delete_snapshot' % (snap_id)), + 'lock-%s' % ('%s-delete_volume' % (dst_vol_id)), + 'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)), + 'lock-%s' % ('%s-delete_snapshot' % (snap_id)), + 'unlock-%s' % ('%s-delete_snapshot' % (snap_id)), + 'lock-%s' % ('%s-delete_volume' % (src_vol_id)), + 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))]) + + def test_create_volume_from_volume_check_locks(self): + # mock the synchroniser so we can record events + self.stubs.Set(utils, 'synchronized', self._mock_synchronized) + + orig_flow = linear_flow.Flow.run + + def mock_flow_run(*args, **kwargs): + # ensure the lock has been taken + self.assertEqual(len(self.called), 1) + # now proceed with the flow. + ret = orig_flow(*args, **kwargs) + return ret + + # create source volume + src_vol = tests_utils.create_volume(self.context, **self.volume_params) + src_vol_id = src_vol['id'] + + # no lock + self.volume.create_volume(self.context, src_vol_id) + + dst_vol = tests_utils.create_volume(self.context, + source_volid=src_vol_id, + **self.volume_params) + dst_vol_id = dst_vol['id'] + admin_ctxt = context.get_admin_context() + + # mock the flow runner so we can do some checks + self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run) + + # locked + self.volume.create_volume(self.context, volume_id=dst_vol_id, + source_volid=src_vol_id) + self.assertEqual(len(self.called), 2) + self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id) + self.assertEqual(src_vol_id, + db.volume_get(admin_ctxt, dst_vol_id).source_volid) + + # locked + self.volume.delete_volume(self.context, dst_vol_id) + self.assertEqual(len(self.called), 4) + + # locked + self.volume.delete_volume(self.context, src_vol_id) + self.assertEqual(len(self.called), 6) + + self.assertEqual(self.called, + ['lock-%s' % ('%s-delete_volume' % (src_vol_id)), + 'unlock-%s' % ('%s-delete_volume' % (src_vol_id)), + 'lock-%s' % ('%s-delete_volume' % (dst_vol_id)), + 'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)), + 'lock-%s' % ('%s-delete_volume' % (src_vol_id)), + 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))]) + + def test_create_volume_from_volume_delete_lock_taken(self): + # create source volume + src_vol = tests_utils.create_volume(self.context, **self.volume_params) + src_vol_id = src_vol['id'] + + # no lock + self.volume.create_volume(self.context, src_vol_id) + + dst_vol = tests_utils.create_volume(self.context, + source_volid=src_vol_id, + **self.volume_params) + dst_vol_id = dst_vol['id'] + admin_ctxt = context.get_admin_context() + + orig_elevated = self.context.elevated + + ctxt_deepcopy = self.context.deepcopy() + gthreads = [] + + def mock_elevated(*args, **kwargs): + # unset mock so it is only called once + self.stubs.Set(self.context, 'elevated', orig_elevated) + + # we expect this to block and then fail + t = eventlet.spawn(self.volume.create_volume, + ctxt_deepcopy, + volume_id=dst_vol_id, source_volid=src_vol_id) + gthreads.append(t) + + return orig_elevated(*args, **kwargs) + + # mock something from early on in the delete operation and within the + # lock so that when we do the create we expect it to block. + self.stubs.Set(self.context, 'elevated', mock_elevated) + + # locked + self.volume.delete_volume(self.context, src_vol_id) + + # we expect the volume create to fail with the following err since the + # source volume was deleted while the create was locked. Note that the + # volume is still in the db since it was created by the test prior to + # calling manager.create_volume. + self.assertRaises(exception.VolumeNotFound, gthreads[0].wait) + + def test_create_volume_from_snapshot_delete_lock_taken(self): + # create source volume + src_vol = tests_utils.create_volume(self.context, **self.volume_params) + src_vol_id = src_vol['id'] + + # no lock + self.volume.create_volume(self.context, src_vol_id) + + # create snapshot + snap_id = self._create_snapshot(src_vol_id)['id'] + # no lock + self.volume.create_snapshot(self.context, src_vol_id, snap_id) + + # create vol from snapshot... + dst_vol = tests_utils.create_volume(self.context, + source_volid=src_vol_id, + **self.volume_params) + dst_vol_id = dst_vol['id'] + admin_ctxt = context.get_admin_context() + + orig_elevated = self.context.elevated + + ctxt_deepcopy = self.context.deepcopy() + gthreads = [] + + def mock_elevated(*args, **kwargs): + # unset mock so it is only called once + self.stubs.Set(self.context, 'elevated', orig_elevated) + + # We expect this to block and then fail + t = eventlet.spawn(self.volume.create_volume, ctxt_deepcopy, + volume_id=dst_vol_id, snapshot_id=snap_id) + gthreads.append(t) + + return orig_elevated(*args, **kwargs) + + # mock something from early on in the delete operation and within the + # lock so that when we do the create we expect it to block. + self.stubs.Set(self.context, 'elevated', mock_elevated) + + # locked + self.volume.delete_snapshot(self.context, snap_id) + + # we expect the volume create to fail with the following err since the + # snapshot was deleted while the create was locked. Note that the + # volume is still in the db since it was created by the test prior to + # calling manager.create_volume. + self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait) + + # locked + self.volume.delete_volume(self.context, src_vol_id) + # make sure it is gone + self.assertRaises(exception.VolumeNotFound, db.volume_get, + self.context, src_vol_id) + def test_create_volume_from_snapshot_with_encryption(self): """Test volume can be created from a snapshot of an encrypted volume. diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 38ccd5be2..a0f4e071d 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -138,6 +138,49 @@ MAPPING = { 'cinder.volume.drivers.huawei.HuaweiVolumeDriver'} +def locked_volume_operation(f): + """Lock decorator for volume operations. + + Takes a named lock prior to executing the operation. The lock is named with + the operation executed and the id of the volume. This lock can then be used + by other operations to avoid operation conflicts on shared volumes. + + Example use: + + If a volume operation uses this decorator, it will block until the named + lock is free. This is used to protect concurrent operations on the same + volume e.g. delete VolA while create volume VolB from VolA is in progress. + """ + def lvo_inner1(inst, context, volume_id, **kwargs): + @utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True) + def lvo_inner2(*_args, **_kwargs): + return f(*_args, **_kwargs) + return lvo_inner2(inst, context, volume_id, **kwargs) + return lvo_inner1 + + +def locked_snapshot_operation(f): + """Lock decorator for snapshot operations. + + Takes a named lock prior to executing the operation. The lock is named with + the operation executed and the id of the snapshot. This lock can then be + used by other operations to avoid operation conflicts on shared snapshots. + + Example use: + + If a snapshot operation uses this decorator, it will block until the named + lock is free. This is used to protect concurrent operations on the same + snapshot e.g. delete SnapA while create volume VolA from SnapA is in + progress. + """ + def lso_inner1(inst, context, snapshot_id, **kwargs): + @utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True) + def lso_inner2(*_args, **_kwargs): + return f(*_args, **_kwargs) + return lso_inner2(inst, context, snapshot_id, **kwargs) + return lso_inner1 + + class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" @@ -264,15 +307,36 @@ class VolumeManager(manager.SchedulerDependentManager): assert flow, _('Manager volume flow not retrieved') - flow.run(context.elevated()) - if flow.state != states.SUCCESS: - raise exception.CinderException(_("Failed to successfully complete" - " manager volume workflow")) + if snapshot_id is not None: + # Make sure the snapshot is not deleted until we are done with it. + locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot') + elif source_volid is not None: + # Make sure the volume is not deleted until we are done with it. + locked_action = "%s-%s" % (source_volid, 'delete_volume') + else: + locked_action = None + + def _run_flow(): + flow.run(context.elevated()) + if flow.state != states.SUCCESS: + msg = _("Failed to successfully complete manager volume " + "workflow") + raise exception.CinderException(msg) + + @utils.synchronized(locked_action, external=True) + def _run_flow_locked(): + _run_flow() + + if locked_action is None: + _run_flow() + else: + _run_flow_locked() self._reset_stats() return volume_id @utils.require_driver_initialized + @locked_volume_operation def delete_volume(self, context, volume_id): """Deletes and unexports volume.""" context = context.elevated() @@ -401,6 +465,7 @@ class VolumeManager(manager.SchedulerDependentManager): return snapshot_id @utils.require_driver_initialized + @locked_snapshot_operation def delete_snapshot(self, context, snapshot_id): """Deletes and unexports snapshot.""" caller_context = context