]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Update migrate_volume API to use versionedobjects
authorThang Pham <thang.g.pham@gmail.com>
Tue, 29 Sep 2015 14:12:35 +0000 (07:12 -0700)
committerThang Pham <thang.g.pham@gmail.com>
Thu, 3 Dec 2015 16:27:16 +0000 (16:27 +0000)
The following patch updates migrate_volume,
migrate_volume_completion, and update_migrated_volume
APIs to use volume versionedobjects.  Changes were
made to be backwards compatible with older RPC clients.
It only includes changes to the core cinder code.
Changes in the drivers are left to each driver
maintainer to update.

Note that this patch DOES NOT try to use object dot
notation everywhere, since it would increase the
size of the patch.  Instead, it will be done in
subsequent patches.

Change-Id: I21fe68193c934a7ef3688274ab35f664a08cac7e
Partial-Implements: blueprint cinder-objects
Closes-Bug: #1521085

cinder/scheduler/manager.py
cinder/scheduler/rpcapi.py
cinder/tests/unit/api/contrib/test_admin_actions.py
cinder/tests/unit/scheduler/test_rpcapi.py
cinder/tests/unit/test_volume.py
cinder/tests/unit/test_volume_rpcapi.py
cinder/volume/api.py
cinder/volume/manager.py
cinder/volume/rpcapi.py

index 5764c1da35d05e145368d259f1e131da0eff7ef8..fa06628aa169c1bd007ec6921a14b4a4b2aaf2b0 100644 (file)
@@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
 class SchedulerManager(manager.Manager):
     """Chooses a host to create volumes."""
 
-    RPC_API_VERSION = '1.10'
+    RPC_API_VERSION = '1.11'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -148,13 +148,18 @@ class SchedulerManager(manager.Manager):
 
     def migrate_volume_to_host(self, context, topic, volume_id, host,
                                force_host_copy, request_spec,
-                               filter_properties=None):
+                               filter_properties=None, volume=None):
         """Ensure that the host exists and can accept the volume."""
 
         self._wait_for_scheduler()
 
+        # FIXME(thangp): Remove this in v2.0 of RPC API.
+        if volume is None:
+            # For older clients, mimic the old behavior and look up the
+            # volume by its volume_id.
+            volume = objects.Volume.get_by_id(context, volume_id)
+
         def _migrate_volume_set_error(self, context, ex, request_spec):
-            volume = db.volume_get(context, request_spec['volume_id'])
             if volume.status == 'maintenance':
                 previous_status = (
                     volume.previous_status or 'maintenance')
@@ -176,8 +181,7 @@ class SchedulerManager(manager.Manager):
             with excutils.save_and_reraise_exception():
                 _migrate_volume_set_error(self, context, ex, request_spec)
         else:
-            volume_ref = db.volume_get(context, volume_id)
-            volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
+            volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
                                                      tgt_host,
                                                      force_host_copy)
 
index 4e9769c72a93d6b114d207cae62b7fcc357097bf..e451336e7a174d46084f8ecc7419da75526bf63c 100644 (file)
@@ -44,6 +44,8 @@ class SchedulerAPI(object):
         1.8 - Add sending object over RPC in create_consistencygroup method
         1.9 - Adds support for sending objects over RPC in create_volume()
         1.10 - Adds support for sending objects over RPC in retype()
