from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
+from cinder.taskflow.patterns import linear_flow
from cinder import test
from cinder.tests.brick.fake_lvm import FakeBrickLVM
from cinder.tests import conf_fixture
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
+import eventlet
+
QUOTAS = quota.QUOTAS
CONF = cfg.CONF
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
+ # keep ordered record of what we execute
+ self.called = []
def tearDown(self):
try:
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
+ def _mock_synchronized(self, name, *s_args, **s_kwargs):
+ def inner_sync1(f):
+ def inner_sync2(*args, **kwargs):
+ self.called.append('lock-%s' % (name))
+ ret = f(*args, **kwargs)
+ self.called.append('unlock-%s' % (name))
+ return ret
+ return inner_sync2
+ return inner_sync1
+
+ def test_create_volume_from_snapshot_check_locks(self):
+ # mock the synchroniser so we can record events
+ self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+
+ self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
+ lambda *args, **kwargs: None)
+
+ orig_flow = linear_flow.Flow.run
+
+ def mock_flow_run(*args, **kwargs):
+ # ensure the lock has been taken
+ self.assertEqual(len(self.called), 1)
+ # now proceed with the flow.
+ ret = orig_flow(*args, **kwargs)
+ return ret
+
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ # no lock
+ self.volume.create_volume(self.context, src_vol_id)
+
+ snap_id = self._create_snapshot(src_vol_id)['id']
+ # no lock
+ self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+
+ dst_vol = tests_utils.create_volume(self.context,
+ snapshot_id=snap_id,
+ **self.volume_params)
+ dst_vol_id = dst_vol['id']
+ admin_ctxt = context.get_admin_context()
+
+ # mock the flow runner so we can do some checks
+ self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+
+ # locked
+ self.volume.create_volume(self.context, volume_id=dst_vol_id,
+ snapshot_id=snap_id)
+ self.assertEqual(len(self.called), 2)
+ self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
+ self.assertEqual(snap_id,
+ db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)
+
+ # locked
+ self.volume.delete_volume(self.context, dst_vol_id)
+ self.assertEqual(len(self.called), 4)
+
+ # locked
+ self.volume.delete_snapshot(self.context, snap_id)
+ self.assertEqual(len(self.called), 6)
+
+ # locked
+ self.volume.delete_volume(self.context, src_vol_id)
+ self.assertEqual(len(self.called), 8)
+
+ self.assertEqual(self.called,
+ ['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
+ 'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
+ 'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+ 'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+ 'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
+ 'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
+ 'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+ 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+
+ def test_create_volume_from_volume_check_locks(self):
+ # mock the synchroniser so we can record events
+ self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+
+ orig_flow = linear_flow.Flow.run
+
+ def mock_flow_run(*args, **kwargs):
+ # ensure the lock has been taken
+ self.assertEqual(len(self.called), 1)
+ # now proceed with the flow.
+ ret = orig_flow(*args, **kwargs)
+ return ret
+
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ # no lock
+ self.volume.create_volume(self.context, src_vol_id)
+
+ dst_vol = tests_utils.create_volume(self.context,
+ source_volid=src_vol_id,
+ **self.volume_params)
+ dst_vol_id = dst_vol['id']
+ admin_ctxt = context.get_admin_context()
+
+ # mock the flow runner so we can do some checks
+ self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
+
+ # locked
+ self.volume.create_volume(self.context, volume_id=dst_vol_id,
+ source_volid=src_vol_id)
+ self.assertEqual(len(self.called), 2)
+ self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
+ self.assertEqual(src_vol_id,
+ db.volume_get(admin_ctxt, dst_vol_id).source_volid)
+
+ # locked
+ self.volume.delete_volume(self.context, dst_vol_id)
+ self.assertEqual(len(self.called), 4)
+
+ # locked
+ self.volume.delete_volume(self.context, src_vol_id)
+ self.assertEqual(len(self.called), 6)
+
+ self.assertEqual(self.called,
+ ['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+ 'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
+ 'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+ 'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
+ 'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+ 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+
+ def test_create_volume_from_volume_delete_lock_taken(self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ # no lock
+ self.volume.create_volume(self.context, src_vol_id)
+
+ dst_vol = tests_utils.create_volume(self.context,
+ source_volid=src_vol_id,
+ **self.volume_params)
+ dst_vol_id = dst_vol['id']
+ admin_ctxt = context.get_admin_context()
+
+ orig_elevated = self.context.elevated
+
+ ctxt_deepcopy = self.context.deepcopy()
+ gthreads = []
+
+ def mock_elevated(*args, **kwargs):
+ # unset mock so it is only called once
+ self.stubs.Set(self.context, 'elevated', orig_elevated)
+
+ # we expect this to block and then fail
+ t = eventlet.spawn(self.volume.create_volume,
+ ctxt_deepcopy,
+ volume_id=dst_vol_id, source_volid=src_vol_id)
+ gthreads.append(t)
+
+ return orig_elevated(*args, **kwargs)
+
+ # mock something from early on in the delete operation and within the
+ # lock so that when we do the create we expect it to block.
+ self.stubs.Set(self.context, 'elevated', mock_elevated)
+
+ # locked
+ self.volume.delete_volume(self.context, src_vol_id)
+
+ # we expect the volume create to fail with the following err since the
+ # source volume was deleted while the create was locked. Note that the
+ # volume is still in the db since it was created by the test prior to
+ # calling manager.create_volume.
+ self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)
+
+ def test_create_volume_from_snapshot_delete_lock_taken(self):
+ # create source volume
+ src_vol = tests_utils.create_volume(self.context, **self.volume_params)
+ src_vol_id = src_vol['id']
+
+ # no lock
+ self.volume.create_volume(self.context, src_vol_id)
+
+ # create snapshot
+ snap_id = self._create_snapshot(src_vol_id)['id']
+ # no lock
+ self.volume.create_snapshot(self.context, src_vol_id, snap_id)
+
+ # create vol from snapshot...
+ dst_vol = tests_utils.create_volume(self.context,
+ source_volid=src_vol_id,
+ **self.volume_params)
+ dst_vol_id = dst_vol['id']
+ admin_ctxt = context.get_admin_context()
+
+ orig_elevated = self.context.elevated
+
+ ctxt_deepcopy = self.context.deepcopy()
+ gthreads = []
+
+ def mock_elevated(*args, **kwargs):
+ # unset mock so it is only called once
+ self.stubs.Set(self.context, 'elevated', orig_elevated)
+
+ # We expect this to block and then fail
+ t = eventlet.spawn(self.volume.create_volume, ctxt_deepcopy,
+ volume_id=dst_vol_id, snapshot_id=snap_id)
+ gthreads.append(t)
+
+ return orig_elevated(*args, **kwargs)
+
+ # mock something from early on in the delete operation and within the
+ # lock so that when we do the create we expect it to block.
+ self.stubs.Set(self.context, 'elevated', mock_elevated)
+
+ # locked
+ self.volume.delete_snapshot(self.context, snap_id)
+
+ # we expect the volume create to fail with the following err since the
+ # snapshot was deleted while the create was locked. Note that the
+ # volume is still in the db since it was created by the test prior to
+ # calling manager.create_volume.
+ self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait)
+
+ # locked
+ self.volume.delete_volume(self.context, src_vol_id)
+ # make sure it is gone
+ self.assertRaises(exception.VolumeNotFound, db.volume_get,
+ self.context, src_vol_id)
+
def test_create_volume_from_snapshot_with_encryption(self):
"""Test volume can be created from a snapshot of
an encrypted volume.
'cinder.volume.drivers.huawei.HuaweiVolumeDriver'}
+def locked_volume_operation(f):
+ """Lock decorator for volume operations.
+
+ Takes a named lock prior to executing the operation. The lock is named with
+ the operation executed and the id of the volume. This lock can then be used
+ by other operations to avoid operation conflicts on shared volumes.
+
+ Example use:
+
+ If a volume operation uses this decorator, it will block until the named
+ lock is free. This is used to protect concurrent operations on the same
+ volume e.g. delete VolA while create volume VolB from VolA is in progress.
+ """
+ def lvo_inner1(inst, context, volume_id, **kwargs):
+ @utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
+ def lvo_inner2(*_args, **_kwargs):
+ return f(*_args, **_kwargs)
+ return lvo_inner2(inst, context, volume_id, **kwargs)
+ return lvo_inner1
+
+
+def locked_snapshot_operation(f):
+ """Lock decorator for snapshot operations.
+
+ Takes a named lock prior to executing the operation. The lock is named with
+ the operation executed and the id of the snapshot. This lock can then be
+ used by other operations to avoid operation conflicts on shared snapshots.
+
+ Example use:
+
+ If a snapshot operation uses this decorator, it will block until the named
+ lock is free. This is used to protect concurrent operations on the same
+ snapshot e.g. delete SnapA while create volume VolA from SnapA is in
+ progress.
+ """
+ def lso_inner1(inst, context, snapshot_id, **kwargs):
+ @utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True)
+ def lso_inner2(*_args, **_kwargs):
+ return f(*_args, **_kwargs)
+ return lso_inner2(inst, context, snapshot_id, **kwargs)
+ return lso_inner1
+
+
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
assert flow, _('Manager volume flow not retrieved')
- flow.run(context.elevated())
- if flow.state != states.SUCCESS:
- raise exception.CinderException(_("Failed to successfully complete"
- " manager volume workflow"))
+ if snapshot_id is not None:
+ # Make sure the snapshot is not deleted until we are done with it.
+ locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
+ elif source_volid is not None:
+ # Make sure the volume is not deleted until we are done with it.
+ locked_action = "%s-%s" % (source_volid, 'delete_volume')
+ else:
+ locked_action = None
+
+ def _run_flow():
+ flow.run(context.elevated())
+ if flow.state != states.SUCCESS:
+ msg = _("Failed to successfully complete manager volume "
+ "workflow")
+ raise exception.CinderException(msg)
+
+ @utils.synchronized(locked_action, external=True)
+ def _run_flow_locked():
+ _run_flow()
+
+ if locked_action is None:
+ _run_flow()
+ else:
+ _run_flow_locked()
self._reset_stats()
return volume_id
@utils.require_driver_initialized
+ @locked_volume_operation
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
return snapshot_id
@utils.require_driver_initialized
+ @locked_snapshot_operation
def delete_snapshot(self, context, snapshot_id):
"""Deletes and unexports snapshot."""
caller_context = context