check_exit_code=False,
run_as_root=False)])
- def test_update_migrated_volume_is_there(self):
- """Ensure that driver.update_migrated_volume() is there."""
+ @mock.patch.object(os, 'rename')
+ def test_update_migrated_available_volume(self, rename_volume):
+ self._test_update_migrated_volume('available', rename_volume)
- drv = nfs.NfsDriver(configuration=self.configuration)
+ @mock.patch.object(os, 'rename')
+ def test_update_migrated_available_volume_rename_fail(self, rename_volume):
+ self._test_update_migrated_volume('available', rename_volume,
+ rename_exception=True)
- v1 = DumbVolume()
- v2 = DumbVolume()
-
- self.assertRaises(NotImplementedError,
- drv.update_migrated_volume,
- self.context,
- v1,
- v2,
- mock.sentinel)
+ @mock.patch.object(os, 'rename')
+ def test_update_migrated_in_use_volume(self, rename_volume):
+ self._test_update_migrated_volume('in-use', rename_volume)
+
+ def _test_update_migrated_volume(self, volume_status, rename_volume,
+ rename_exception=False):
+ drv = nfs.NfsDriver(configuration=self.configuration)
+ fake_volume_id = 'vol1'
+ fake_new_volume_id = 'vol2'
+ fake_provider_source = 'fake_provider_source'
+ fake_provider = 'fake_provider'
+ base_dir = '/dir_base/'
+ volume_name_template = 'volume-%s'
+ original_volume_name = volume_name_template % fake_volume_id
+ current_name = volume_name_template % fake_new_volume_id
+ original_volume_path = base_dir + original_volume_name
+ current_path = base_dir + current_name
+ fake_volume = {'size': 1, 'id': fake_volume_id,
+ 'provider_location': fake_provider_source,
+ '_name_id': None}
+ fake_new_volume = {'size': 1, 'id': fake_new_volume_id,
+ 'provider_location': fake_provider,
+ '_name_id': None}
+
+ with mock.patch.object(drv, 'local_path') as local_path:
+ local_path.return_value = base_dir + current_name
+ if volume_status == 'in-use':
+ update = drv.update_migrated_volume(self.context,
+ fake_volume,
+ fake_new_volume,
+ volume_status)
+ self.assertEqual({'_name_id': fake_new_volume_id,
+ 'provider_location': fake_provider}, update)
+ elif rename_exception:
+ rename_volume.side_effect = OSError
+ update = drv.update_migrated_volume(self.context,
+ fake_volume,
+ fake_new_volume,
+ volume_status)
+ rename_volume.assert_called_once_with(current_path,
+ original_volume_path)
+ self.assertEqual({'_name_id': fake_new_volume_id,
+ 'provider_location': fake_provider}, update)
+ else:
+ update = drv.update_migrated_volume(self.context,
+ fake_volume,
+ fake_new_volume,
+ volume_status)
+ rename_volume.assert_called_once_with(current_path,
+ original_volume_path)
+ self.assertEqual({'_name_id': None,
+ 'provider_location': fake_provider}, update)
def test_retype_is_there(self):
"Ensure that driver.retype() is there."""
"for information on a secure NAS configuration."),
doc_html)
+ def update_migrated_volume(self, ctxt, volume, new_volume,
+ original_volume_status):
+ """Return the keys and values updated from NFS for migrated volume.
+
+ This method should rename the back-end volume name(id) on the
+ destination host back to its original name(id) on the source host.
+
+ :param ctxt: The context used to run the method update_migrated_volume
+ :param volume: The original volume that was migrated to this backend
+ :param new_volume: The migration volume object that was created on
+ this backend as part of the migration process
+ :param original_volume_status: The status of the original volume
+ :return model_update to update DB with any needed changes
+ """
+ # TODO(vhou) This method may need to be updated after
+ # NFS snapshots are introduced.
+ name_id = None
+ if original_volume_status == 'available':
+ current_name = CONF.volume_name_template % new_volume['id']
+ original_volume_name = CONF.volume_name_template % volume['id']
+ current_path = self.local_path(new_volume)
+ # Replace the volume name with the original volume name
+ original_path = current_path.replace(current_name,
+ original_volume_name)
+ try:
+ os.rename(current_path, original_path)
+ except OSError:
+ LOG.error(_LE('Unable to rename the logical volume '
+ 'for volume: %s'), volume['id'])
+ # If the rename fails, _name_id should be set to the new
+ # volume id and provider_location should be set to the
+ # one from the new volume as well.
+ name_id = new_volume['_name_id'] or new_volume['id']
+ else:
+ # The back-end will not be renamed.
+ name_id = new_volume['_name_id'] or new_volume['id']
+ return {'_name_id': name_id,
+ 'provider_location': new_volume['provider_location']}
+
def _update_volume_stats(self):
"""Retrieve stats info from volume group."""