+        1.11 - Adds support for sending objects over RPC in
+               migrate_volume_to_host()
     """
 
     RPC_API_VERSION = '1.0'
@@ -95,17 +97,20 @@ class SchedulerAPI(object):
 
     def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
                                force_host_copy=False, request_spec=None,
-                               filter_properties=None):
-
-        cctxt = self.client.prepare(version='1.3')
+                               filter_properties=None, volume=None):
         request_spec_p = jsonutils.to_primitive(request_spec)
-        return cctxt.cast(ctxt, 'migrate_volume_to_host',
-                          topic=topic,
-                          volume_id=volume_id,
-                          host=host,
-                          force_host_copy=force_host_copy,
-                          request_spec=request_spec_p,
-                          filter_properties=filter_properties)
+        msg_args = {'topic': topic, 'volume_id': volume_id,
+                    'host': host, 'force_host_copy': force_host_copy,
+                    'request_spec': request_spec_p,
+                    'filter_properties': filter_properties}
+        if self.client.can_send_version('1.11'):
+            version = '1.11'
+            msg_args['volume'] = volume
+        else:
+            version = '1.3'
+
+        cctxt = self.client.prepare(version=version)
+        return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
 
     def retype(self, ctxt, topic, volume_id,
                request_spec=None, filter_properties=None, volume=None):
index a6da59f1a60e00068523b0e7d8bcc25ca057012c..abfcec0507440e9b637f18a6692acaf038c58dcc 100644 (file)
@@ -801,7 +801,7 @@ class AdminActionsTest(test.TestCase):
                              force_host_copy=False):
         admin_ctx = context.get_admin_context()
         # build request to migrate to host
-        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id)
         req.method = 'POST'
         req.headers['content-type'] = 'application/json'
         body = {'os-migrate_volume': {'host': host,
@@ -811,7 +811,7 @@ class AdminActionsTest(test.TestCase):
         resp = req.get_response(app())
         # verify status
         self.assertEqual(expected_status, resp.status_int)
-        volume = db.volume_get(admin_ctx, volume['id'])
+        volume = objects.Volume.get_by_id(admin_ctx, volume.id)
         return volume
 
     def test_migrate_volume_success(self):
index 60c935f195a79e04bf6c8fca78fa7a946efe9d62..74de0baec62245fe9f110455cc5c4e3b8c6ea888 100644 (file)
@@ -117,7 +117,9 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  version='1.2')
         can_send_version.assert_called_once_with('1.9')
 
-    def test_migrate_volume_to_host(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=True)
+    def test_migrate_volume_to_host(self, can_send_version):
         self._test_scheduler_api('migrate_volume_to_host',
                                  rpc_method='cast',
                                  topic='topic',
@@ -126,7 +128,24 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  force_host_copy=True,
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
+                                 volume='volume',
+                                 version='1.11')
+        can_send_version.assert_called_once_with('1.11')
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_migrate_volume_to_host_old(self, can_send_version):
+        self._test_scheduler_api('migrate_volume_to_host',
+                                 rpc_method='cast',
+                                 topic='topic',
+                                 volume_id='volume_id',
+                                 host='host',
+                                 force_host_copy=True,
+                                 request_spec='fake_request_spec',
+                                 filter_properties='filter_properties',
+                                 volume='volume',
                                  version='1.3')
+        can_send_version.assert_called_once_with('1.11')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=True)
index 12477fe3c2df0bcf8aa607904dc55b2519fbe208..7c9361f13fdc8ad3a173a080f073f3b7cdf06428 100644 (file)
@@ -4183,22 +4183,20 @@ class VolumeTestCase(BaseVolumeTestCase):
 
     def test_clean_temporary_volume(self):
         def fake_delete_volume(ctxt, volume):
-            db.volume_destroy(ctxt, volume['id'])
+            volume.destroy()
 
         fake_volume = tests_utils.create_volume(self.context, size=1,
-                                                host=CONF.host)
+                                                host=CONF.host,
+                                                migration_status='migrating')
         fake_new_volume = tests_utils.create_volume(self.context, size=1,
                                                     host=CONF.host)
-        # Check when the migrated volume is in migration
-        db.volume_update(self.context, fake_volume['id'],
-                         {'migration_status': 'migrating'})
         # 1. Only clean the db
-        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
-                                            fake_new_volume['id'],
+        self.volume._clean_temporary_volume(self.context, fake_volume,
+                                            fake_new_volume,
                                             clean_db_only=True)
         self.assertRaises(exception.VolumeNotFound,
                           db.volume_get, self.context,
-                          fake_new_volume['id'])
+                          fake_new_volume.id)
 
         # 2. Delete the backend storage
         fake_new_volume = tests_utils.create_volume(self.context, size=1,
@@ -4207,23 +4205,23 @@ class VolumeTestCase(BaseVolumeTestCase):
                 mock_delete_volume:
             mock_delete_volume.side_effect = fake_delete_volume
             self.volume._clean_temporary_volume(self.context,
-                                                fake_volume['id'],
-                                                fake_new_volume['id'],
+                                                fake_volume,
+                                                fake_new_volume,
                                                 clean_db_only=False)
             self.assertRaises(exception.VolumeNotFound,
                               db.volume_get, self.context,
-                              fake_new_volume['id'])
+                              fake_new_volume.id)
 
         # Check when the migrated volume is not in migration
         fake_new_volume = tests_utils.create_volume(self.context, size=1,
                                                     host=CONF.host)
-        db.volume_update(self.context, fake_volume['id'],
-                         {'migration_status': 'non-migrating'})
-        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
-                                            fake_new_volume['id'])
+        fake_volume.migration_status = 'non-migrating'
+        fake_volume.save()
+        self.volume._clean_temporary_volume(self.context, fake_volume,
+                                            fake_new_volume)
         volume = db.volume_get(context.get_admin_context(),
-                               fake_new_volume['id'])
-        self.assertIsNone(volume['migration_status'])
+                               fake_new_volume.id)
+        self.assertIsNone(volume.migration_status)
 
     def test_update_volume_readonly_flag(self):
         """Test volume readonly flag can be updated at API level."""
@@ -4323,13 +4321,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                            host=CONF.host,
                                            migration_status='migrating')
         host_obj = {'host': 'newhost', 'capabilities': {}}
-        self.volume.migrate_volume(self.context, volume['id'],
-                                   host_obj, False)
+        self.volume.migrate_volume(self.context, volume.id, host_obj, False,
+                                   volume=volume)
 
         # check volume properties
-        volume = db.volume_get(context.get_admin_context(), volume['id'])
-        self.assertEqual('newhost', volume['host'])
-        self.assertEqual('success', volume['migration_status'])
+        volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                          volume.id)
+        self.assertEqual('newhost', volume.host)
+        self.assertEqual('success', volume.migration_status)
 
     def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
                             allow_reschedule=True):
@@ -4351,12 +4350,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(processutils.ProcessExecutionError,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              False)
-            volume = db.volume_get(context.get_admin_context(), volume['id'])
-            self.assertEqual('error', volume['migration_status'])
-            self.assertEqual('available', volume['status'])
+                              False,
+                              volume=volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              volume.id)
+            self.assertEqual('error', volume.migration_status)
+            self.assertEqual('available', volume.status)
 
     @mock.patch('cinder.compute.API')
     @mock.patch('cinder.volume.manager.VolumeManager.'
@@ -4366,7 +4367,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                     migrate_volume_completion,
                                     nova_api):
         fake_volume_id = 'fake_volume_id'
-        fake_new_volume = {'status': 'available', 'id': fake_volume_id}
+        fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
+        fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
+        new_volume_obj = fake_volume.fake_volume_obj(self.context,
+                                                     **fake_new_volume)
         host_obj = {'host': 'newhost', 'capabilities': {}}
         volume_get.return_value = fake_new_volume
         update_server_volume = nova_api.return_value.update_server_volume
@@ -4377,12 +4381,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.volume._migrate_volume_generic(self.context, volume,
                                                 host_obj, None)
             mock_copy_volume.assert_called_with(self.context, volume,
-                                                fake_new_volume,
+                                                new_volume_obj,
                                                 remote='dest')
-            migrate_volume_completion.assert_called_with(self.context,
-                                                         volume['id'],
-                                                         fake_new_volume['id'],
-                                                         error=False)
+            migrate_volume_completion.assert_called_with(
+                self.context, volume.id, new_volume_obj.id, error=False)
             self.assertFalse(update_server_volume.called)
 
     @mock.patch('cinder.compute.API')
@@ -4421,6 +4423,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                                rpc_delete_volume,
                                                update_migrated_volume):
         fake_volume = tests_utils.create_volume(self.context, size=1,
+                                                previous_status='available',
                                                 host=CONF.host)
 
         host_obj = {'host': 'newhost', 'capabilities': {}}
@@ -4430,12 +4433,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
                 mock.patch.object(self.volume.driver, 'delete_volume') as \
                 delete_volume:
             create_volume.side_effect = self._fake_create_volume
-            self.volume.migrate_volume(self.context, fake_volume['id'],
-                                       host_obj, True)
-            volume = db.volume_get(context.get_admin_context(),
-                                   fake_volume['id'])
-            self.assertEqual('newhost', volume['host'])
-            self.assertEqual('success', volume['migration_status'])
+            self.volume.migrate_volume(self.context, fake_volume.id,
+                                       host_obj, True, volume=fake_volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              fake_volume.id)
+            self.assertEqual('newhost', volume.host)
+            self.assertEqual('success', volume.migration_status)
             self.assertFalse(mock_migrate_volume.called)
             self.assertFalse(delete_volume.called)
             self.assertTrue(rpc_delete_volume.called)
@@ -4461,12 +4464,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(processutils.ProcessExecutionError,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              True)
-            volume = db.volume_get(context.get_admin_context(), volume['id'])
-            self.assertEqual('error', volume['migration_status'])
-            self.assertEqual('available', volume['status'])
+                              True,
+                              volume=volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              volume.id)
+            self.assertEqual('error', volume.migration_status)
+            self.assertEqual('available', volume.status)
 
     @mock.patch('cinder.db.volume_update')
     def test_update_migrated_volume(self, volume_update):
@@ -4474,7 +4479,8 @@ class VolumeMigrationTestCase(VolumeTestCase):
         fake_new_host = 'fake_new_host'
         fake_update = {'_name_id': 'updated_id',
                        'provider_location': 'updated_location'}
-        fake_elevated = 'fake_elevated'
+        fake_elevated = context.RequestContext('fake', self.project_id,
+                                               is_admin=True)
         volume = tests_utils.create_volume(self.context, size=1,
                                            status='available',
                                            host=fake_host)
@@ -4484,13 +4490,13 @@ class VolumeMigrationTestCase(VolumeTestCase):
             provider_location='fake_provider_location',
             _name_id='fake_name_id',
             host=fake_new_host)
-        new_volume['_name_id'] = 'fake_name_id'
-        new_volume['provider_location'] = 'fake_provider_location'
-        fake_update_error = {'_name_id': new_volume['_name_id'],
+        new_volume._name_id = 'fake_name_id'
+        new_volume.provider_location = 'fake_provider_location'
+        fake_update_error = {'_name_id': new_volume._name_id,
                              'provider_location':
-                             new_volume['provider_location']}
-        expected_update = {'_name_id': volume['_name_id'],
-                           'provider_location': volume['provider_location']}
+                             new_volume.provider_location}
+        expected_update = {'_name_id': volume._name_id,
+                           'provider_location': volume.provider_location}
         with mock.patch.object(self.volume.driver,
                                'update_migrated_volume') as migrate_update,\
                 mock.patch.object(self.context, 'elevated') as elevated:
@@ -4499,19 +4505,23 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.volume.update_migrated_volume(self.context, volume,
                                                new_volume, 'available')
             volume_update.assert_has_calls((
-                mock.call(fake_elevated, new_volume['id'], expected_update),
-                mock.call(fake_elevated, volume['id'], fake_update)))
+                mock.call(fake_elevated, new_volume.id, expected_update),
+                mock.call(fake_elevated, volume.id, fake_update)))
 
             # Test the case for update_migrated_volume not implemented
             # for the driver.
             migrate_update.reset_mock()
             volume_update.reset_mock()
+            # Reset the volume objects to their original value, since they
+            # were changed in the last call.
+            new_volume._name_id = 'fake_name_id'
+            new_volume.provider_location = 'fake_provider_location'
             migrate_update.side_effect = NotImplementedError
             self.volume.update_migrated_volume(self.context, volume,
                                                new_volume, 'available')
             volume_update.assert_has_calls((
-                mock.call(fake_elevated, new_volume['id'], expected_update),
-                mock.call(fake_elevated, volume['id'], fake_update_error)))
+                mock.call(fake_elevated, new_volume.id, fake_update),
+                mock.call(fake_elevated, volume.id, fake_update_error)))
 
     def test_migrate_volume_generic_create_volume_error(self):
         self.expected_status = 'error'
@@ -4530,10 +4540,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(exception.VolumeMigrationFailed,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              True)
-            volume = db.volume_get(context.get_admin_context(), volume['id'])
+                              True,
+                              volume=volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              volume.id)
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
             self.assertTrue(clean_temporary_volume.called)
@@ -4558,10 +4570,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(exception.VolumeMigrationFailed,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              True)
-            volume = db.volume_get(context.get_admin_context(), volume['id'])
+                              True,
+                              volume=volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              volume.id)
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
             self.assertTrue(clean_temporary_volume.called)
@@ -4588,10 +4602,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(processutils.ProcessExecutionError,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              True)
-            volume = db.volume_get(context.get_admin_context(), volume['id'])
+                              True,
+                              volume=volume)
+            volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                              volume.id)
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
 
@@ -4634,9 +4650,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
             self.assertRaises(processutils.ProcessExecutionError,
                               self.volume.migrate_volume,
                               self.context,
-                              volume['id'],
+                              volume.id,
                               host_obj,
-                              True)
+                              True,
+                              volume=volume)
             volume = db.volume_get(context.get_admin_context(), volume['id'])
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
@@ -4649,7 +4666,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                         previous_status='available'):
         def fake_attach_volume(ctxt, volume, instance_uuid, host_name,
                                mountpoint, mode):
-            tests_utils.attach_volume(ctxt, volume['id'],
+            tests_utils.attach_volume(ctxt, volume.id,
                                       instance_uuid, host_name,
                                       '/dev/vda')
 
@@ -4661,12 +4678,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                                previous_status=previous_status)
         attachment_id = None
         if status == 'in-use':
-            vol = tests_utils.attach_volume(self.context, old_volume['id'],
+            vol = tests_utils.attach_volume(self.context, old_volume.id,
                                             instance_uuid, attached_host,
                                             '/dev/vda')
             self.assertEqual('in-use', vol['status'])
             attachment_id = vol['volume_attachment'][0]['id']
-        target_status = 'target:%s' % old_volume['id']
+        target_status = 'target:%s' % old_volume.id
         new_host = CONF.host + 'new'
         new_volume = tests_utils.create_volume(self.context, size=0,
                                                host=new_host,
@@ -4681,16 +4698,18 @@ class VolumeMigrationTestCase(VolumeTestCase):
                                   'update_migrated_volume'),\
                 mock.patch.object(self.volume.driver, 'attach_volume'):
             mock_attach_volume.side_effect = fake_attach_volume
-            self.volume.migrate_volume_completion(self.context, old_volume[
-                'id'], new_volume['id'])
-            after_new_volume = db.volume_get(self.context, new_volume.id)
-            after_old_volume = db.volume_get(self.context, old_volume.id)
+            self.volume.migrate_volume_completion(self.context, old_volume.id,
+                                                  new_volume.id)
+            after_new_volume = objects.Volume.get_by_id(self.context,
+                                                        new_volume.id)
+            after_old_volume = objects.Volume.get_by_id(self.context,
+                                                        old_volume.id)
             if status == 'in-use':
                 mock_detach_volume.assert_called_with(self.context,
-                                                      old_volume['id'],
+                                                      old_volume.id,
                                                       attachment_id)
                 attachment = db.volume_attachment_get_by_instance_uuid(
-                    self.context, old_volume['id'], instance_uuid)
+                    self.context, old_volume.id, instance_uuid)
                 self.assertIsNotNone(attachment)
                 self.assertEqual(attached_host, attachment['attached_host'])
                 self.assertEqual(instance_uuid, attachment['instance_uuid'])
@@ -4865,10 +4884,11 @@ class VolumeMigrationTestCase(VolumeTestCase):
         self.volume.driver._initialized = False
         self.assertRaises(exception.DriverNotInitialized,
                           self.volume.migrate_volume,
-                          self.context, volume['id'],
-                          host_obj, True)
+                          self.context, volume.id, host_obj, True,
+                          volume=volume)
 
-        volume = db.volume_get(context.get_admin_context(), volume['id'])
+        volume = objects.Volume.get_by_id(context.get_admin_context(),
+                                          volume.id)
         self.assertEqual('error', volume.migration_status)
 
         # lets cleanup the mess.
index ccb2f0265a64da95c018f7383dbb355b6018a7a0..22f0c906c4d6efb817c17a320cbc13daa3641cc4 100644 (file)
@@ -146,7 +146,6 @@ class VolumeRpcAPITestCase(test.TestCase):
             expected_msg['host'] = dest_host_dict
         if 'new_volume' in expected_msg:
             volume = expected_msg['new_volume']
-            del expected_msg['new_volume']
             expected_msg['new_volume_id'] = volume['id']
 
         if 'host' in kwargs:
@@ -392,7 +391,9 @@ class VolumeRpcAPITestCase(test.TestCase):
                               version='1.14')
         can_send_version.assert_called_once_with('1.35')
 
-    def test_migrate_volume(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=True)
+    def test_migrate_volume(self, can_send_version):
         class FakeHost(object):
             def __init__(self):
                 self.host = 'host'
@@ -400,18 +401,49 @@ class VolumeRpcAPITestCase(test.TestCase):
         dest_host = FakeHost()
         self._test_volume_api('migrate_volume',
                               rpc_method='cast',
-                              volume=self.fake_volume,
+                              volume=self.fake_volume_obj,
+                              dest_host=dest_host,
+                              force_host_copy=True,
+                              version='1.36')
+        can_send_version.assert_called_once_with('1.36')
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_migrate_volume_old(self, can_send_version):
+        class FakeHost(object):
+            def __init__(self):
+                self.host = 'host'
+                self.capabilities = {}
+        dest_host = FakeHost()
+        self._test_volume_api('migrate_volume',
+                              rpc_method='cast',
+                              volume=self.fake_volume_obj,
                               dest_host=dest_host,
                               force_host_copy=True,
                               version='1.8')
+        can_send_version.assert_called_once_with('1.36')
 
-    def test_migrate_volume_completion(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=True)
+    def test_migrate_volume_completion(self, can_send_version):
         self._test_volume_api('migrate_volume_completion',
                               rpc_method='call',
-                              volume=self.fake_volume,
-                              new_volume=self.fake_volume,
+                              volume=self.fake_volume_obj,
+                              new_volume=self.fake_volume_obj,
+                              error=False,
+                              version='1.36')
+        can_send_version.assert_called_once_with('1.36')
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_migrate_volume_completion_old(self, can_send_version):
+        self._test_volume_api('migrate_volume_completion',
+                              rpc_method='call',
+                              volume=self.fake_volume_obj,
+                              new_volume=self.fake_volume_obj,
                               error=False,
                               version='1.10')
+        can_send_version.assert_called_once_with('1.36')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=True)
index ac01271a3ea7fcac94217b7be276fbf6f2dd3240..9341399a4cf48370bc7cbaddf58b10d11b1fe7e8 100644 (file)
@@ -1275,39 +1275,38 @@ class API(base.Base):
                        lock_volume):
         """Migrate the volume to the specified host."""
 
-        if volume['status'] not in ['available', 'in-use']:
+        if volume.status not in ['available', 'in-use']:
             msg = _('Volume %(vol_id)s status must be available or in-use, '
                     'but current status is: '
-                    '%(vol_status)s.') % {'vol_id': volume['id'],
-                                          'vol_status': volume['status']}
+                    '%(vol_status)s.') % {'vol_id': volume.id,
+                                          'vol_status': volume.status}
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
         # Make sure volume is not part of a migration.
         if self._is_volume_migrating(volume):
             msg = _("Volume %s is already part of an active "
-                    "migration.") % volume['id']
+                    "migration.") % volume.id
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
         # We only handle volumes without snapshots for now
-        snaps = objects.SnapshotList.get_all_for_volume(context, volume['id'])
+        snaps = objects.SnapshotList.get_all_for_volume(context, volume.id)
         if snaps:
-            msg = _("Volume %s must not have snapshots.") % volume['id']
+            msg = _("Volume %s must not have snapshots.") % volume.id
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
         # We only handle non-replicated volumes for now
-        rep_status = volume['replication_status']
-        if rep_status is not None and rep_status != 'disabled':
-            msg = _("Volume %s must not be replicated.") % volume['id']
+        if (volume.replication_status is not None and
+                volume.replication_status != 'disabled'):
+            msg = _("Volume %s must not be replicated.") % volume.id
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
-        cg_id = volume.get('consistencygroup_id', None)
-        if cg_id:
+        if volume.consistencygroup_id:
             msg = _("Volume %s must not be part of a consistency "
-                    "group.") % volume['id']
+                    "group.") % volume.id
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
@@ -1327,7 +1326,7 @@ class API(base.Base):
             raise exception.InvalidHost(reason=msg)
 
         # Make sure the destination host is different than the current one
-        if host == volume['host']:
+        if host == volume.host:
             msg = _('Destination host must be different '
                     'than the current host.')
             LOG.error(msg)
@@ -1340,27 +1339,27 @@ class API(base.Base):
         # that this volume is in maintenance mode, and no action is allowed
         # on this volume, e.g. attach, detach, retype, migrate, etc.
         updates = {'migration_status': 'starting',
-                   'previous_status': volume['status']}
-        if lock_volume and volume['status'] == 'available':
+                   'previous_status': volume.status}
+        if lock_volume and volume.status == 'available':
             updates['status'] = 'maintenance'
         self.update(context, volume, updates)
 
         # Call the scheduler to ensure that the host exists and that it can
         # accept the volume
         volume_type = {}
-        volume_type_id = volume['volume_type_id']
-        if volume_type_id:
+        if volume.volume_type_id:
             volume_type = volume_types.get_volume_type(context.elevated(),
-                                                       volume_type_id)
+                                                       volume.volume_type_id)
         request_spec = {'volume_properties': volume,
                         'volume_type': volume_type,
-                        'volume_id': volume['id']}
+                        'volume_id': volume.id}
         self.scheduler_rpcapi.migrate_volume_to_host(context,
                                                      CONF.volume_topic,
-                                                     volume['id'],
+                                                     volume.id,
                                                      host,
                                                      force_host_copy,
-                                                     request_spec)
+                                                     request_spec,
+                                                     volume=volume)
         LOG.info(_LI("Migrate volume request issued successfully."),
                  resource=volume)
 
@@ -1368,34 +1367,34 @@ class API(base.Base):
     def migrate_volume_completion(self, context, volume, new_volume, error):
         # This is a volume swap initiated by Nova, not Cinder. Nova expects
         # us to return the new_volume_id.
-        if not (volume['migration_status'] or new_volume['migration_status']):
+        if not (volume.migration_status or new_volume.migration_status):
             # Don't need to do migration, but still need to finish the
             # volume attach and detach so volumes don't end in 'attaching'
             # and 'detaching' state
-            attachments = volume['volume_attachment']
+            attachments = volume.volume_attachment
             for attachment in attachments:
-                self.detach(context, volume, attachment['id'])
+                self.detach(context, volume, attachment.id)
 
                 self.attach(context, new_volume,
-                            attachment['instance_uuid'],
-                            attachment['attached_host'],
-                            attachment['mountpoint'],
+                            attachment.instance_uuid,
+                            attachment.attached_host,
+                            attachment.mountpoint,
                             'rw')
 
-            return new_volume['id']
+            return new_volume.id
 
-        if not volume['migration_status']:
+        if not volume.migration_status:
             msg = _('Source volume not mid-migration.')
             raise exception.InvalidVolume(reason=msg)
 
-        if not new_volume['migration_status']:
+        if not new_volume.migration_status:
             msg = _('Destination volume not mid-migration.')
             raise exception.InvalidVolume(reason=msg)
 
-        expected_status = 'target:%s' % volume['id']
-        if not new_volume['migration_status'] == expected_status:
+        expected_status = 'target:%s' % volume.id
+        if not new_volume.migration_status == expected_status:
             msg = (_('Destination has migration_status %(stat)s, expected '
-                     '%(exp)s.') % {'stat': new_volume['migration_status'],
+                     '%(exp)s.') % {'stat': new_volume.migration_status,
                                     'exp': expected_status})
             raise exception.InvalidVolume(reason=msg)
 
index 86908de0f7022502ee78785ed1b91e5fba7373ad..d63e2360c89d0a5a061b9abca4516aa29997ec83 100644 (file)
@@ -197,7 +197,7 @@ def locked_snapshot_operation(f):
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.35'
+    RPC_API_VERSION = '1.36'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -1626,35 +1626,38 @@ class VolumeManager(manager.SchedulerDependentManager):
         # Wait for new_volume to become ready
         starttime = time.time()
         deadline = starttime + CONF.migration_create_volume_timeout_secs
-        new_volume = self.db.volume_get(ctxt, new_volume['id'])
+        # TODO(thangp): Replace get_by_id with refresh when it is available
+        new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
         tries = 0
-        while new_volume['status'] != 'available':
+        while new_volume.status != 'available':
             tries += 1
             now = time.time()
-            if new_volume['status'] == 'error':
+            if new_volume.status == 'error':
                 msg = _("failed to create new_volume on destination host")
-                self._clean_temporary_volume(ctxt, volume['id'],
-                                             new_volume['id'],
+                self._clean_temporary_volume(ctxt, volume,
+                                             new_volume,
                                              clean_db_only=True)
                 raise exception.VolumeMigrationFailed(reason=msg)
             elif now > deadline:
                 msg = _("timeout creating new_volume on destination host")
-                self._clean_temporary_volume(ctxt, volume['id'],
-                                             new_volume['id'],
+                self._clean_temporary_volume(ctxt, volume,
+                                             new_volume,
                                              clean_db_only=True)
                 raise exception.VolumeMigrationFailed(reason=msg)
             else:
                 time.sleep(tries ** 2)
-            new_volume = self.db.volume_get(ctxt, new_volume['id'])
+            # TODO(thangp): Replace get_by_id with refresh when it is
+            # available
+            new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
 
         # Copy the source volume to the destination volume
         try:
-            attachments = volume['volume_attachment']
+            attachments = volume.volume_attachment
             if not attachments:
                 self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
                 # The above call is synchronous so we complete the migration
-                self.migrate_volume_completion(ctxt, volume['id'],
-                                               new_volume['id'],
+                self.migrate_volume_completion(ctxt, volume.id,
+                                               new_volume.id,
                                                error=False)
             else:
                 nova_api = compute.API()
@@ -1663,58 +1666,63 @@ class VolumeManager(manager.SchedulerDependentManager):
                 for attachment in attachments:
                     instance_uuid = attachment['instance_uuid']
                     nova_api.update_server_volume(ctxt, instance_uuid,
-                                                  volume['id'],
-                                                  new_volume['id'])
+                                                  volume.id,
+                                                  new_volume.id)
         except Exception:
             with excutils.save_and_reraise_exception():
                 LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"),
-                          {'vol1': volume['id'], 'vol2': new_volume['id']})
-                self._clean_temporary_volume(ctxt, volume['id'],
-                                             new_volume['id'])
+                          {'vol1': volume.id, 'vol2': new_volume.id})
+                self._clean_temporary_volume(ctxt, volume,
+                                             new_volume)
 
-    def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id,
+    def _clean_temporary_volume(self, ctxt, volume, new_volume,
                                 clean_db_only=False):
-        volume = self.db.volume_get(ctxt, volume_id)
         # If we're in the migrating phase, we need to cleanup
         # destination volume because source volume is remaining
-        if volume['migration_status'] == 'migrating':
+        if volume.migration_status == 'migrating':
             try:
                 if clean_db_only:
                     # The temporary volume is not created, only DB data
                     # is created
-                    self.db.volume_destroy(ctxt, new_volume_id)
+                    new_volume.destroy()
                 else:
                     # The temporary volume is already created
                     rpcapi = volume_rpcapi.VolumeAPI()
-                    volume = self.db.volume_get(ctxt, new_volume_id)
-                    rpcapi.delete_volume(ctxt, volume)
+                    rpcapi.delete_volume(ctxt, new_volume)
             except exception.VolumeNotFound:
                 LOG.info(_LI("Couldn't find the temporary volume "
                              "%(vol)s in the database. There is no need "
                              "to clean up this volume."),
-                         {'vol': new_volume_id})
+                         {'vol': new_volume.id})
         else:
             # If we're in the completing phase don't delete the
             # destination because we may have already deleted the
             # source! But the migration_status in database should
             # be cleared to handle volume after migration failure
             try:
-                updates = {'migration_status': None}
-                self.db.volume_update(ctxt, new_volume_id, updates)
+                new_volume.migration_status = None
+                new_volume.save()
             except exception.VolumeNotFound:
                 LOG.info(_LI("Couldn't find destination volume "
                              "%(vol)s in the database. The entry might be "
                              "successfully deleted during migration "
                              "completion phase."),
-                         {'vol': new_volume_id})
+                         {'vol': new_volume.id})
 
                 LOG.warning(_LW("Failed to migrate volume. The destination "
                                 "volume %(vol)s is not deleted since the "
                                 "source volume may have been deleted."),
-                            {'vol': new_volume_id})
+                            {'vol': new_volume.id})
 
     def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
-                                  error=False):
+                                  error=False, volume=None, new_volume=None):
+        # FIXME(thangp): Remove this in v2.0 of RPC API.
+        if volume is None or new_volume is None:
+            # For older clients, mimic the old behavior and look up the volume
+            # by its volume_id.
+            volume = objects.Volume.get_by_id(ctxt, volume_id)
+            new_volume = objects.Volume.get_by_id(ctxt, new_volume_id)
+
         try:
             # NOTE(flaper87): Verify the driver is enabled
             # before going forward. The exception will be caught
@@ -1722,37 +1730,36 @@ class VolumeManager(manager.SchedulerDependentManager):
             utils.require_driver_initialized(self.driver)
         except exception.DriverNotInitialized:
             with excutils.save_and_reraise_exception():
-                self.db.volume_update(ctxt, volume_id,
-                                      {'migration_status': 'error'})
+                volume.migration_status = 'error'
+                volume.save()
 
         LOG.debug("migrate_volume_completion: completing migration for "
                   "volume %(vol1)s (temporary volume %(vol2)s",
-                  {'vol1': volume_id, 'vol2': new_volume_id})
-        volume = self.db.volume_get(ctxt, volume_id)
-        new_volume = self.db.volume_get(ctxt, new_volume_id)
+                  {'vol1': volume.id, 'vol2': new_volume.id})
         rpcapi = volume_rpcapi.VolumeAPI()
 
-        orig_volume_status = volume['previous_status']
+        orig_volume_status = volume.previous_status
 
         if error:
             LOG.info(_LI("migrate_volume_completion is cleaning up an error "
                          "for volume %(vol1)s (temporary volume %(vol2)s"),
-                     {'vol1': volume['id'], 'vol2': new_volume['id']})
+                     {'vol1': volume['id'], 'vol2': new_volume.id})
             rpcapi.delete_volume(ctxt, new_volume)
             updates = {'migration_status': 'error',
                        'status': orig_volume_status}
-            self.db.volume_update(ctxt, volume_id, updates)
-            return volume_id
+            volume.update(updates)
+            volume.save()
+            return volume.id
 
-        self.db.volume_update(ctxt, volume_id,
-                              {'migration_status': 'completing'})
+        volume.migration_status = 'completing'
+        volume.save()
 
         # Detach the source volume (if it fails, don't fail the migration)
         try:
             if orig_volume_status == 'in-use':
-                attachments = volume['volume_attachment']
+                attachments = volume.volume_attachment
                 for attachment in attachments:
-                    self.detach_volume(ctxt, volume_id, attachment['id'])
+                    self.detach_volume(ctxt, volume.id, attachment['id'])
         except Exception as ex:
             LOG.error(_LE("Detach migration source volume failed:  %(err)s"),
                       {'err': ex}, resource=volume)
@@ -1767,20 +1774,21 @@ class VolumeManager(manager.SchedulerDependentManager):
         # Swap src and dest DB records so we can continue using the src id and
         # asynchronously delete the destination id
         __, updated_new = self.db.finish_volume_migration(
-            ctxt, volume_id, new_volume_id)
+            ctxt, volume.id, new_volume.id)
         updates = {'status': orig_volume_status,
-                   'previous_status': volume['status'],
+                   'previous_status': volume.status,
                    'migration_status': 'success'}
 
         if orig_volume_status == 'in-use':
-            attachments = volume['volume_attachment']
+            attachments = volume.volume_attachment
             for attachment in attachments:
                 rpcapi.attach_volume(ctxt, volume,
                                      attachment['instance_uuid'],
                                      attachment['attached_host'],
                                      attachment['mountpoint'],
                                      'rw')
-        self.db.volume_update(ctxt, volume_id, updates)
+        volume.update(updates)
+        volume.save()
 
         # Asynchronous deletion of the source volume in the back-end (now
         # pointed by the target volume id)
@@ -1789,15 +1797,21 @@ class VolumeManager(manager.SchedulerDependentManager):
         except Exception as ex:
             LOG.error(_LE('Failed to request async delete of migration source '
                           'vol %(vol)s: %(err)s'),
-                      {'vol': volume_id, 'err': ex})
+                      {'vol': volume.id, 'err': ex})
 
         LOG.info(_LI("Complete-Migrate volume completed successfully."),
                  resource=volume)
-        return volume['id']
+        return volume.id
 
     def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
-                       new_type_id=None):
+                       new_type_id=None, volume=None):
         """Migrate the volume to the specified host (called on source host)."""
+        # FIXME(thangp): Remove this in v2.0 of RPC API.
+        if volume is None:
+            # For older clients, mimic the old behavior and look up the volume
+            # by its volume_id.
+            volume = objects.Volume.get_by_id(context, volume_id)
+
         try:
             # NOTE(flaper87): Verify the driver is enabled
             # before going forward. The exception will be caught
@@ -1805,54 +1819,54 @@ class VolumeManager(manager.SchedulerDependentManager):
             utils.require_driver_initialized(self.driver)
         except exception.DriverNotInitialized:
             with excutils.save_and_reraise_exception():
-                self.db.volume_update(ctxt, volume_id,
-                                      {'migration_status': 'error'})
+                volume.migration_status = 'error'
+                volume.save()
 
-        volume_ref = self.db.volume_get(ctxt, volume_id)
         model_update = None
         moved = False
 
         status_update = None
-        if volume_ref['status'] in ('retyping', 'maintenance'):
-            status_update = {'status': volume_ref['previous_status']}
+        if volume.status in ('retyping', 'maintenance'):
+            status_update = {'status': volume.previous_status}
 
-        self.db.volume_update(ctxt, volume_ref['id'],
-                              {'migration_status': 'migrating'})
+        volume.migration_status = 'migrating'
+        volume.save()
         if not force_host_copy and new_type_id is None:
             try:
-                LOG.debug("Issue driver.migrate_volume.", resource=volume_ref)
+                LOG.debug("Issue driver.migrate_volume.", resource=volume)
                 moved, model_update = self.driver.migrate_volume(ctxt,
-                                                                 volume_ref,
+                                                                 volume,
                                                                  host)
                 if moved:
                     updates = {'host': host['host'],
                                'migration_status': 'success',
-                               'previous_status': volume_ref['status']}
+                               'previous_status': volume.status}
                     if status_update:
                         updates.update(status_update)
                     if model_update:
                         updates.update(model_update)
-                    volume_ref = self.db.volume_update(ctxt,
-                                                       volume_ref['id'],
-                                                       updates)
+                    volume.update(updates)
+                    volume.save()
             except Exception:
                 with excutils.save_and_reraise_exception():
                     updates = {'migration_status': 'error'}
                     if status_update:
                         updates.update(status_update)
-                    self.db.volume_update(ctxt, volume_ref['id'], updates)
+                    volume.update(updates)
+                    volume.save()
         if not moved:
             try:
-                self._migrate_volume_generic(ctxt, volume_ref, host,
+                self._migrate_volume_generic(ctxt, volume, host,
                                              new_type_id)
             except Exception:
                 with excutils.save_and_reraise_exception():
                     updates = {'migration_status': 'error'}
                     if status_update:
                         updates.update(status_update)
-                    self.db.volume_update(ctxt, volume_ref['id'], updates)
+                    volume.update(updates)
+                    volume.save()
         LOG.info(_LI("Migrate volume completed successfully."),
-                 resource=volume_ref)
+                 resource=volume)
 
     @periodic_task.periodic_task
     def _report_driver_status(self, context):
@@ -3088,14 +3102,16 @@ class VolumeManager(manager.SchedulerDependentManager):
     def update_migrated_volume(self, ctxt, volume, new_volume,
                                volume_status):
         """Finalize migration process on backend device."""
+        # FIXME(thangp): Remove this in v2.0 of RPC API.
+        if (not isinstance(volume, objects.Volume) or
+                not isinstance(new_volume, objects.Volume)):
+            volume = objects.Volume.get_by_id(ctxt, volume['id'])
+            new_volume = objects.Volume.get_by_id(ctxt, new_volume['id'])
+
         model_update = None
-        # This is temporary fix for bug 1491210.
-        volume = self.db.volume_get(ctxt, volume['id'])
-        new_volume = self.db.volume_get(ctxt, new_volume['id'])
-        model_update_default = {'_name_id': new_volume['_name_id'] or
-                                new_volume['id'],
+        model_update_default = {'_name_id': new_volume.name_id,
                                 'provider_location':
-                                new_volume['provider_location']}
+                                new_volume.provider_location}
         try:
             model_update = self.driver.update_migrated_volume(ctxt,
                                                               volume,
@@ -3119,17 +3135,19 @@ class VolumeManager(manager.SchedulerDependentManager):
                     if volume.get('volume_metadata'):
                         model_update_new[key] = {
                             metadata['key']: metadata['value']
-                            for metadata in volume.get('volume_metadata')}
+                            for metadata in volume.volume_metadata}
                 elif key == 'admin_metadata':
                     model_update_new[key] = {
                         metadata['key']: metadata['value']
-                        for metadata in volume.get('volume_admin_metadata')}
+                        for metadata in volume.volume_admin_metadata}
                 else:
                     model_update_new[key] = volume[key]
-            self.db.volume_update(ctxt.elevated(), new_volume['id'],
-                                  model_update_new)
-        self.db.volume_update(ctxt.elevated(), volume['id'],
-                              model_update_default)
+            with new_volume.obj_as_admin():
+                new_volume.update(model_update_new)
+                new_volume.save()
+        with volume.obj_as_admin():
+                volume.update(model_update_default)
+                volume.save()
 
     # Replication V2 methods
     def enable_replication(self, context, volume):
index c81e78341be1ed8a8f2dbeb69ae230f57741650d..035b4d595561f88dd811e24b04652c742eb091c9 100644 (file)
@@ -83,6 +83,8 @@ class VolumeAPI(object):
         1.33 - Adds support for sending objects over RPC in delete_volume().
         1.34 - Adds support for sending objects over RPC in retype().
         1.35 - Adds support for sending objects over RPC in extend_volume().
+        1.36 - Adds support for sending objects over RPC in migrate_volume(),
+               migrate_volume_completion(), and update_migrated_volume().
     """
 
     BASE_RPC_API_VERSION = '1.0'
