From: Thang Pham Date: Tue, 29 Sep 2015 14:12:35 +0000 (-0700) Subject: Update migrate_volume API to use versionedobjects X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=12e4d9236d3ed74c8062abd11ce4fc834e5a8404;p=openstack-build%2Fcinder-build.git Update migrate_volume API to use versionedobjects The following patch updates migrate_volume, migrate_volume_completion, and update_migrated_volume APIs to use volume versionedobjects. Changes were made to be backwards compatible with older RPC clients. It only includes changes to the core cinder code. Changes in the drivers are left to each driver maintainer to update. Note that this patch DOES NOT try to use object dot notation everywhere, since it would increase the size of the patch. Instead, it will be done in subsequent patches. Change-Id: I21fe68193c934a7ef3688274ab35f664a08cac7e Partial-Implements: blueprint cinder-objects Closes-Bug: #1521085 --- diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 5764c1da3..fa06628aa 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__) class SchedulerManager(manager.Manager): """Chooses a host to create volumes.""" - RPC_API_VERSION = '1.10' + RPC_API_VERSION = '1.11' target = messaging.Target(version=RPC_API_VERSION) @@ -148,13 +148,18 @@ class SchedulerManager(manager.Manager): def migrate_volume_to_host(self, context, topic, volume_id, host, force_host_copy, request_spec, - filter_properties=None): + filter_properties=None, volume=None): """Ensure that the host exists and can accept the volume.""" self._wait_for_scheduler() + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the + # volume by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + def _migrate_volume_set_error(self, context, ex, request_spec): - volume = db.volume_get(context, request_spec['volume_id']) if volume.status == 'maintenance': previous_status = ( volume.previous_status or 'maintenance') @@ -176,8 +181,7 @@ class SchedulerManager(manager.Manager): with excutils.save_and_reraise_exception(): _migrate_volume_set_error(self, context, ex, request_spec) else: - volume_ref = db.volume_get(context, volume_id) - volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref, + volume_rpcapi.VolumeAPI().migrate_volume(context, volume, tgt_host, force_host_copy) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 4e9769c72..e451336e7 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -44,6 +44,8 @@ class SchedulerAPI(object): 1.8 - Add sending object over RPC in create_consistencygroup method 1.9 - Adds support for sending objects over RPC in create_volume() 1.10 - Adds support for sending objects over RPC in retype() + 1.11 - Adds support for sending objects over RPC in + migrate_volume_to_host() """ RPC_API_VERSION = '1.0' @@ -95,17 +97,20 @@ class SchedulerAPI(object): def migrate_volume_to_host(self, ctxt, topic, volume_id, host, force_host_copy=False, request_spec=None, - filter_properties=None): - - cctxt = self.client.prepare(version='1.3') + filter_properties=None, volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - return cctxt.cast(ctxt, 'migrate_volume_to_host', - topic=topic, - volume_id=volume_id, - host=host, - force_host_copy=force_host_copy, - request_spec=request_spec_p, - filter_properties=filter_properties) + msg_args = {'topic': topic, 'volume_id': volume_id, + 'host': host, 'force_host_copy': force_host_copy, + 'request_spec': request_spec_p, + 'filter_properties': filter_properties} + if self.client.can_send_version('1.11'): + version = '1.11' + msg_args['volume'] = volume + else: + version = '1.3' + + cctxt = self.client.prepare(version=version) + return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) def retype(self, ctxt, topic, volume_id, request_spec=None, filter_properties=None, volume=None): diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index a6da59f1a..abfcec050 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -801,7 +801,7 @@ class AdminActionsTest(test.TestCase): force_host_copy=False): admin_ctx = context.get_admin_context() # build request to migrate to host - req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id) req.method = 'POST' req.headers['content-type'] = 'application/json' body = {'os-migrate_volume': {'host': host, @@ -811,7 +811,7 @@ class AdminActionsTest(test.TestCase): resp = req.get_response(app()) # verify status self.assertEqual(expected_status, resp.status_int) - volume = db.volume_get(admin_ctx, volume['id']) + volume = objects.Volume.get_by_id(admin_ctx, volume.id) return volume def test_migrate_volume_success(self): diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 60c935f19..74de0baec 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -117,7 +117,9 @@ class SchedulerRpcAPITestCase(test.TestCase): version='1.2') can_send_version.assert_called_once_with('1.9') - def test_migrate_volume_to_host(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume_to_host(self, can_send_version): self._test_scheduler_api('migrate_volume_to_host', rpc_method='cast', topic='topic', @@ -126,7 +128,24 @@ class SchedulerRpcAPITestCase(test.TestCase): force_host_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', + volume='volume', + version='1.11') + can_send_version.assert_called_once_with('1.11') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_to_host_old(self, can_send_version): + self._test_scheduler_api('migrate_volume_to_host', + rpc_method='cast', + topic='topic', + volume_id='volume_id', + host='host', + force_host_copy=True, + request_spec='fake_request_spec', + filter_properties='filter_properties', + volume='volume', version='1.3') + can_send_version.assert_called_once_with('1.11') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index 12477fe3c..7c9361f13 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -4183,22 +4183,20 @@ class VolumeTestCase(BaseVolumeTestCase): def test_clean_temporary_volume(self): def fake_delete_volume(ctxt, volume): - db.volume_destroy(ctxt, volume['id']) + volume.destroy() fake_volume = tests_utils.create_volume(self.context, size=1, - host=CONF.host) + host=CONF.host, + migration_status='migrating') fake_new_volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) - # Check when the migrated volume is in migration - db.volume_update(self.context, fake_volume['id'], - {'migration_status': 'migrating'}) # 1. Only clean the db - self.volume._clean_temporary_volume(self.context, fake_volume['id'], - fake_new_volume['id'], + self.volume._clean_temporary_volume(self.context, fake_volume, + fake_new_volume, clean_db_only=True) self.assertRaises(exception.VolumeNotFound, db.volume_get, self.context, - fake_new_volume['id']) + fake_new_volume.id) # 2. Delete the backend storage fake_new_volume = tests_utils.create_volume(self.context, size=1, @@ -4207,23 +4205,23 @@ class VolumeTestCase(BaseVolumeTestCase): mock_delete_volume: mock_delete_volume.side_effect = fake_delete_volume self.volume._clean_temporary_volume(self.context, - fake_volume['id'], - fake_new_volume['id'], + fake_volume, + fake_new_volume, clean_db_only=False) self.assertRaises(exception.VolumeNotFound, db.volume_get, self.context, - fake_new_volume['id']) + fake_new_volume.id) # Check when the migrated volume is not in migration fake_new_volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) - db.volume_update(self.context, fake_volume['id'], - {'migration_status': 'non-migrating'}) - self.volume._clean_temporary_volume(self.context, fake_volume['id'], - fake_new_volume['id']) + fake_volume.migration_status = 'non-migrating' + fake_volume.save() + self.volume._clean_temporary_volume(self.context, fake_volume, + fake_new_volume) volume = db.volume_get(context.get_admin_context(), - fake_new_volume['id']) - self.assertIsNone(volume['migration_status']) + fake_new_volume.id) + self.assertIsNone(volume.migration_status) def test_update_volume_readonly_flag(self): """Test volume readonly flag can be updated at API level.""" @@ -4323,13 +4321,14 @@ class VolumeMigrationTestCase(VolumeTestCase): host=CONF.host, migration_status='migrating') host_obj = {'host': 'newhost', 'capabilities': {}} - self.volume.migrate_volume(self.context, volume['id'], - host_obj, False) + self.volume.migrate_volume(self.context, volume.id, host_obj, False, + volume=volume) # check volume properties - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('newhost', volume['host']) - self.assertEqual('success', volume['migration_status']) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('newhost', volume.host) + self.assertEqual('success', volume.migration_status) def _fake_create_volume(self, ctxt, volume, host, req_spec, filters, allow_reschedule=True): @@ -4351,12 +4350,14 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - False) - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('error', volume['migration_status']) - self.assertEqual('available', volume['status']) + False, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('error', volume.migration_status) + self.assertEqual('available', volume.status) @mock.patch('cinder.compute.API') @mock.patch('cinder.volume.manager.VolumeManager.' @@ -4366,7 +4367,10 @@ class VolumeMigrationTestCase(VolumeTestCase): migrate_volume_completion, nova_api): fake_volume_id = 'fake_volume_id' - fake_new_volume = {'status': 'available', 'id': fake_volume_id} + fake_db_new_volume = {'status': 'available', 'id': fake_volume_id} + fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume) + new_volume_obj = fake_volume.fake_volume_obj(self.context, + **fake_new_volume) host_obj = {'host': 'newhost', 'capabilities': {}} volume_get.return_value = fake_new_volume update_server_volume = nova_api.return_value.update_server_volume @@ -4377,12 +4381,10 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume._migrate_volume_generic(self.context, volume, host_obj, None) mock_copy_volume.assert_called_with(self.context, volume, - fake_new_volume, + new_volume_obj, remote='dest') - migrate_volume_completion.assert_called_with(self.context, - volume['id'], - fake_new_volume['id'], - error=False) + migrate_volume_completion.assert_called_with( + self.context, volume.id, new_volume_obj.id, error=False) self.assertFalse(update_server_volume.called) @mock.patch('cinder.compute.API') @@ -4421,6 +4423,7 @@ class VolumeMigrationTestCase(VolumeTestCase): rpc_delete_volume, update_migrated_volume): fake_volume = tests_utils.create_volume(self.context, size=1, + previous_status='available', host=CONF.host) host_obj = {'host': 'newhost', 'capabilities': {}} @@ -4430,12 +4433,12 @@ class VolumeMigrationTestCase(VolumeTestCase): mock.patch.object(self.volume.driver, 'delete_volume') as \ delete_volume: create_volume.side_effect = self._fake_create_volume - self.volume.migrate_volume(self.context, fake_volume['id'], - host_obj, True) - volume = db.volume_get(context.get_admin_context(), - fake_volume['id']) - self.assertEqual('newhost', volume['host']) - self.assertEqual('success', volume['migration_status']) + self.volume.migrate_volume(self.context, fake_volume.id, + host_obj, True, volume=fake_volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + fake_volume.id) + self.assertEqual('newhost', volume.host) + self.assertEqual('success', volume.migration_status) self.assertFalse(mock_migrate_volume.called) self.assertFalse(delete_volume.called) self.assertTrue(rpc_delete_volume.called) @@ -4461,12 +4464,14 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('error', volume['migration_status']) - self.assertEqual('available', volume['status']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('error', volume.migration_status) + self.assertEqual('available', volume.status) @mock.patch('cinder.db.volume_update') def test_update_migrated_volume(self, volume_update): @@ -4474,7 +4479,8 @@ class VolumeMigrationTestCase(VolumeTestCase): fake_new_host = 'fake_new_host' fake_update = {'_name_id': 'updated_id', 'provider_location': 'updated_location'} - fake_elevated = 'fake_elevated' + fake_elevated = context.RequestContext('fake', self.project_id, + is_admin=True) volume = tests_utils.create_volume(self.context, size=1, status='available', host=fake_host) @@ -4484,13 +4490,13 @@ class VolumeMigrationTestCase(VolumeTestCase): provider_location='fake_provider_location', _name_id='fake_name_id', host=fake_new_host) - new_volume['_name_id'] = 'fake_name_id' - new_volume['provider_location'] = 'fake_provider_location' - fake_update_error = {'_name_id': new_volume['_name_id'], + new_volume._name_id = 'fake_name_id' + new_volume.provider_location = 'fake_provider_location' + fake_update_error = {'_name_id': new_volume._name_id, 'provider_location': - new_volume['provider_location']} - expected_update = {'_name_id': volume['_name_id'], - 'provider_location': volume['provider_location']} + new_volume.provider_location} + expected_update = {'_name_id': volume._name_id, + 'provider_location': volume.provider_location} with mock.patch.object(self.volume.driver, 'update_migrated_volume') as migrate_update,\ mock.patch.object(self.context, 'elevated') as elevated: @@ -4499,19 +4505,23 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume.update_migrated_volume(self.context, volume, new_volume, 'available') volume_update.assert_has_calls(( - mock.call(fake_elevated, new_volume['id'], expected_update), - mock.call(fake_elevated, volume['id'], fake_update))) + mock.call(fake_elevated, new_volume.id, expected_update), + mock.call(fake_elevated, volume.id, fake_update))) # Test the case for update_migrated_volume not implemented # for the driver. migrate_update.reset_mock() volume_update.reset_mock() + # Reset the volume objects to their original value, since they + # were changed in the last call. + new_volume._name_id = 'fake_name_id' + new_volume.provider_location = 'fake_provider_location' migrate_update.side_effect = NotImplementedError self.volume.update_migrated_volume(self.context, volume, new_volume, 'available') volume_update.assert_has_calls(( - mock.call(fake_elevated, new_volume['id'], expected_update), - mock.call(fake_elevated, volume['id'], fake_update_error))) + mock.call(fake_elevated, new_volume.id, fake_update), + mock.call(fake_elevated, volume.id, fake_update_error))) def test_migrate_volume_generic_create_volume_error(self): self.expected_status = 'error' @@ -4530,10 +4540,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(exception.VolumeMigrationFailed, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) self.assertTrue(clean_temporary_volume.called) @@ -4558,10 +4570,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(exception.VolumeMigrationFailed, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) self.assertTrue(clean_temporary_volume.called) @@ -4588,10 +4602,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) @@ -4634,9 +4650,10 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) + True, + volume=volume) volume = db.volume_get(context.get_admin_context(), volume['id']) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) @@ -4649,7 +4666,7 @@ class VolumeMigrationTestCase(VolumeTestCase): previous_status='available'): def fake_attach_volume(ctxt, volume, instance_uuid, host_name, mountpoint, mode): - tests_utils.attach_volume(ctxt, volume['id'], + tests_utils.attach_volume(ctxt, volume.id, instance_uuid, host_name, '/dev/vda') @@ -4661,12 +4678,12 @@ class VolumeMigrationTestCase(VolumeTestCase): previous_status=previous_status) attachment_id = None if status == 'in-use': - vol = tests_utils.attach_volume(self.context, old_volume['id'], + vol = tests_utils.attach_volume(self.context, old_volume.id, instance_uuid, attached_host, '/dev/vda') self.assertEqual('in-use', vol['status']) attachment_id = vol['volume_attachment'][0]['id'] - target_status = 'target:%s' % old_volume['id'] + target_status = 'target:%s' % old_volume.id new_host = CONF.host + 'new' new_volume = tests_utils.create_volume(self.context, size=0, host=new_host, @@ -4681,16 +4698,18 @@ class VolumeMigrationTestCase(VolumeTestCase): 'update_migrated_volume'),\ mock.patch.object(self.volume.driver, 'attach_volume'): mock_attach_volume.side_effect = fake_attach_volume - self.volume.migrate_volume_completion(self.context, old_volume[ - 'id'], new_volume['id']) - after_new_volume = db.volume_get(self.context, new_volume.id) - after_old_volume = db.volume_get(self.context, old_volume.id) + self.volume.migrate_volume_completion(self.context, old_volume.id, + new_volume.id) + after_new_volume = objects.Volume.get_by_id(self.context, + new_volume.id) + after_old_volume = objects.Volume.get_by_id(self.context, + old_volume.id) if status == 'in-use': mock_detach_volume.assert_called_with(self.context, - old_volume['id'], + old_volume.id, attachment_id) attachment = db.volume_attachment_get_by_instance_uuid( - self.context, old_volume['id'], instance_uuid) + self.context, old_volume.id, instance_uuid) self.assertIsNotNone(attachment) self.assertEqual(attached_host, attachment['attached_host']) self.assertEqual(instance_uuid, attachment['instance_uuid']) @@ -4865,10 +4884,11 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume.driver._initialized = False self.assertRaises(exception.DriverNotInitialized, self.volume.migrate_volume, - self.context, volume['id'], - host_obj, True) + self.context, volume.id, host_obj, True, + volume=volume) - volume = db.volume_get(context.get_admin_context(), volume['id']) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume.migration_status) # lets cleanup the mess. diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index ccb2f0265..22f0c906c 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -146,7 +146,6 @@ class VolumeRpcAPITestCase(test.TestCase): expected_msg['host'] = dest_host_dict if 'new_volume' in expected_msg: volume = expected_msg['new_volume'] - del expected_msg['new_volume'] expected_msg['new_volume_id'] = volume['id'] if 'host' in kwargs: @@ -392,7 +391,9 @@ class VolumeRpcAPITestCase(test.TestCase): version='1.14') can_send_version.assert_called_once_with('1.35') - def test_migrate_volume(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' @@ -400,18 +401,49 @@ class VolumeRpcAPITestCase(test.TestCase): dest_host = FakeHost() self._test_volume_api('migrate_volume', rpc_method='cast', - volume=self.fake_volume, + volume=self.fake_volume_obj, + dest_host=dest_host, + force_host_copy=True, + version='1.36') + can_send_version.assert_called_once_with('1.36') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_old(self, can_send_version): + class FakeHost(object): + def __init__(self): + self.host = 'host' + self.capabilities = {} + dest_host = FakeHost() + self._test_volume_api('migrate_volume', + rpc_method='cast', + volume=self.fake_volume_obj, dest_host=dest_host, force_host_copy=True, version='1.8') + can_send_version.assert_called_once_with('1.36') - def test_migrate_volume_completion(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume_completion(self, can_send_version): self._test_volume_api('migrate_volume_completion', rpc_method='call', - volume=self.fake_volume, - new_volume=self.fake_volume, + volume=self.fake_volume_obj, + new_volume=self.fake_volume_obj, + error=False, + version='1.36') + can_send_version.assert_called_once_with('1.36') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_completion_old(self, can_send_version): + self._test_volume_api('migrate_volume_completion', + rpc_method='call', + volume=self.fake_volume_obj, + new_volume=self.fake_volume_obj, error=False, version='1.10') + can_send_version.assert_called_once_with('1.36') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index ac01271a3..9341399a4 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1275,39 +1275,38 @@ class API(base.Base): lock_volume): """Migrate the volume to the specified host.""" - if volume['status'] not in ['available', 'in-use']: + if volume.status not in ['available', 'in-use']: msg = _('Volume %(vol_id)s status must be available or in-use, ' 'but current status is: ' - '%(vol_status)s.') % {'vol_id': volume['id'], - 'vol_status': volume['status']} + '%(vol_status)s.') % {'vol_id': volume.id, + 'vol_status': volume.status} LOG.error(msg) raise exception.InvalidVolume(reason=msg) # Make sure volume is not part of a migration. if self._is_volume_migrating(volume): msg = _("Volume %s is already part of an active " - "migration.") % volume['id'] + "migration.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) # We only handle volumes without snapshots for now - snaps = objects.SnapshotList.get_all_for_volume(context, volume['id']) + snaps = objects.SnapshotList.get_all_for_volume(context, volume.id) if snaps: - msg = _("Volume %s must not have snapshots.") % volume['id'] + msg = _("Volume %s must not have snapshots.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) # We only handle non-replicated volumes for now - rep_status = volume['replication_status'] - if rep_status is not None and rep_status != 'disabled': - msg = _("Volume %s must not be replicated.") % volume['id'] + if (volume.replication_status is not None and + volume.replication_status != 'disabled'): + msg = _("Volume %s must not be replicated.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) - cg_id = volume.get('consistencygroup_id', None) - if cg_id: + if volume.consistencygroup_id: msg = _("Volume %s must not be part of a consistency " - "group.") % volume['id'] + "group.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) @@ -1327,7 +1326,7 @@ class API(base.Base): raise exception.InvalidHost(reason=msg) # Make sure the destination host is different than the current one - if host == volume['host']: + if host == volume.host: msg = _('Destination host must be different ' 'than the current host.') LOG.error(msg) @@ -1340,27 +1339,27 @@ class API(base.Base): # that this volume is in maintenance mode, and no action is allowed # on this volume, e.g. attach, detach, retype, migrate, etc. updates = {'migration_status': 'starting', - 'previous_status': volume['status']} - if lock_volume and volume['status'] == 'available': + 'previous_status': volume.status} + if lock_volume and volume.status == 'available': updates['status'] = 'maintenance' self.update(context, volume, updates) # Call the scheduler to ensure that the host exists and that it can # accept the volume volume_type = {} - volume_type_id = volume['volume_type_id'] - if volume_type_id: + if volume.volume_type_id: volume_type = volume_types.get_volume_type(context.elevated(), - volume_type_id) + volume.volume_type_id) request_spec = {'volume_properties': volume, 'volume_type': volume_type, - 'volume_id': volume['id']} + 'volume_id': volume.id} self.scheduler_rpcapi.migrate_volume_to_host(context, CONF.volume_topic, - volume['id'], + volume.id, host, force_host_copy, - request_spec) + request_spec, + volume=volume) LOG.info(_LI("Migrate volume request issued successfully."), resource=volume) @@ -1368,34 +1367,34 @@ class API(base.Base): def migrate_volume_completion(self, context, volume, new_volume, error): # This is a volume swap initiated by Nova, not Cinder. Nova expects # us to return the new_volume_id. - if not (volume['migration_status'] or new_volume['migration_status']): + if not (volume.migration_status or new_volume.migration_status): # Don't need to do migration, but still need to finish the # volume attach and detach so volumes don't end in 'attaching' # and 'detaching' state - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: - self.detach(context, volume, attachment['id']) + self.detach(context, volume, attachment.id) self.attach(context, new_volume, - attachment['instance_uuid'], - attachment['attached_host'], - attachment['mountpoint'], + attachment.instance_uuid, + attachment.attached_host, + attachment.mountpoint, 'rw') - return new_volume['id'] + return new_volume.id - if not volume['migration_status']: + if not volume.migration_status: msg = _('Source volume not mid-migration.') raise exception.InvalidVolume(reason=msg) - if not new_volume['migration_status']: + if not new_volume.migration_status: msg = _('Destination volume not mid-migration.') raise exception.InvalidVolume(reason=msg) - expected_status = 'target:%s' % volume['id'] - if not new_volume['migration_status'] == expected_status: + expected_status = 'target:%s' % volume.id + if not new_volume.migration_status == expected_status: msg = (_('Destination has migration_status %(stat)s, expected ' - '%(exp)s.') % {'stat': new_volume['migration_status'], + '%(exp)s.') % {'stat': new_volume.migration_status, 'exp': expected_status}) raise exception.InvalidVolume(reason=msg) diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 86908de0f..d63e2360c 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -197,7 +197,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '1.35' + RPC_API_VERSION = '1.36' target = messaging.Target(version=RPC_API_VERSION) @@ -1626,35 +1626,38 @@ class VolumeManager(manager.SchedulerDependentManager): # Wait for new_volume to become ready starttime = time.time() deadline = starttime + CONF.migration_create_volume_timeout_secs - new_volume = self.db.volume_get(ctxt, new_volume['id']) + # TODO(thangp): Replace get_by_id with refresh when it is available + new_volume = objects.Volume.get_by_id(ctxt, new_volume.id) tries = 0 - while new_volume['status'] != 'available': + while new_volume.status != 'available': tries += 1 now = time.time() - if new_volume['status'] == 'error': + if new_volume.status == 'error': msg = _("failed to create new_volume on destination host") - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id'], + self._clean_temporary_volume(ctxt, volume, + new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) elif now > deadline: msg = _("timeout creating new_volume on destination host") - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id'], + self._clean_temporary_volume(ctxt, volume, + new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) else: time.sleep(tries ** 2) - new_volume = self.db.volume_get(ctxt, new_volume['id']) + # TODO(thangp): Replace get_by_id with refresh when it is + # available + new_volume = objects.Volume.get_by_id(ctxt, new_volume.id) # Copy the source volume to the destination volume try: - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment if not attachments: self._copy_volume_data(ctxt, volume, new_volume, remote='dest') # The above call is synchronous so we complete the migration - self.migrate_volume_completion(ctxt, volume['id'], - new_volume['id'], + self.migrate_volume_completion(ctxt, volume.id, + new_volume.id, error=False) else: nova_api = compute.API() @@ -1663,58 +1666,63 @@ class VolumeManager(manager.SchedulerDependentManager): for attachment in attachments: instance_uuid = attachment['instance_uuid'] nova_api.update_server_volume(ctxt, instance_uuid, - volume['id'], - new_volume['id']) + volume.id, + new_volume.id) except Exception: with excutils.save_and_reraise_exception(): LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"), - {'vol1': volume['id'], 'vol2': new_volume['id']}) - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id']) + {'vol1': volume.id, 'vol2': new_volume.id}) + self._clean_temporary_volume(ctxt, volume, + new_volume) - def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id, + def _clean_temporary_volume(self, ctxt, volume, new_volume, clean_db_only=False): - volume = self.db.volume_get(ctxt, volume_id) # If we're in the migrating phase, we need to cleanup # destination volume because source volume is remaining - if volume['migration_status'] == 'migrating': + if volume.migration_status == 'migrating': try: if clean_db_only: # The temporary volume is not created, only DB data # is created - self.db.volume_destroy(ctxt, new_volume_id) + new_volume.destroy() else: # The temporary volume is already created rpcapi = volume_rpcapi.VolumeAPI() - volume = self.db.volume_get(ctxt, new_volume_id) - rpcapi.delete_volume(ctxt, volume) + rpcapi.delete_volume(ctxt, new_volume) except exception.VolumeNotFound: LOG.info(_LI("Couldn't find the temporary volume " "%(vol)s in the database. There is no need " "to clean up this volume."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) else: # If we're in the completing phase don't delete the # destination because we may have already deleted the # source! But the migration_status in database should # be cleared to handle volume after migration failure try: - updates = {'migration_status': None} - self.db.volume_update(ctxt, new_volume_id, updates) + new_volume.migration_status = None + new_volume.save() except exception.VolumeNotFound: LOG.info(_LI("Couldn't find destination volume " "%(vol)s in the database. The entry might be " "successfully deleted during migration " "completion phase."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) LOG.warning(_LW("Failed to migrate volume. The destination " "volume %(vol)s is not deleted since the " "source volume may have been deleted."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) def migrate_volume_completion(self, ctxt, volume_id, new_volume_id, - error=False): + error=False, volume=None, new_volume=None): + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None or new_volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(ctxt, volume_id) + new_volume = objects.Volume.get_by_id(ctxt, new_volume_id) + try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -1722,37 +1730,36 @@ class VolumeManager(manager.SchedulerDependentManager): utils.require_driver_initialized(self.driver) except exception.DriverNotInitialized: with excutils.save_and_reraise_exception(): - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'error'}) + volume.migration_status = 'error' + volume.save() LOG.debug("migrate_volume_completion: completing migration for " "volume %(vol1)s (temporary volume %(vol2)s", - {'vol1': volume_id, 'vol2': new_volume_id}) - volume = self.db.volume_get(ctxt, volume_id) - new_volume = self.db.volume_get(ctxt, new_volume_id) + {'vol1': volume.id, 'vol2': new_volume.id}) rpcapi = volume_rpcapi.VolumeAPI() - orig_volume_status = volume['previous_status'] + orig_volume_status = volume.previous_status if error: LOG.info(_LI("migrate_volume_completion is cleaning up an error " "for volume %(vol1)s (temporary volume %(vol2)s"), - {'vol1': volume['id'], 'vol2': new_volume['id']}) + {'vol1': volume['id'], 'vol2': new_volume.id}) rpcapi.delete_volume(ctxt, new_volume) updates = {'migration_status': 'error', 'status': orig_volume_status} - self.db.volume_update(ctxt, volume_id, updates) - return volume_id + volume.update(updates) + volume.save() + return volume.id - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'completing'}) + volume.migration_status = 'completing' + volume.save() # Detach the source volume (if it fails, don't fail the migration) try: if orig_volume_status == 'in-use': - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: - self.detach_volume(ctxt, volume_id, attachment['id']) + self.detach_volume(ctxt, volume.id, attachment['id']) except Exception as ex: LOG.error(_LE("Detach migration source volume failed: %(err)s"), {'err': ex}, resource=volume) @@ -1767,20 +1774,21 @@ class VolumeManager(manager.SchedulerDependentManager): # Swap src and dest DB records so we can continue using the src id and # asynchronously delete the destination id __, updated_new = self.db.finish_volume_migration( - ctxt, volume_id, new_volume_id) + ctxt, volume.id, new_volume.id) updates = {'status': orig_volume_status, - 'previous_status': volume['status'], + 'previous_status': volume.status, 'migration_status': 'success'} if orig_volume_status == 'in-use': - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: rpcapi.attach_volume(ctxt, volume, attachment['instance_uuid'], attachment['attached_host'], attachment['mountpoint'], 'rw') - self.db.volume_update(ctxt, volume_id, updates) + volume.update(updates) + volume.save() # Asynchronous deletion of the source volume in the back-end (now # pointed by the target volume id) @@ -1789,15 +1797,21 @@ class VolumeManager(manager.SchedulerDependentManager): except Exception as ex: LOG.error(_LE('Failed to request async delete of migration source ' 'vol %(vol)s: %(err)s'), - {'vol': volume_id, 'err': ex}) + {'vol': volume.id, 'err': ex}) LOG.info(_LI("Complete-Migrate volume completed successfully."), resource=volume) - return volume['id'] + return volume.id def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False, - new_type_id=None): + new_type_id=None, volume=None): """Migrate the volume to the specified host (called on source host).""" + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -1805,54 +1819,54 @@ class VolumeManager(manager.SchedulerDependentManager): utils.require_driver_initialized(self.driver) except exception.DriverNotInitialized: with excutils.save_and_reraise_exception(): - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'error'}) + volume.migration_status = 'error' + volume.save() - volume_ref = self.db.volume_get(ctxt, volume_id) model_update = None moved = False status_update = None - if volume_ref['status'] in ('retyping', 'maintenance'): - status_update = {'status': volume_ref['previous_status']} + if volume.status in ('retyping', 'maintenance'): + status_update = {'status': volume.previous_status} - self.db.volume_update(ctxt, volume_ref['id'], - {'migration_status': 'migrating'}) + volume.migration_status = 'migrating' + volume.save() if not force_host_copy and new_type_id is None: try: - LOG.debug("Issue driver.migrate_volume.", resource=volume_ref) + LOG.debug("Issue driver.migrate_volume.", resource=volume) moved, model_update = self.driver.migrate_volume(ctxt, - volume_ref, + volume, host) if moved: updates = {'host': host['host'], 'migration_status': 'success', - 'previous_status': volume_ref['status']} + 'previous_status': volume.status} if status_update: updates.update(status_update) if model_update: updates.update(model_update) - volume_ref = self.db.volume_update(ctxt, - volume_ref['id'], - updates) + volume.update(updates) + volume.save() except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} if status_update: updates.update(status_update) - self.db.volume_update(ctxt, volume_ref['id'], updates) + volume.update(updates) + volume.save() if not moved: try: - self._migrate_volume_generic(ctxt, volume_ref, host, + self._migrate_volume_generic(ctxt, volume, host, new_type_id) except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} if status_update: updates.update(status_update) - self.db.volume_update(ctxt, volume_ref['id'], updates) + volume.update(updates) + volume.save() LOG.info(_LI("Migrate volume completed successfully."), - resource=volume_ref) + resource=volume) @periodic_task.periodic_task def _report_driver_status(self, context): @@ -3088,14 +3102,16 @@ class VolumeManager(manager.SchedulerDependentManager): def update_migrated_volume(self, ctxt, volume, new_volume, volume_status): """Finalize migration process on backend device.""" + # FIXME(thangp): Remove this in v2.0 of RPC API. + if (not isinstance(volume, objects.Volume) or + not isinstance(new_volume, objects.Volume)): + volume = objects.Volume.get_by_id(ctxt, volume['id']) + new_volume = objects.Volume.get_by_id(ctxt, new_volume['id']) + model_update = None - # This is temporary fix for bug 1491210. - volume = self.db.volume_get(ctxt, volume['id']) - new_volume = self.db.volume_get(ctxt, new_volume['id']) - model_update_default = {'_name_id': new_volume['_name_id'] or - new_volume['id'], + model_update_default = {'_name_id': new_volume.name_id, 'provider_location': - new_volume['provider_location']} + new_volume.provider_location} try: model_update = self.driver.update_migrated_volume(ctxt, volume, @@ -3119,17 +3135,19 @@ class VolumeManager(manager.SchedulerDependentManager): if volume.get('volume_metadata'): model_update_new[key] = { metadata['key']: metadata['value'] - for metadata in volume.get('volume_metadata')} + for metadata in volume.volume_metadata} elif key == 'admin_metadata': model_update_new[key] = { metadata['key']: metadata['value'] - for metadata in volume.get('volume_admin_metadata')} + for metadata in volume.volume_admin_metadata} else: model_update_new[key] = volume[key] - self.db.volume_update(ctxt.elevated(), new_volume['id'], - model_update_new) - self.db.volume_update(ctxt.elevated(), volume['id'], - model_update_default) + with new_volume.obj_as_admin(): + new_volume.update(model_update_new) + new_volume.save() + with volume.obj_as_admin(): + volume.update(model_update_default) + volume.save() # Replication V2 methods def enable_replication(self, context, volume): diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index c81e78341..035b4d595 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -83,6 +83,8 @@ class VolumeAPI(object): 1.33 - Adds support for sending objects over RPC in delete_volume(). 1.34 - Adds support for sending objects over RPC in retype(). 1.35 - Adds support for sending objects over RPC in extend_volume(). + 1.36 - Adds support for sending objects over RPC in migrate_volume(), + migrate_volume_completion(), and update_migrated_volume(). """ BASE_RPC_API_VERSION = '1.0' @@ -246,20 +248,35 @@ class VolumeAPI(object): cctxt.cast(ctxt, 'extend_volume', **msg_args) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.8') + new_host = utils.extract_host(volume.host) host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'], - host=host_p, force_host_copy=force_host_copy) + + msg_args = {'volume_id': volume.id, 'host': host_p, + 'force_host_copy': force_host_copy} + if self.client.can_send_version('1.36'): + version = '1.36' + msg_args['volume'] = volume + else: + version = '1.8' + + cctxt = self.client.prepare(server=new_host, version=version) + cctxt.cast(ctxt, 'migrate_volume', **msg_args) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.10') - return cctxt.call(ctxt, 'migrate_volume_completion', - volume_id=volume['id'], - new_volume_id=new_volume['id'], - error=error) + new_host = utils.extract_host(volume.host) + + msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id, + 'error': error} + if self.client.can_send_version('1.36'): + version = '1.36' + msg_args['volume'] = volume + msg_args['new_volume'] = new_volume + else: + version = '1.10' + + cctxt = self.client.prepare(server=new_host, version=version) + return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None): @@ -296,7 +313,7 @@ class VolumeAPI(object): def update_migrated_volume(self, ctxt, volume, new_volume, original_volume_status): host = utils.extract_host(new_volume['host']) - cctxt = self.client.prepare(server=host, version='1.19') + cctxt = self.client.prepare(server=host, version='1.36') cctxt.call(ctxt, 'update_migrated_volume', volume=volume,