class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
- RPC_API_VERSION = '1.10'
+ RPC_API_VERSION = '1.11'
target = messaging.Target(version=RPC_API_VERSION)
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
- filter_properties=None):
+ filter_properties=None, volume=None):
"""Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler()
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if volume is None:
+ # For older clients, mimic the old behavior and look up the
+ # volume by its volume_id.
+ volume = objects.Volume.get_by_id(context, volume_id)
+
def _migrate_volume_set_error(self, context, ex, request_spec):
- volume = db.volume_get(context, request_spec['volume_id'])
if volume.status == 'maintenance':
previous_status = (
volume.previous_status or 'maintenance')
with excutils.save_and_reraise_exception():
_migrate_volume_set_error(self, context, ex, request_spec)
else:
- volume_ref = db.volume_get(context, volume_id)
- volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
+ volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
tgt_host,
force_host_copy)
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
1.10 - Adds support for sending objects over RPC in retype()
+ 1.11 - Adds support for sending objects over RPC in
+ migrate_volume_to_host()
"""
RPC_API_VERSION = '1.0'
def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
force_host_copy=False, request_spec=None,
- filter_properties=None):
-
- cctxt = self.client.prepare(version='1.3')
+ filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
- return cctxt.cast(ctxt, 'migrate_volume_to_host',
- topic=topic,
- volume_id=volume_id,
- host=host,
- force_host_copy=force_host_copy,
- request_spec=request_spec_p,
- filter_properties=filter_properties)
+ msg_args = {'topic': topic, 'volume_id': volume_id,
+ 'host': host, 'force_host_copy': force_host_copy,
+ 'request_spec': request_spec_p,
+ 'filter_properties': filter_properties}
+ if self.client.can_send_version('1.11'):
+ version = '1.11'
+ msg_args['volume'] = volume
+ else:
+ version = '1.3'
+
+ cctxt = self.client.prepare(version=version)
+ return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None, volume=None):
force_host_copy=False):
admin_ctx = context.get_admin_context()
# build request to migrate to host
- req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+ req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume': {'host': host,
resp = req.get_response(app())
# verify status
self.assertEqual(expected_status, resp.status_int)
- volume = db.volume_get(admin_ctx, volume['id'])
+ volume = objects.Volume.get_by_id(admin_ctx, volume.id)
return volume
def test_migrate_volume_success(self):
version='1.2')
can_send_version.assert_called_once_with('1.9')
- def test_migrate_volume_to_host(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_migrate_volume_to_host(self, can_send_version):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
+ volume='volume',
+ version='1.11')
+ can_send_version.assert_called_once_with('1.11')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_migrate_volume_to_host_old(self, can_send_version):
+ self._test_scheduler_api('migrate_volume_to_host',
+ rpc_method='cast',
+ topic='topic',
+ volume_id='volume_id',
+ host='host',
+ force_host_copy=True,
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ volume='volume',
version='1.3')
+ can_send_version.assert_called_once_with('1.11')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_clean_temporary_volume(self):
def fake_delete_volume(ctxt, volume):
- db.volume_destroy(ctxt, volume['id'])
+ volume.destroy()
fake_volume = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
+ host=CONF.host,
+ migration_status='migrating')
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
- # Check when the migrated volume is in migration
- db.volume_update(self.context, fake_volume['id'],
- {'migration_status': 'migrating'})
# 1. Only clean the db
- self.volume._clean_temporary_volume(self.context, fake_volume['id'],
- fake_new_volume['id'],
+ self.volume._clean_temporary_volume(self.context, fake_volume,
+ fake_new_volume,
clean_db_only=True)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
- fake_new_volume['id'])
+ fake_new_volume.id)
# 2. Delete the backend storage
fake_new_volume = tests_utils.create_volume(self.context, size=1,
mock_delete_volume:
mock_delete_volume.side_effect = fake_delete_volume
self.volume._clean_temporary_volume(self.context,
- fake_volume['id'],
- fake_new_volume['id'],
+ fake_volume,
+ fake_new_volume,
clean_db_only=False)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
- fake_new_volume['id'])
+ fake_new_volume.id)
# Check when the migrated volume is not in migration
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
- db.volume_update(self.context, fake_volume['id'],
- {'migration_status': 'non-migrating'})
- self.volume._clean_temporary_volume(self.context, fake_volume['id'],
- fake_new_volume['id'])
+ fake_volume.migration_status = 'non-migrating'
+ fake_volume.save()
+ self.volume._clean_temporary_volume(self.context, fake_volume,
+ fake_new_volume)
volume = db.volume_get(context.get_admin_context(),
- fake_new_volume['id'])
- self.assertIsNone(volume['migration_status'])
+ fake_new_volume.id)
+ self.assertIsNone(volume.migration_status)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
host=CONF.host,
migration_status='migrating')
host_obj = {'host': 'newhost', 'capabilities': {}}
- self.volume.migrate_volume(self.context, volume['id'],
- host_obj, False)
+ self.volume.migrate_volume(self.context, volume.id, host_obj, False,
+ volume=volume)
# check volume properties
- volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual('newhost', volume['host'])
- self.assertEqual('success', volume['migration_status'])
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
+ self.assertEqual('newhost', volume.host)
+ self.assertEqual('success', volume.migration_status)
def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
allow_reschedule=True):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- False)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual('error', volume['migration_status'])
- self.assertEqual('available', volume['status'])
+ False,
+ volume=volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
+ self.assertEqual('error', volume.migration_status)
+ self.assertEqual('available', volume.status)
@mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
migrate_volume_completion,
nova_api):
fake_volume_id = 'fake_volume_id'
- fake_new_volume = {'status': 'available', 'id': fake_volume_id}
+ fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
+ fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
+ new_volume_obj = fake_volume.fake_volume_obj(self.context,
+ **fake_new_volume)
host_obj = {'host': 'newhost', 'capabilities': {}}
volume_get.return_value = fake_new_volume
update_server_volume = nova_api.return_value.update_server_volume
self.volume._migrate_volume_generic(self.context, volume,
host_obj, None)
mock_copy_volume.assert_called_with(self.context, volume,
- fake_new_volume,
+ new_volume_obj,
remote='dest')
- migrate_volume_completion.assert_called_with(self.context,
- volume['id'],
- fake_new_volume['id'],
- error=False)
+ migrate_volume_completion.assert_called_with(
+ self.context, volume.id, new_volume_obj.id, error=False)
self.assertFalse(update_server_volume.called)
@mock.patch('cinder.compute.API')
rpc_delete_volume,
update_migrated_volume):
fake_volume = tests_utils.create_volume(self.context, size=1,
+ previous_status='available',
host=CONF.host)
host_obj = {'host': 'newhost', 'capabilities': {}}
mock.patch.object(self.volume.driver, 'delete_volume') as \
delete_volume:
create_volume.side_effect = self._fake_create_volume
- self.volume.migrate_volume(self.context, fake_volume['id'],
- host_obj, True)
- volume = db.volume_get(context.get_admin_context(),
- fake_volume['id'])
- self.assertEqual('newhost', volume['host'])
- self.assertEqual('success', volume['migration_status'])
+ self.volume.migrate_volume(self.context, fake_volume.id,
+ host_obj, True, volume=fake_volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ fake_volume.id)
+ self.assertEqual('newhost', volume.host)
+ self.assertEqual('success', volume.migration_status)
self.assertFalse(mock_migrate_volume.called)
self.assertFalse(delete_volume.called)
self.assertTrue(rpc_delete_volume.called)
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- True)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual('error', volume['migration_status'])
- self.assertEqual('available', volume['status'])
+ True,
+ volume=volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
+ self.assertEqual('error', volume.migration_status)
+ self.assertEqual('available', volume.status)
@mock.patch('cinder.db.volume_update')
def test_update_migrated_volume(self, volume_update):
fake_new_host = 'fake_new_host'
fake_update = {'_name_id': 'updated_id',
'provider_location': 'updated_location'}
- fake_elevated = 'fake_elevated'
+ fake_elevated = context.RequestContext('fake', self.project_id,
+ is_admin=True)
volume = tests_utils.create_volume(self.context, size=1,
status='available',
host=fake_host)
provider_location='fake_provider_location',
_name_id='fake_name_id',
host=fake_new_host)
- new_volume['_name_id'] = 'fake_name_id'
- new_volume['provider_location'] = 'fake_provider_location'
- fake_update_error = {'_name_id': new_volume['_name_id'],
+ new_volume._name_id = 'fake_name_id'
+ new_volume.provider_location = 'fake_provider_location'
+ fake_update_error = {'_name_id': new_volume._name_id,
'provider_location':
- new_volume['provider_location']}
- expected_update = {'_name_id': volume['_name_id'],
- 'provider_location': volume['provider_location']}
+ new_volume.provider_location}
+ expected_update = {'_name_id': volume._name_id,
+ 'provider_location': volume.provider_location}
with mock.patch.object(self.volume.driver,
'update_migrated_volume') as migrate_update,\
mock.patch.object(self.context, 'elevated') as elevated:
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
- mock.call(fake_elevated, new_volume['id'], expected_update),
- mock.call(fake_elevated, volume['id'], fake_update)))
+ mock.call(fake_elevated, new_volume.id, expected_update),
+ mock.call(fake_elevated, volume.id, fake_update)))
# Test the case for update_migrated_volume not implemented
# for the driver.
migrate_update.reset_mock()
volume_update.reset_mock()
+ # Reset the volume objects to their original value, since they
+ # were changed in the last call.
+ new_volume._name_id = 'fake_name_id'
+ new_volume.provider_location = 'fake_provider_location'
migrate_update.side_effect = NotImplementedError
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
- mock.call(fake_elevated, new_volume['id'], expected_update),
- mock.call(fake_elevated, volume['id'], fake_update_error)))
+ mock.call(fake_elevated, new_volume.id, fake_update),
+ mock.call(fake_elevated, volume.id, fake_update_error)))
def test_migrate_volume_generic_create_volume_error(self):
self.expected_status = 'error'
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- True)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
+ True,
+ volume=volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- True)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
+ True,
+ volume=volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- True)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
+ True,
+ volume=volume)
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
- volume['id'],
+ volume.id,
host_obj,
- True)
+ True,
+ volume=volume)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
previous_status='available'):
def fake_attach_volume(ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
- tests_utils.attach_volume(ctxt, volume['id'],
+ tests_utils.attach_volume(ctxt, volume.id,
instance_uuid, host_name,
'/dev/vda')
previous_status=previous_status)
attachment_id = None
if status == 'in-use':
- vol = tests_utils.attach_volume(self.context, old_volume['id'],
+ vol = tests_utils.attach_volume(self.context, old_volume.id,
instance_uuid, attached_host,
'/dev/vda')
self.assertEqual('in-use', vol['status'])
attachment_id = vol['volume_attachment'][0]['id']
- target_status = 'target:%s' % old_volume['id']
+ target_status = 'target:%s' % old_volume.id
new_host = CONF.host + 'new'
new_volume = tests_utils.create_volume(self.context, size=0,
host=new_host,
'update_migrated_volume'),\
mock.patch.object(self.volume.driver, 'attach_volume'):
mock_attach_volume.side_effect = fake_attach_volume
- self.volume.migrate_volume_completion(self.context, old_volume[
- 'id'], new_volume['id'])
- after_new_volume = db.volume_get(self.context, new_volume.id)
- after_old_volume = db.volume_get(self.context, old_volume.id)
+ self.volume.migrate_volume_completion(self.context, old_volume.id,
+ new_volume.id)
+ after_new_volume = objects.Volume.get_by_id(self.context,
+ new_volume.id)
+ after_old_volume = objects.Volume.get_by_id(self.context,
+ old_volume.id)
if status == 'in-use':
mock_detach_volume.assert_called_with(self.context,
- old_volume['id'],
+ old_volume.id,
attachment_id)
attachment = db.volume_attachment_get_by_instance_uuid(
- self.context, old_volume['id'], instance_uuid)
+ self.context, old_volume.id, instance_uuid)
self.assertIsNotNone(attachment)
self.assertEqual(attached_host, attachment['attached_host'])
self.assertEqual(instance_uuid, attachment['instance_uuid'])
self.volume.driver._initialized = False
self.assertRaises(exception.DriverNotInitialized,
self.volume.migrate_volume,
- self.context, volume['id'],
- host_obj, True)
+ self.context, volume.id, host_obj, True,
+ volume=volume)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
+ volume = objects.Volume.get_by_id(context.get_admin_context(),
+ volume.id)
self.assertEqual('error', volume.migration_status)
# lets cleanup the mess.
expected_msg['host'] = dest_host_dict
if 'new_volume' in expected_msg:
volume = expected_msg['new_volume']
- del expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id']
if 'host' in kwargs:
version='1.14')
can_send_version.assert_called_once_with('1.35')
- def test_migrate_volume(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_migrate_volume(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
rpc_method='cast',
- volume=self.fake_volume,
+ volume=self.fake_volume_obj,
+ dest_host=dest_host,
+ force_host_copy=True,
+ version='1.36')
+ can_send_version.assert_called_once_with('1.36')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_migrate_volume_old(self, can_send_version):
+ class FakeHost(object):
+ def __init__(self):
+ self.host = 'host'
+ self.capabilities = {}
+ dest_host = FakeHost()
+ self._test_volume_api('migrate_volume',
+ rpc_method='cast',
+ volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='1.8')
+ can_send_version.assert_called_once_with('1.36')
- def test_migrate_volume_completion(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_migrate_volume_completion(self, can_send_version):
self._test_volume_api('migrate_volume_completion',
rpc_method='call',
- volume=self.fake_volume,
- new_volume=self.fake_volume,
+ volume=self.fake_volume_obj,
+ new_volume=self.fake_volume_obj,
+ error=False,
+ version='1.36')
+ can_send_version.assert_called_once_with('1.36')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_migrate_volume_completion_old(self, can_send_version):
+ self._test_volume_api('migrate_volume_completion',
+ rpc_method='call',
+ volume=self.fake_volume_obj,
+ new_volume=self.fake_volume_obj,
error=False,
version='1.10')
+ can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
lock_volume):
"""Migrate the volume to the specified host."""
- if volume['status'] not in ['available', 'in-use']:
+ if volume.status not in ['available', 'in-use']:
msg = _('Volume %(vol_id)s status must be available or in-use, '
'but current status is: '
- '%(vol_status)s.') % {'vol_id': volume['id'],
- 'vol_status': volume['status']}
+ '%(vol_status)s.') % {'vol_id': volume.id,
+ 'vol_status': volume.status}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Make sure volume is not part of a migration.
if self._is_volume_migrating(volume):
msg = _("Volume %s is already part of an active "
- "migration.") % volume['id']
+ "migration.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle volumes without snapshots for now
- snaps = objects.SnapshotList.get_all_for_volume(context, volume['id'])
+ snaps = objects.SnapshotList.get_all_for_volume(context, volume.id)
if snaps:
- msg = _("Volume %s must not have snapshots.") % volume['id']
+ msg = _("Volume %s must not have snapshots.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle non-replicated volumes for now
- rep_status = volume['replication_status']
- if rep_status is not None and rep_status != 'disabled':
- msg = _("Volume %s must not be replicated.") % volume['id']
+ if (volume.replication_status is not None and
+ volume.replication_status != 'disabled'):
+ msg = _("Volume %s must not be replicated.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
- cg_id = volume.get('consistencygroup_id', None)
- if cg_id:
+ if volume.consistencygroup_id:
msg = _("Volume %s must not be part of a consistency "
- "group.") % volume['id']
+ "group.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
raise exception.InvalidHost(reason=msg)
# Make sure the destination host is different than the current one
- if host == volume['host']:
+ if host == volume.host:
msg = _('Destination host must be different '
'than the current host.')
LOG.error(msg)
# that this volume is in maintenance mode, and no action is allowed
# on this volume, e.g. attach, detach, retype, migrate, etc.
updates = {'migration_status': 'starting',
- 'previous_status': volume['status']}
- if lock_volume and volume['status'] == 'available':
+ 'previous_status': volume.status}
+ if lock_volume and volume.status == 'available':
updates['status'] = 'maintenance'
self.update(context, volume, updates)
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume_type = {}
- volume_type_id = volume['volume_type_id']
- if volume_type_id:
+ if volume.volume_type_id:
volume_type = volume_types.get_volume_type(context.elevated(),
- volume_type_id)
+ volume.volume_type_id)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
- 'volume_id': volume['id']}
+ 'volume_id': volume.id}
self.scheduler_rpcapi.migrate_volume_to_host(context,
CONF.volume_topic,
- volume['id'],
+ volume.id,
host,
force_host_copy,
- request_spec)
+ request_spec,
+ volume=volume)
LOG.info(_LI("Migrate volume request issued successfully."),
resource=volume)
def migrate_volume_completion(self, context, volume, new_volume, error):
# This is a volume swap initiated by Nova, not Cinder. Nova expects
# us to return the new_volume_id.
- if not (volume['migration_status'] or new_volume['migration_status']):
+ if not (volume.migration_status or new_volume.migration_status):
# Don't need to do migration, but still need to finish the
# volume attach and detach so volumes don't end in 'attaching'
# and 'detaching' state
- attachments = volume['volume_attachment']
+ attachments = volume.volume_attachment
for attachment in attachments:
- self.detach(context, volume, attachment['id'])
+ self.detach(context, volume, attachment.id)
self.attach(context, new_volume,
- attachment['instance_uuid'],
- attachment['attached_host'],
- attachment['mountpoint'],
+ attachment.instance_uuid,
+ attachment.attached_host,
+ attachment.mountpoint,
'rw')
- return new_volume['id']
+ return new_volume.id
- if not volume['migration_status']:
+ if not volume.migration_status:
msg = _('Source volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
- if not new_volume['migration_status']:
+ if not new_volume.migration_status:
msg = _('Destination volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
- expected_status = 'target:%s' % volume['id']
- if not new_volume['migration_status'] == expected_status:
+ expected_status = 'target:%s' % volume.id
+ if not new_volume.migration_status == expected_status:
msg = (_('Destination has migration_status %(stat)s, expected '
- '%(exp)s.') % {'stat': new_volume['migration_status'],
+ '%(exp)s.') % {'stat': new_volume.migration_status,
'exp': expected_status})
raise exception.InvalidVolume(reason=msg)
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.35'
+ RPC_API_VERSION = '1.36'
target = messaging.Target(version=RPC_API_VERSION)
# Wait for new_volume to become ready
starttime = time.time()
deadline = starttime + CONF.migration_create_volume_timeout_secs
- new_volume = self.db.volume_get(ctxt, new_volume['id'])
+ # TODO(thangp): Replace get_by_id with refresh when it is available
+ new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
tries = 0
- while new_volume['status'] != 'available':
+ while new_volume.status != 'available':
tries += 1
now = time.time()
- if new_volume['status'] == 'error':
+ if new_volume.status == 'error':
msg = _("failed to create new_volume on destination host")
- self._clean_temporary_volume(ctxt, volume['id'],
- new_volume['id'],
+ self._clean_temporary_volume(ctxt, volume,
+ new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
elif now > deadline:
msg = _("timeout creating new_volume on destination host")
- self._clean_temporary_volume(ctxt, volume['id'],
- new_volume['id'],
+ self._clean_temporary_volume(ctxt, volume,
+ new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
- new_volume = self.db.volume_get(ctxt, new_volume['id'])
+ # TODO(thangp): Replace get_by_id with refresh when it is
+ # available
+ new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
# Copy the source volume to the destination volume
try:
- attachments = volume['volume_attachment']
+ attachments = volume.volume_attachment
if not attachments:
self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
# The above call is synchronous so we complete the migration
- self.migrate_volume_completion(ctxt, volume['id'],
- new_volume['id'],
+ self.migrate_volume_completion(ctxt, volume.id,
+ new_volume.id,
error=False)
else:
nova_api = compute.API()
for attachment in attachments:
instance_uuid = attachment['instance_uuid']
nova_api.update_server_volume(ctxt, instance_uuid,
- volume['id'],
- new_volume['id'])
+ volume.id,
+ new_volume.id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"),
- {'vol1': volume['id'], 'vol2': new_volume['id']})
- self._clean_temporary_volume(ctxt, volume['id'],
- new_volume['id'])
+ {'vol1': volume.id, 'vol2': new_volume.id})
+ self._clean_temporary_volume(ctxt, volume,
+ new_volume)
- def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id,
+ def _clean_temporary_volume(self, ctxt, volume, new_volume,
clean_db_only=False):
- volume = self.db.volume_get(ctxt, volume_id)
# If we're in the migrating phase, we need to cleanup
# destination volume because source volume is remaining
- if volume['migration_status'] == 'migrating':
+ if volume.migration_status == 'migrating':
try:
if clean_db_only:
# The temporary volume is not created, only DB data
# is created
- self.db.volume_destroy(ctxt, new_volume_id)
+ new_volume.destroy()
else:
# The temporary volume is already created
rpcapi = volume_rpcapi.VolumeAPI()
- volume = self.db.volume_get(ctxt, new_volume_id)
- rpcapi.delete_volume(ctxt, volume)
+ rpcapi.delete_volume(ctxt, new_volume)
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find the temporary volume "
"%(vol)s in the database. There is no need "
"to clean up this volume."),
- {'vol': new_volume_id})
+ {'vol': new_volume.id})
else:
# If we're in the completing phase don't delete the
# destination because we may have already deleted the
# source! But the migration_status in database should
# be cleared to handle volume after migration failure
try:
- updates = {'migration_status': None}
- self.db.volume_update(ctxt, new_volume_id, updates)
+ new_volume.migration_status = None
+ new_volume.save()
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find destination volume "
"%(vol)s in the database. The entry might be "
"successfully deleted during migration "
"completion phase."),
- {'vol': new_volume_id})
+ {'vol': new_volume.id})
LOG.warning(_LW("Failed to migrate volume. The destination "
"volume %(vol)s is not deleted since the "
"source volume may have been deleted."),
- {'vol': new_volume_id})
+ {'vol': new_volume.id})
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
- error=False):
+ error=False, volume=None, new_volume=None):
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if volume is None or new_volume is None:
+ # For older clients, mimic the old behavior and look up the volume
+ # by its volume_id.
+ volume = objects.Volume.get_by_id(ctxt, volume_id)
+ new_volume = objects.Volume.get_by_id(ctxt, new_volume_id)
+
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
- self.db.volume_update(ctxt, volume_id,
- {'migration_status': 'error'})
+ volume.migration_status = 'error'
+ volume.save()
LOG.debug("migrate_volume_completion: completing migration for "
"volume %(vol1)s (temporary volume %(vol2)s",
- {'vol1': volume_id, 'vol2': new_volume_id})
- volume = self.db.volume_get(ctxt, volume_id)
- new_volume = self.db.volume_get(ctxt, new_volume_id)
+ {'vol1': volume.id, 'vol2': new_volume.id})
rpcapi = volume_rpcapi.VolumeAPI()
- orig_volume_status = volume['previous_status']
+ orig_volume_status = volume.previous_status
if error:
LOG.info(_LI("migrate_volume_completion is cleaning up an error "
"for volume %(vol1)s (temporary volume %(vol2)s"),
- {'vol1': volume['id'], 'vol2': new_volume['id']})
+ {'vol1': volume['id'], 'vol2': new_volume.id})
rpcapi.delete_volume(ctxt, new_volume)
updates = {'migration_status': 'error',
'status': orig_volume_status}
- self.db.volume_update(ctxt, volume_id, updates)
- return volume_id
+ volume.update(updates)
+ volume.save()
+ return volume.id
- self.db.volume_update(ctxt, volume_id,
- {'migration_status': 'completing'})
+ volume.migration_status = 'completing'
+ volume.save()
# Detach the source volume (if it fails, don't fail the migration)
try:
if orig_volume_status == 'in-use':
- attachments = volume['volume_attachment']
+ attachments = volume.volume_attachment
for attachment in attachments:
- self.detach_volume(ctxt, volume_id, attachment['id'])
+ self.detach_volume(ctxt, volume.id, attachment['id'])
except Exception as ex:
LOG.error(_LE("Detach migration source volume failed: %(err)s"),
{'err': ex}, resource=volume)
# Swap src and dest DB records so we can continue using the src id and
# asynchronously delete the destination id
__, updated_new = self.db.finish_volume_migration(
- ctxt, volume_id, new_volume_id)
+ ctxt, volume.id, new_volume.id)
updates = {'status': orig_volume_status,
- 'previous_status': volume['status'],
+ 'previous_status': volume.status,
'migration_status': 'success'}
if orig_volume_status == 'in-use':
- attachments = volume['volume_attachment']
+ attachments = volume.volume_attachment
for attachment in attachments:
rpcapi.attach_volume(ctxt, volume,
attachment['instance_uuid'],
attachment['attached_host'],
attachment['mountpoint'],
'rw')
- self.db.volume_update(ctxt, volume_id, updates)
+ volume.update(updates)
+ volume.save()
# Asynchronous deletion of the source volume in the back-end (now
# pointed by the target volume id)
except Exception as ex:
LOG.error(_LE('Failed to request async delete of migration source '
'vol %(vol)s: %(err)s'),
- {'vol': volume_id, 'err': ex})
+ {'vol': volume.id, 'err': ex})
LOG.info(_LI("Complete-Migrate volume completed successfully."),
resource=volume)
- return volume['id']
+ return volume.id
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
- new_type_id=None):
+ new_type_id=None, volume=None):
"""Migrate the volume to the specified host (called on source host)."""
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if volume is None:
+ # For older clients, mimic the old behavior and look up the volume
+ # by its volume_id.
+ volume = objects.Volume.get_by_id(context, volume_id)
+
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
- self.db.volume_update(ctxt, volume_id,
- {'migration_status': 'error'})
+ volume.migration_status = 'error'
+ volume.save()
- volume_ref = self.db.volume_get(ctxt, volume_id)
model_update = None
moved = False
status_update = None
- if volume_ref['status'] in ('retyping', 'maintenance'):
- status_update = {'status': volume_ref['previous_status']}
+ if volume.status in ('retyping', 'maintenance'):
+ status_update = {'status': volume.previous_status}
- self.db.volume_update(ctxt, volume_ref['id'],
- {'migration_status': 'migrating'})
+ volume.migration_status = 'migrating'
+ volume.save()
if not force_host_copy and new_type_id is None:
try:
- LOG.debug("Issue driver.migrate_volume.", resource=volume_ref)
+ LOG.debug("Issue driver.migrate_volume.", resource=volume)
moved, model_update = self.driver.migrate_volume(ctxt,
- volume_ref,
+ volume,
host)
if moved:
updates = {'host': host['host'],
'migration_status': 'success',
- 'previous_status': volume_ref['status']}
+ 'previous_status': volume.status}
if status_update:
updates.update(status_update)
if model_update:
updates.update(model_update)
- volume_ref = self.db.volume_update(ctxt,
- volume_ref['id'],
- updates)
+ volume.update(updates)
+ volume.save()
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
- self.db.volume_update(ctxt, volume_ref['id'], updates)
+ volume.update(updates)
+ volume.save()
if not moved:
try:
- self._migrate_volume_generic(ctxt, volume_ref, host,
+ self._migrate_volume_generic(ctxt, volume, host,
new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
- self.db.volume_update(ctxt, volume_ref['id'], updates)
+ volume.update(updates)
+ volume.save()
LOG.info(_LI("Migrate volume completed successfully."),
- resource=volume_ref)
+ resource=volume)
@periodic_task.periodic_task
def _report_driver_status(self, context):
def update_migrated_volume(self, ctxt, volume, new_volume,
volume_status):
"""Finalize migration process on backend device."""
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if (not isinstance(volume, objects.Volume) or
+ not isinstance(new_volume, objects.Volume)):
+ volume = objects.Volume.get_by_id(ctxt, volume['id'])
+ new_volume = objects.Volume.get_by_id(ctxt, new_volume['id'])
+
model_update = None
- # This is temporary fix for bug 1491210.
- volume = self.db.volume_get(ctxt, volume['id'])
- new_volume = self.db.volume_get(ctxt, new_volume['id'])
- model_update_default = {'_name_id': new_volume['_name_id'] or
- new_volume['id'],
+ model_update_default = {'_name_id': new_volume.name_id,
'provider_location':
- new_volume['provider_location']}
+ new_volume.provider_location}
try:
model_update = self.driver.update_migrated_volume(ctxt,
volume,
if volume.get('volume_metadata'):
model_update_new[key] = {
metadata['key']: metadata['value']
- for metadata in volume.get('volume_metadata')}
+ for metadata in volume.volume_metadata}
elif key == 'admin_metadata':
model_update_new[key] = {
metadata['key']: metadata['value']
- for metadata in volume.get('volume_admin_metadata')}
+ for metadata in volume.volume_admin_metadata}
else:
model_update_new[key] = volume[key]
- self.db.volume_update(ctxt.elevated(), new_volume['id'],
- model_update_new)
- self.db.volume_update(ctxt.elevated(), volume['id'],
- model_update_default)
+ with new_volume.obj_as_admin():
+ new_volume.update(model_update_new)
+ new_volume.save()
+ with volume.obj_as_admin():
+ volume.update(model_update_default)
+ volume.save()
# Replication V2 methods
def enable_replication(self, context, volume):
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
1.35 - Adds support for sending objects over RPC in extend_volume().
+ 1.36 - Adds support for sending objects over RPC in migrate_volume(),
+ migrate_volume_completion(), and update_migrated_volume().
"""
BASE_RPC_API_VERSION = '1.0'
cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
- new_host = utils.extract_host(volume['host'])
- cctxt = self.client.prepare(server=new_host, version='1.8')
+ new_host = utils.extract_host(volume.host)
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
- cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
- host=host_p, force_host_copy=force_host_copy)
+
+ msg_args = {'volume_id': volume.id, 'host': host_p,
+ 'force_host_copy': force_host_copy}
+ if self.client.can_send_version('1.36'):
+ version = '1.36'
+ msg_args['volume'] = volume
+ else:
+ version = '1.8'
+
+ cctxt = self.client.prepare(server=new_host, version=version)
+ cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
- new_host = utils.extract_host(volume['host'])
- cctxt = self.client.prepare(server=new_host, version='1.10')
- return cctxt.call(ctxt, 'migrate_volume_completion',
- volume_id=volume['id'],
- new_volume_id=new_volume['id'],
- error=error)
+ new_host = utils.extract_host(volume.host)
+
+ msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
+ 'error': error}
+ if self.client.can_send_version('1.36'):
+ version = '1.36'
+ msg_args['volume'] = volume
+ msg_args['new_volume'] = new_volume
+ else:
+ version = '1.10'
+
+ cctxt = self.client.prepare(server=new_host, version=version)
+ return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
host = utils.extract_host(new_volume['host'])
- cctxt = self.client.prepare(server=host, version='1.19')
+ cctxt = self.client.prepare(server=host, version='1.36')
cctxt.call(ctxt,
'update_migrated_volume',
volume=volume,