@@ -246,20 +248,35 @@ class VolumeAPI(object):
         cctxt.cast(ctxt, 'extend_volume', **msg_args)
 
     def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.8')
+        new_host = utils.extract_host(volume.host)
         host_p = {'host': dest_host.host,
                   'capabilities': dest_host.capabilities}
-        cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
-                   host=host_p, force_host_copy=force_host_copy)
+
+        msg_args = {'volume_id': volume.id, 'host': host_p,
+                    'force_host_copy': force_host_copy}
+        if self.client.can_send_version('1.36'):
+            version = '1.36'
+            msg_args['volume'] = volume
+        else:
+            version = '1.8'
+
+        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt.cast(ctxt, 'migrate_volume', **msg_args)
 
     def migrate_volume_completion(self, ctxt, volume, new_volume, error):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.10')
-        return cctxt.call(ctxt, 'migrate_volume_completion',
-                          volume_id=volume['id'],
-                          new_volume_id=new_volume['id'],
-                          error=error)
+        new_host = utils.extract_host(volume.host)
+
+        msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
+                    'error': error}
+        if self.client.can_send_version('1.36'):
+            version = '1.36'
+            msg_args['volume'] = volume
+            msg_args['new_volume'] = new_volume
+        else:
+            version = '1.10'
+
+        cctxt = self.client.prepare(server=new_host, version=version)
+        return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
 
     def retype(self, ctxt, volume, new_type_id, dest_host,
                migration_policy='never', reservations=None):
@@ -296,7 +313,7 @@ class VolumeAPI(object):
     def update_migrated_volume(self, ctxt, volume, new_volume,
                                original_volume_status):
         host = utils.extract_host(new_volume['host'])
-        cctxt = self.client.prepare(server=host, version='1.19')
+        cctxt = self.client.prepare(server=host, version='1.36')
         cctxt.call(ctxt,
                    'update_migrated_volume',
                    volume=volume,