From: Jon Bernard Date: Mon, 1 Jun 2015 18:09:30 +0000 (-0400) Subject: Add support for file I/O volume migration X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=f586043fa969b9d1dcf4933aacbf615f53691093;p=openstack-build%2Fcinder-build.git Add support for file I/O volume migration This patch changes the generic volume migration logic to support non-attachable volumes. Non-attachable refers to volume drivers that do not support attachment via the typical iSCSI or similar protocols where a block device is made available on the host machine. Device drivers such as RBD make volumes available to cinder via a file handle that proxies read() and write() calls to the Ceph cluster. This patch improves the generic migration logic to determine whether a migration operation can proceed with dd using block device paths or file operations on handles returned from the os-brick connectors. Changes to the RBD driver are included to correctly rename the target volume during the completion phase of a successful migration. It appears there is still some work to be done for attached in-use volume migration for certain configurations. Successful tests were seen for: LVM to LVM (available and in-use) LVM to/from NFS (available and in-use) LVM to/from Ceph (available) Ceph to LVM (in-use) NFS to/from Ceph (available) Ceph to NFS (in-use) Failures were seen (due to Nova) for the following: LVM to Ceph (in-use) NFS to Ceph (in-use) (Pulled from gate, cinder can no longer pass unit tests) Blueprint: generic-volume-migration Closes-Bug: #1489335 Closes-Bug: #1489337 Change-Id: Iece2776fa751152f97b389ddab426e50c6f79bea --- diff --git a/cinder/tests/unit/test_rbd.py b/cinder/tests/unit/test_rbd.py index 101d05b3c..d407bb917 100644 --- a/cinder/tests/unit/test_rbd.py +++ b/cinder/tests/unit/test_rbd.py @@ -863,6 +863,30 @@ class RBDTestCase(test.TestCase): self.assertTrue(self.driver.retype(context, fake_volume, fake_type, diff, host)) + @common_mocks + def test_update_migrated_volume(self): + client = self.mock_client.return_value + client.__enter__.return_value = client + + with mock.patch.object(self.driver.rbd.RBD(), 'rename') as mock_rename: + context = {} + current_volume = {'id': 'curr_id', + 'name': 'curr_name', + 'provider_location': 'curr_provider_location'} + original_volume = {'id': 'orig_id', + 'name': 'orig_name', + 'provider_location': 'orig_provider_location'} + mock_rename.return_value = 0 + model_update = self.driver.update_migrated_volume(context, + original_volume, + current_volume, + 'available') + mock_rename.assert_called_with(client.ioctx, + 'volume-%s' % current_volume['id'], + 'volume-%s' % original_volume['id']) + self.assertEqual({'_name_id': None, + 'provider_location': None}, model_update) + def test_rbd_volume_proxy_init(self): mock_driver = mock.Mock(name='driver') mock_driver._connect_to_rados.return_value = (None, None) diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index 537e15869..35f7293dc 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -40,7 +40,6 @@ from taskflow.engines.action_engine import engine from cinder.api import common from cinder.brick.local_dev import lvm as brick_lvm -from cinder.compute import nova from cinder import context from cinder import db from cinder import exception @@ -4243,20 +4242,21 @@ class VolumeTestCase(BaseVolumeTestCase): self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) - @mock.patch.object(nova.API, 'update_server_volume') + @mock.patch('cinder.compute.API') @mock.patch('cinder.volume.manager.VolumeManager.' 'migrate_volume_completion') @mock.patch('cinder.db.volume_get') def test_migrate_volume_generic(self, volume_get, migrate_volume_completion, - update_server_volume): + nova_api): fake_volume_id = 'fake_volume_id' fake_new_volume = {'status': 'available', 'id': fake_volume_id} host_obj = {'host': 'newhost', 'capabilities': {}} volume_get.return_value = fake_new_volume + update_server_volume = nova_api.return_value.update_server_volume volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) - with mock.patch.object(self.volume.driver, 'copy_volume_data') as \ + with mock.patch.object(self.volume, '_copy_volume_data') as \ mock_copy_volume: self.volume._migrate_volume_generic(self.context, volume, host_obj, None) @@ -4267,19 +4267,21 @@ class VolumeTestCase(BaseVolumeTestCase): volume['id'], fake_new_volume['id'], error=False) + self.assertFalse(update_server_volume.called) - @mock.patch.object(nova.API, 'update_server_volume') + @mock.patch('cinder.compute.API') @mock.patch('cinder.volume.manager.VolumeManager.' 'migrate_volume_completion') @mock.patch('cinder.db.volume_get') def test_migrate_volume_generic_attached_volume(self, volume_get, migrate_volume_completion, - update_server_volume): + nova_api): attached_host = 'some-host' fake_volume_id = 'fake_volume_id' fake_new_volume = {'status': 'available', 'id': fake_volume_id} host_obj = {'host': 'newhost', 'capabilities': {}} fake_uuid = fakes.get_fake_uuid() + update_server_volume = nova_api.return_value.update_server_volume volume_get.return_value = fake_new_volume volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) @@ -4293,12 +4295,8 @@ class VolumeTestCase(BaseVolumeTestCase): self.volume._migrate_volume_generic(self.context, volume, host_obj, None) self.assertFalse(migrate_volume_completion.called) - with mock.patch.object(self.volume.driver, 'copy_volume_data') as \ - mock_copy_volume: - self.volume._migrate_volume_generic(self.context, volume, - host_obj, None) - self.assertFalse(mock_copy_volume.called) - self.assertFalse(migrate_volume_completion.called) + update_server_volume.assert_called_with(self.context, fake_uuid, + volume['id'], fake_volume_id) @mock.patch.object(volume_rpcapi.VolumeAPI, 'update_migrated_volume') @mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') @@ -4312,7 +4310,7 @@ class VolumeTestCase(BaseVolumeTestCase): host_obj = {'host': 'newhost', 'capabilities': {}} with mock.patch.object(self.volume.driver, 'migrate_volume') as \ mock_migrate_volume,\ - mock.patch.object(self.volume.driver, 'copy_volume_data'), \ + mock.patch.object(self.volume, '_copy_volume_data'),\ mock.patch.object(self.volume.driver, 'delete_volume') as \ delete_volume: create_volume.side_effect = self._fake_create_volume @@ -4331,7 +4329,7 @@ class VolumeTestCase(BaseVolumeTestCase): with mock.patch.object(self.volume.driver, 'migrate_volume'),\ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\ as mock_create_volume,\ - mock.patch.object(self.volume.driver, 'copy_volume_data') as \ + mock.patch.object(self.volume, '_copy_volume_data') as \ mock_copy_volume,\ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\ mock.patch.object(self.volume, 'migrate_volume_completion'),\ @@ -4455,7 +4453,7 @@ class VolumeTestCase(BaseVolumeTestCase): with mock.patch.object(self.volume.driver, 'migrate_volume'),\ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\ as mock_create_volume,\ - mock.patch.object(self.volume.driver, 'copy_volume_data') as \ + mock.patch.object(self.volume, '_copy_volume_data') as \ mock_copy_volume,\ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\ mock.patch.object(self.volume, 'migrate_volume_completion'),\ @@ -4489,16 +4487,21 @@ class VolumeTestCase(BaseVolumeTestCase): with mock.patch.object(self.volume.driver, 'migrate_volume'),\ mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\ as mock_create_volume,\ - mock.patch.object(self.volume.driver, 'copy_volume_data'),\ mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\ mock.patch.object(self.volume, 'migrate_volume_completion')\ as mock_migrate_compl,\ - mock.patch.object(self.volume.driver, 'create_export'): + mock.patch.object(self.volume.driver, 'create_export'), \ + mock.patch.object(self.volume, '_attach_volume'), \ + mock.patch.object(self.volume, '_detach_volume'), \ + mock.patch.object(os_brick.initiator.connector, + 'get_connector_properties') \ + as mock_get_connector_properties: # Exception case at delete_volume # source_volume['migration_status'] is 'completing' mock_create_volume.side_effect = self._fake_create_volume mock_migrate_compl.side_effect = fake_migrate_volume_completion + mock_get_connector_properties.return_value = {} volume = tests_utils.create_volume(self.context, size=0, host=CONF.host) host_obj = {'host': 'newhost', 'capabilities': {}} diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 9df7f0c95..db93e6085 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -409,3 +409,9 @@ class VolumeRpcAPITestCase(test.TestCase): host='fake_host', discover=True, version='1.29') + + def test_remove_export(self): + self._test_volume_api('remove_export', + rpc_method='cast', + volume=self.fake_volume, + version='1.30') diff --git a/cinder/tests/unit/test_volume_utils.py b/cinder/tests/unit/test_volume_utils.py index c7ab1581e..8baecd4c3 100644 --- a/cinder/tests/unit/test_volume_utils.py +++ b/cinder/tests/unit/test_volume_utils.py @@ -17,6 +17,7 @@ import datetime +import io import mock from oslo_concurrency import processutils @@ -626,6 +627,23 @@ class CopyVolumeTestCase(test.TestCase): 'iflag=direct', 'oflag=direct', 'conv=sparse', run_as_root=True) + @mock.patch('cinder.volume.utils._copy_volume_with_file') + def test_copy_volume_handles(self, mock_copy): + handle1 = io.RawIOBase() + handle2 = io.RawIOBase() + output = volume_utils.copy_volume(handle1, handle2, 1024, 1) + self.assertIsNone(output) + mock_copy.assert_called_once_with(handle1, handle2, 1024) + + @mock.patch('cinder.volume.utils._transfer_data') + @mock.patch('cinder.volume.utils._open_volume_with_path') + def test_copy_volume_handle_transfer(self, mock_open, mock_transfer): + handle = io.RawIOBase() + output = volume_utils.copy_volume('/foo/bar', handle, 1024, 1) + self.assertIsNone(output) + mock_transfer.assert_called_once_with(mock.ANY, mock.ANY, + 1073741824, mock.ANY) + class VolumeUtilsTestCase(test.TestCase): def test_null_safe_str(self): diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py index 71e766d00..e5eefe8fc 100644 --- a/cinder/volume/drivers/rbd.py +++ b/cinder/volume/drivers/rbd.py @@ -266,7 +266,7 @@ class RADOSClient(object): class RBDDriver(driver.TransferVD, driver.ExtendVD, driver.CloneableVD, driver.CloneableImageVD, driver.SnapshotVD, - driver.BaseVD): + driver.MigrateVD, driver.BaseVD): """Implements RADOS block device (RBD) volume commands.""" VERSION = '1.2.0' @@ -1057,3 +1057,40 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD, 'size': image_size}) raise exception.VolumeBackendAPIException( data=exception_message) + + def update_migrated_volume(self, ctxt, volume, new_volume, + original_volume_status): + """Return model update from RBD for migrated volume. + + This method should rename the back-end volume name(id) on the + destination host back to its original name(id) on the source host. + + :param ctxt: The context used to run the method update_migrated_volume + :param volume: The original volume that was migrated to this backend + :param new_volume: The migration volume object that was created on + this backend as part of the migration process + :param original_volume_status: The status of the original volume + :return model_update to update DB with any needed changes + """ + name_id = None + provider_location = None + + existing_name = CONF.volume_name_template % new_volume['id'] + wanted_name = CONF.volume_name_template % volume['id'] + with RADOSClient(self) as client: + try: + self.RBDProxy().rename(client.ioctx, + utils.convert_str(existing_name), + utils.convert_str(wanted_name)) + except self.rbd.ImageNotFound: + LOG.error(_LE('Unable to rename the logical volume ' + 'for volume %s.'), volume['id']) + # If the rename fails, _name_id should be set to the new + # volume id and provider_location should be set to the + # one from the new volume as well. + name_id = new_volume['_name_id'] or new_volume['id'] + provider_location = new_volume['provider_location'] + return {'_name_id': name_id, 'provider_location': provider_location} + + def migrate_volume(self, context, volume, host): + return (False, None) diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 9aaeb41db..6eda82eaa 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -47,6 +47,7 @@ from oslo_service import periodic_task from oslo_utils import excutils from oslo_utils import importutils from oslo_utils import timeutils +from oslo_utils import units from oslo_utils import uuidutils from osprofiler import profiler import six @@ -190,7 +191,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '1.29' + RPC_API_VERSION = '1.30' target = messaging.Target(version=RPC_API_VERSION) @@ -1332,6 +1333,21 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.info(_LI("Terminate volume connection completed successfully."), resource=volume_ref) + def remove_export(self, context, volume_id): + """Removes an export for a volume.""" + + utils.require_driver_initialized(self.driver) + volume_ref = self.db.volume_get(context, volume_id) + try: + self.driver.remove_export(context, volume_ref) + except Exception: + msg = _("Remove volume export failed.") + LOG.exception(msg, resource=volume_ref) + raise exception.VolumeBackendAPIException(data=msg) + + LOG.info(_LI("Remove volume export completed successfully."), + resource=volume_ref) + def accept_transfer(self, context, volume_id, new_user, new_project): # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -1367,6 +1383,116 @@ class VolumeManager(manager.SchedulerDependentManager): resource=volume_ref) return model_update + def _connect_device(self, conn): + use_multipath = self.configuration.use_multipath_for_image_xfer + device_scan_attempts = self.configuration.num_volume_device_scan_tries + protocol = conn['driver_volume_type'] + connector = utils.brick_get_connector( + protocol, + use_multipath=use_multipath, + device_scan_attempts=device_scan_attempts, + conn=conn) + vol_handle = connector.connect_volume(conn['data']) + + root_access = True + + if not connector.check_valid_device(vol_handle['path'], root_access): + if isinstance(vol_handle['path'], six.string_types): + raise exception.DeviceUnavailable( + path=vol_handle['path'], + reason=(_("Unable to access the backend storage via the " + "path %(path)s.") % + {'path': vol_handle['path']})) + else: + raise exception.DeviceUnavailable( + path=None, + reason=(_("Unable to access the backend storage via file " + "handle."))) + + return {'conn': conn, 'device': vol_handle, 'connector': connector} + + def _attach_volume(self, ctxt, volume, properties, remote=False): + status = volume['status'] + + if remote: + rpcapi = volume_rpcapi.VolumeAPI() + try: + conn = rpcapi.initialize_connection(ctxt, volume, properties) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Failed to attach volume %(vol)s."), + {'vol': volume['id']}) + self.db.volume_update(ctxt, volume['id'], + {'status': status}) + else: + conn = self.initialize_connection(ctxt, volume['id'], properties) + + return self._connect_device(conn) + + def _detach_volume(self, ctxt, attach_info, volume, properties, + force=False, remote=False): + connector = attach_info['connector'] + connector.disconnect_volume(attach_info['conn']['data'], + attach_info['device']) + + if remote: + rpcapi = volume_rpcapi.VolumeAPI() + rpcapi.terminate_connection(ctxt, volume, properties, force=force) + rpcapi.remove_export(ctxt, volume) + else: + try: + self.terminate_connection(ctxt, volume['id'], properties, + force=force) + self.remove_export(ctxt, volume['id']) + except Exception as err: + with excutils.save_and_reraise_exception(): + LOG.error(_LE('Unable to terminate volume connection: ' + '%(err)s.') % {'err': err}) + + def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None): + """Copy data from src_vol to dest_vol.""" + + LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.', + {'src': src_vol['name'], 'dest': dest_vol['name']}) + + properties = utils.brick_get_connector_properties() + + dest_remote = remote in ['dest', 'both'] + dest_attach_info = self._attach_volume(ctxt, dest_vol, properties, + remote=dest_remote) + + try: + src_remote = remote in ['src', 'both'] + src_attach_info = self._attach_volume(ctxt, src_vol, properties, + remote=src_remote) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Failed to attach source volume for copy.")) + self._detach_volume(ctxt, dest_attach_info, dest_vol, + properties, remote=dest_remote) + + copy_error = True + try: + size_in_mb = int(src_vol['size']) * units.Ki # vol size is in GB + vol_utils.copy_volume(src_attach_info['device']['path'], + dest_attach_info['device']['path'], + size_in_mb, + self.configuration.volume_dd_blocksize) + copy_error = False + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Failed to copy volume %(src)s to %(dest)s."), + {'src': src_vol['id'], 'dest': dest_vol['id']}) + finally: + try: + self._detach_volume(ctxt, dest_attach_info, dest_vol, + properties, force=copy_error, + remote=dest_remote) + finally: + self._detach_volume(ctxt, src_attach_info, src_vol, + properties, force=copy_error, + remote=src_remote) + def _migrate_volume_generic(self, ctxt, volume, host, new_type_id): rpcapi = volume_rpcapi.VolumeAPI() @@ -1421,8 +1547,7 @@ class VolumeManager(manager.SchedulerDependentManager): try: attachments = volume['volume_attachment'] if not attachments: - self.driver.copy_volume_data(ctxt, volume, new_volume, - remote='dest') + self._copy_volume_data(ctxt, volume, new_volume, remote='dest') # The above call is synchronous so we complete the migration self.migrate_volume_completion(ctxt, volume['id'], new_volume['id'], diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index a2c748cc0..7a5c858af 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -75,6 +75,7 @@ class VolumeAPI(object): 1.27 - Adds support for replication V2 1.28 - Adds manage_existing_snapshot 1.29 - Adds get_capabilities. + 1.30 - Adds remove_export """ BASE_RPC_API_VERSION = '1.0' @@ -84,7 +85,7 @@ class VolumeAPI(object): target = messaging.Target(topic=CONF.volume_topic, version=self.BASE_RPC_API_VERSION) serializer = objects_base.CinderObjectSerializer() - self.client = rpc.get_client(target, '1.29', serializer=serializer) + self.client = rpc.get_client(target, '1.30', serializer=serializer) def create_consistencygroup(self, ctxt, group, host): new_host = utils.extract_host(host) @@ -197,6 +198,11 @@ class VolumeAPI(object): return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'], connector=connector, force=force) + def remove_export(self, ctxt, volume): + new_host = utils.extract_host(volume['host']) + cctxt = self.client.prepare(server=new_host, version='1.30') + cctxt.cast(ctxt, 'remove_export', volume_id=volume['id']) + def publish_service_capabilities(self, ctxt): cctxt = self.client.prepare(fanout=True, version='1.2') cctxt.cast(ctxt, 'publish_service_capabilities') diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index b8ce98aac..4d83ef429 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -18,22 +18,26 @@ import ast import math import re +import time import uuid from Crypto.Random import random +import eventlet +from eventlet import tpool from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging from oslo_utils import strutils from oslo_utils import timeutils from oslo_utils import units +import six from six.moves import range from cinder.brick.local_dev import lvm as brick_lvm from cinder import context from cinder import db from cinder import exception -from cinder.i18n import _LI, _LW +from cinder.i18n import _, _LI, _LW, _LE from cinder import rpc from cinder import utils from cinder.volume import throttling @@ -301,8 +305,9 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'): return False -def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False, - execute=utils.execute, ionice=None, sparse=False): +def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize, + sync=False, execute=utils.execute, ionice=None, + sparse=False): # Use O_DIRECT to avoid thrashing the system buffer cache extra_flags = [] if check_for_odirect_support(srcstr, deststr, 'iflag=direct'): @@ -354,15 +359,107 @@ def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False, {'size_in_m': size_in_m, 'mbps': mbps}) -def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False, +def _open_volume_with_path(path, mode): + try: + with utils.temporary_chown(path): + handle = open(path, mode) + return handle + except Exception: + LOG.error(_LE("Failed to open volume from %(path)s."), {'path': path}) + + +def _transfer_data(src, dest, length, chunk_size): + """Transfer data between files (Python IO objects).""" + + chunks = int(math.ceil(length / chunk_size)) + remaining_length = length + + LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.", + {'chunks': chunks, 'bytes': chunk_size}) + + for chunk in xrange(0, chunks): + before = time.time() + data = tpool.execute(src.read, min(chunk_size, remaining_length)) + + # If we have reached end of source, discard any extraneous bytes from + # destination volume if trim is enabled and stop writing. + if data == '': + break + + tpool.execute(dest.write, data) + remaining_length -= len(data) + delta = (time.time() - before) + rate = (chunk_size / delta) / units.Ki + LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).", + {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate}) + + # yield to any other pending operations + eventlet.sleep(0) + + tpool.execute(dest.flush) + + +def _copy_volume_with_file(src, dest, size_in_m): + src_handle = src + if isinstance(src, six.string_types): + src_handle = _open_volume_with_path(src, 'rb') + + dest_handle = dest + if isinstance(dest, six.string_types): + dest_handle = _open_volume_with_path(dest, 'wb') + + if not src_handle: + raise exception.DeviceUnavailable( + _("Failed to copy volume, source device unavailable.")) + + if not dest_handle: + raise exception.DeviceUnavailable( + _("Failed to copy volume, destination device unavailable.")) + + start_time = timeutils.utcnow() + + _transfer_data(src_handle, dest_handle, size_in_m * units.Mi, units.Mi * 4) + + duration = max(1, timeutils.delta_seconds(start_time, timeutils.utcnow())) + + if isinstance(src, six.string_types): + src_handle.close() + if isinstance(dest, six.string_types): + dest_handle.close() + + mbps = (size_in_m / duration) + LOG.info(_LI("Volume copy completed (%(size_in_m).2f MB at " + "%(mbps).2f MB/s)."), + {'size_in_m': size_in_m, 'mbps': mbps}) + + +def copy_volume(src, dest, size_in_m, blocksize, sync=False, execute=utils.execute, ionice=None, throttle=None, sparse=False): - if not throttle: - throttle = throttling.Throttle.get_default() - with throttle.subcommand(srcstr, deststr) as throttle_cmd: - _copy_volume(throttle_cmd['prefix'], srcstr, deststr, - size_in_m, blocksize, sync=sync, - execute=execute, ionice=ionice, sparse=sparse) + """Copy data from the source volume to the destination volume. + + The parameters 'src' and 'dest' are both typically of type str, which + represents the path to each volume on the filesystem. Connectors can + optionally return a volume handle of type RawIOBase for volumes that are + not available on the local filesystem for open/close operations. + + If either 'src' or 'dest' are not of type str, then they are assumed to be + of type RawIOBase or any derivative that supports file operations such as + read and write. In this case, the handles are treated as file handles + instead of file paths and, at present moment, throttling is unavailable. + """ + + if (isinstance(src, six.string_types) and + isinstance(dest, six.string_types)): + if not throttle: + throttle = throttling.Throttle.get_default() + with throttle.subcommand(src, dest) as throttle_cmd: + _copy_volume_with_path(throttle_cmd['prefix'], src, dest, + size_in_m, blocksize, sync=sync, + execute=execute, ionice=ionice, + sparse=sparse) + else: + _copy_volume_with_file(src, dest, size_in_m) def clear_volume(volume_size, volume_path, volume_clear=None,