import eventlet
import mock
import mox
+from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import importutils
self.assertEqual(volume['host'], 'newhost')
self.assertIsNone(volume['migration_status'])
+ def test_migrate_volume_error(self):
+ def fake_create_volume(ctxt, volume, host, req_spec, filters,
+ allow_reschedule=True):
+ db.volume_update(ctxt, volume['id'],
+ {'status': 'available'})
+
+ with mock.patch.object(self.volume.driver, 'migrate_volume') as \
+ mock_migrate,\
+ mock.patch.object(self.volume.driver, 'create_export') as \
+ mock_create_export:
+
+ # Exception case at self.driver.migrate_volume and create_export
+ mock_migrate.side_effect = processutils.ProcessExecutionError
+ mock_create_export.side_effect = processutils.ProcessExecutionError
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host)
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.assertRaises(processutils.ProcessExecutionError,
+ self.volume.migrate_volume,
+ self.context,
+ volume['id'],
+ host_obj,
+ False)
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertIsNone(volume['migration_status'])
+ self.assertEqual('available', volume['status'])
+
def test_migrate_volume_generic(self):
def fake_migr(vol, host):
raise Exception('should not be called')
self.assertEqual(volume['status'], 'available')
self.assertEqual(error_logs, [])
+ def test_migrate_volume_generic_copy_error(self):
+ def fake_create_volume(ctxt, volume, host, req_spec, filters,
+ allow_reschedule=True):
+ db.volume_update(ctxt, volume['id'],
+ {'status': 'available'})
+
+ with mock.patch.object(self.volume.driver, 'migrate_volume'),\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
+ as mock_create_volume,\
+ mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+ mock_copy_volume,\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
+ mock.patch.object(self.volume, 'migrate_volume_completion'),\
+ mock.patch.object(self.volume.driver, 'create_export'):
+
+ # Exception case at migrate_volume_generic
+ # source_volume['migration_status'] is 'migrating'
+ mock_create_volume.side_effect = fake_create_volume
+ mock_copy_volume.side_effect = processutils.ProcessExecutionError
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host)
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.assertRaises(processutils.ProcessExecutionError,
+ self.volume.migrate_volume,
+ self.context,
+ volume['id'],
+ host_obj,
+ True)
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertIsNone(volume['migration_status'])
+ self.assertEqual('available', volume['status'])
+
+ def test_migrate_volume_generic_create_export_error(self):
+ def fake_create_volume(ctxt, volume, host, req_spec, filters,
+ allow_reschedule=True):
+ db.volume_update(ctxt, volume['id'],
+ {'status': 'available'})
+
+ with mock.patch.object(self.volume.driver, 'migrate_volume'),\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
+ as mock_create_volume,\
+ mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+ mock_copy_volume,\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
+ mock.patch.object(self.volume, 'migrate_volume_completion'),\
+ mock.patch.object(self.volume.driver, 'create_export') as \
+ mock_create_export:
+
+ # Exception case at create_export
+ mock_create_volume.side_effect = fake_create_volume
+ mock_copy_volume.side_effect = processutils.ProcessExecutionError
+ mock_create_export.side_effect = processutils.ProcessExecutionError
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host)
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.assertRaises(processutils.ProcessExecutionError,
+ self.volume.migrate_volume,
+ self.context,
+ volume['id'],
+ host_obj,
+ True)
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertIsNone(volume['migration_status'])
+ self.assertEqual('available', volume['status'])
+
+ def test_migrate_volume_generic_migrate_volume_completion_error(self):
+ def fake_create_volume(ctxt, volume, host, req_spec, filters,
+ allow_reschedule=True):
+ db.volume_update(ctxt, volume['id'],
+ {'status': 'available'})
+
+ def fake_migrate_volume_completion(ctxt, volume_id, new_volume_id,
+ error=False):
+ db.volume_update(ctxt, volume['id'],
+ {'migration_status': 'completing'})
+ raise processutils.ProcessExecutionError
+
+ with mock.patch.object(self.volume.driver, 'migrate_volume'),\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
+ as mock_create_volume,\
+ mock.patch.object(self.volume.driver, 'copy_volume_data'),\
+ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
+ mock.patch.object(self.volume, 'migrate_volume_completion')\
+ as mock_migrate_compl,\
+ mock.patch.object(self.volume.driver, 'create_export'):
+
+ # Exception case at delete_volume
+ # source_volume['migration_status'] is 'completing'
+ mock_create_volume.side_effect = fake_create_volume
+ mock_migrate_compl.side_effect = fake_migrate_volume_completion
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host)
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.assertRaises(processutils.ProcessExecutionError,
+ self.volume.migrate_volume,
+ self.context,
+ volume['id'],
+ host_obj,
+ True)
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertIsNone(volume['migration_status'])
+ self.assertEqual('available', volume['status'])
+
def _test_migrate_volume_completion(self, status='available',
instance_uuid=None, attached_host=None,
retyping=False):
@locked_volume_operation
def delete_volume(self, context, volume_id, unmanage_only=False):
- """Deletes and unexports volume."""
+ """Deletes and unexports volume.
+
+    1. Delete a volume (normal case)
+       Delete a volume and update quotas.
+
+    2. Delete a migration source volume
+       If deleting the source volume in a migration, we want to skip
+       quotas. We also want to skip other database updates for the
+       source volume because these updates will be handled properly
+       by migrate_volume_completion.
+
+ 3. Delete a migration destination volume
+ If deleting the destination volume in a migration, we want to
+ skip quotas but we need database updates for the volume.
+ """
+
context = context.elevated()
try:
volume_ref['id'],
{'status': 'error_deleting'})
- # If deleting the source volume in a migration, we want to skip quotas
- # and other database updates.
- if volume_ref['migration_status']:
- return True
+ is_migrating = volume_ref['migration_status'] is not None
+ is_migrating_dest = (is_migrating and
+ volume_ref['migration_status'].startswith(
+ 'target:'))
- # Get reservations
- try:
- reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
- QUOTAS.add_volume_type_opts(context,
- reserve_opts,
- volume_ref.get('volume_type_id'))
- reservations = QUOTAS.reserve(context,
- project_id=project_id,
- **reserve_opts)
- except Exception:
- reservations = None
- LOG.exception(_LE("Failed to update usages deleting volume"))
+ # If deleting source/destination volume in a migration, we should
+ # skip quotas.
+ if not is_migrating:
+ # Get reservations
+ try:
+ reserve_opts = {'volumes': -1,
+ 'gigabytes': -volume_ref['size']}
+ QUOTAS.add_volume_type_opts(context,
+ reserve_opts,
+ volume_ref.get('volume_type_id'))
+ reservations = QUOTAS.reserve(context,
+ project_id=project_id,
+ **reserve_opts)
+ except Exception:
+ reservations = None
+ LOG.exception(_LE("Failed to update usages deleting volume"))
+
+ # If deleting the source volume in a migration, we should skip database
+ # update here. In other cases, continue to update database entries.
+ if not is_migrating or is_migrating_dest:
- # Delete glance metadata if it exists
- self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
+ # Delete glance metadata if it exists
+ self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
- self.db.volume_destroy(context, volume_id)
- LOG.info(_LI("volume %s: deleted successfully"), volume_ref['id'])
- self._notify_about_volume_usage(context, volume_ref, "delete.end")
+ self.db.volume_destroy(context, volume_id)
+ LOG.info(_LI("volume %s: deleted successfully"), volume_ref['id'])
- # Commit the reservations
- if reservations:
- QUOTAS.commit(context, reservations, project_id=project_id)
+ # If deleting source/destination volume in a migration, we should
+ # skip quotas.
+ if not is_migrating:
+ self._notify_about_volume_usage(context, volume_ref, "delete.end")
- pool = vol_utils.extract_host(volume_ref['host'], 'pool')
- if pool is None:
- # Legacy volume, put them into default pool
- pool = self.driver.configuration.safe_get(
- 'volume_backend_name') or vol_utils.extract_host(
- volume_ref['host'], 'pool', True)
- size = volume_ref['size']
+ # Commit the reservations
+ if reservations:
+ QUOTAS.commit(context, reservations, project_id=project_id)
- try:
- self.stats['pools'][pool]['allocated_capacity_gb'] -= size
- except KeyError:
- self.stats['pools'][pool] = dict(
- allocated_capacity_gb=-size)
+ pool = vol_utils.extract_host(volume_ref['host'], 'pool')
+ if pool is None:
+ # Legacy volume, put them into default pool
+ pool = self.driver.configuration.safe_get(
+ 'volume_backend_name') or vol_utils.extract_host(
+ volume_ref['host'], 'pool', True)
+ size = volume_ref['size']
- self.publish_service_capabilities(context)
+ try:
+ self.stats['pools'][pool]['allocated_capacity_gb'] -= size
+ except KeyError:
+ self.stats['pools'][pool] = dict(
+ allocated_capacity_gb=-size)
+
+ self.publish_service_capabilities(context)
return True
LOG.error(msg % {'vol1': volume['id'],
'vol2': new_volume['id']})
volume = self.db.volume_get(ctxt, volume['id'])
- # If we're in the completing phase don't delete the target
- # because we may have already deleted the source!
+
+ # If we're in the migrating phase, we need to cleanup
+ # destination volume because source volume is remaining
if volume['migration_status'] == 'migrating':
rpcapi.delete_volume(ctxt, new_volume)
- new_volume['migration_status'] = None
+ else:
+ # If we're in the completing phase don't delete the
+ # destination because we may have already deleted the
+ # source! But the migration_status in database should
+ # be cleared to handle volume after migration failure
+ try:
+ updates = {'migration_status': None}
+ self.db.volume_update(ctxt, new_volume['id'], updates)
+ except exception.VolumeNotFound:
+ LOG.info(_LI("Couldn't find destination volume "
+ "%(vol)s in database. The entry might be "
+ "successfully deleted during migration "
+ "completion phase."),
+ {'vol': new_volume['id']})
+
+ LOG.warn(_LW("Failed to migrate volume. The destination "
+ "volume %(vol)s is not deleted since the "
+ "source volume may have already deleted."),
+ {'vol': new_volume['id']})
def _get_original_status(self, volume):
if (volume['instance_uuid'] is None and
"for volume %(vol1)s (temporary volume %(vol2)s")
LOG.info(msg % {'vol1': volume['id'],
'vol2': new_volume['id']})
- new_volume['migration_status'] = None
rpcapi.delete_volume(ctxt, new_volume)
updates = {'migration_status': None, 'status': orig_volume_status}
self.db.volume_update(ctxt, volume_id, updates)
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
- model_update = self.driver.create_export(ctxt, volume_ref)
- if model_update:
- updates.update(model_update)
- self.db.volume_update(ctxt, volume_ref['id'], updates)
+ try:
+ model_update = self.driver.create_export(ctxt,
+ volume_ref)
+ if model_update:
+ updates.update(model_update)
+ except Exception:
+ LOG.exception(_LE("Failed to create export for "
+ "volume: %s"), volume_ref['id'])
+ finally:
+ self.db.volume_update(ctxt, volume_ref['id'], updates)
if not moved:
try:
self._migrate_volume_generic(ctxt, volume_ref, host,
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
- model_update = self.driver.create_export(ctxt, volume_ref)
- if model_update:
- updates.update(model_update)
- self.db.volume_update(ctxt, volume_ref['id'], updates)
+ try:
+ model_update = self.driver.create_export(ctxt,
+ volume_ref)
+ if model_update:
+ updates.update(model_update)
+ except Exception:
+ LOG.exception(_LE("Failed to create export for "
+ "volume: %s"), volume_ref['id'])
+ finally:
+ self.db.volume_update(ctxt, volume_ref['id'], updates)
@periodic_task.periodic_task
def _report_driver_status(self, context):