]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add support for file I/O volume migration
authorJon Bernard <jobernar@redhat.com>
Mon, 1 Jun 2015 18:09:30 +0000 (14:09 -0400)
committerSean Dague <sean@dague.net>
Mon, 31 Aug 2015 18:12:49 +0000 (18:12 +0000)
This patch changes the generic volume migration logic to support
non-attachable volumes.  Non-attachable refers to volume drivers that do
not support attachment via the typical iSCSI or similar protocols where
a block device is made available on the host machine.  Device drivers
such as RBD make volumes available to cinder via a file handle that
proxies read() and write() calls to the Ceph cluster.

This patch improves the generic migration logic to determine whether a
migration operation can proceed with dd using block device paths or file
operations on handles returned from the os-brick connectors.

Changes to the RBD driver are included to correctly rename the target
volume during the completion phase of a successful migration.

It appears there is still some work to be done for attached in-use
volume migration for certain configurations.  Successful tests were seen
for:

  LVM to LVM (available and in-use)
  LVM to/from NFS (available and in-use)
  LVM to/from Ceph (available)
  Ceph to LVM (in-use)
  NFS to/from Ceph (available)
  Ceph to NFS (in-use)

Failures were seen (due to Nova) for the following:

  LVM to Ceph (in-use)
  NFS to Ceph (in-use)

(Pulled from gate, cinder can no longer pass unit tests)

Blueprint: generic-volume-migration
Closes-Bug: #1489335
Closes-Bug: #1489337
Change-Id: Iece2776fa751152f97b389ddab426e50c6f79bea

cinder/tests/unit/test_rbd.py
cinder/tests/unit/test_volume.py
cinder/tests/unit/test_volume_rpcapi.py
cinder/tests/unit/test_volume_utils.py
cinder/volume/drivers/rbd.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
cinder/volume/utils.py

index 101d05b3c050b88a249c257048a9a6eb78b320fd..d407bb917161438a595be673a7fae855a9fb3bd8 100644 (file)
@@ -863,6 +863,30 @@ class RBDTestCase(test.TestCase):
         self.assertTrue(self.driver.retype(context, fake_volume,
                                            fake_type, diff, host))
 
+    @common_mocks
+    def test_update_migrated_volume(self):
+        client = self.mock_client.return_value
+        client.__enter__.return_value = client
+
+        with mock.patch.object(self.driver.rbd.RBD(), 'rename') as mock_rename:
+            context = {}
+            current_volume = {'id': 'curr_id',
+                              'name': 'curr_name',
+                              'provider_location': 'curr_provider_location'}
+            original_volume = {'id': 'orig_id',
+                               'name': 'orig_name',
+                               'provider_location': 'orig_provider_location'}
+            mock_rename.return_value = 0
+            model_update = self.driver.update_migrated_volume(context,
+                                                              original_volume,
+                                                              current_volume,
+                                                              'available')
+            mock_rename.assert_called_with(client.ioctx,
+                                           'volume-%s' % current_volume['id'],
+                                           'volume-%s' % original_volume['id'])
+            self.assertEqual({'_name_id': None,
+                              'provider_location': None}, model_update)
+
     def test_rbd_volume_proxy_init(self):
         mock_driver = mock.Mock(name='driver')
         mock_driver._connect_to_rados.return_value = (None, None)
index 537e1586998c28c7fdff0be2b814e0fe17d9c41e..35f7293dca1ffdb6a09ffb6034ccda04f4ae48f0 100644 (file)
@@ -40,7 +40,6 @@ from taskflow.engines.action_engine import engine
 
 from cinder.api import common
 from cinder.brick.local_dev import lvm as brick_lvm
-from cinder.compute import nova
 from cinder import context
 from cinder import db
 from cinder import exception
@@ -4243,20 +4242,21 @@ class VolumeTestCase(BaseVolumeTestCase):
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
 
-    @mock.patch.object(nova.API, 'update_server_volume')
+    @mock.patch('cinder.compute.API')
     @mock.patch('cinder.volume.manager.VolumeManager.'
                 'migrate_volume_completion')
     @mock.patch('cinder.db.volume_get')
     def test_migrate_volume_generic(self, volume_get,
                                     migrate_volume_completion,
-                                    update_server_volume):
+                                    nova_api):
         fake_volume_id = 'fake_volume_id'
         fake_new_volume = {'status': 'available', 'id': fake_volume_id}
         host_obj = {'host': 'newhost', 'capabilities': {}}
         volume_get.return_value = fake_new_volume
+        update_server_volume = nova_api.return_value.update_server_volume
         volume = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
-        with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+        with mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume:
             self.volume._migrate_volume_generic(self.context, volume,
                                                 host_obj, None)
@@ -4267,19 +4267,21 @@ class VolumeTestCase(BaseVolumeTestCase):
                                                          volume['id'],
                                                          fake_new_volume['id'],
                                                          error=False)
+            self.assertFalse(update_server_volume.called)
 
-    @mock.patch.object(nova.API, 'update_server_volume')
+    @mock.patch('cinder.compute.API')
     @mock.patch('cinder.volume.manager.VolumeManager.'
                 'migrate_volume_completion')
     @mock.patch('cinder.db.volume_get')
     def test_migrate_volume_generic_attached_volume(self, volume_get,
                                                     migrate_volume_completion,
-                                                    update_server_volume):
+                                                    nova_api):
         attached_host = 'some-host'
         fake_volume_id = 'fake_volume_id'
         fake_new_volume = {'status': 'available', 'id': fake_volume_id}
         host_obj = {'host': 'newhost', 'capabilities': {}}
         fake_uuid = fakes.get_fake_uuid()
+        update_server_volume = nova_api.return_value.update_server_volume
         volume_get.return_value = fake_new_volume
         volume = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
@@ -4293,12 +4295,8 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume._migrate_volume_generic(self.context, volume,
                                             host_obj, None)
         self.assertFalse(migrate_volume_completion.called)
-        with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
-                mock_copy_volume:
-            self.volume._migrate_volume_generic(self.context, volume,
-                                                host_obj, None)
-            self.assertFalse(mock_copy_volume.called)
-            self.assertFalse(migrate_volume_completion.called)
+        update_server_volume.assert_called_with(self.context, fake_uuid,
+                                                volume['id'], fake_volume_id)
 
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'update_migrated_volume')
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
@@ -4312,7 +4310,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         host_obj = {'host': 'newhost', 'capabilities': {}}
         with mock.patch.object(self.volume.driver, 'migrate_volume') as \
                 mock_migrate_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data'), \
+                mock.patch.object(self.volume, '_copy_volume_data'),\
                 mock.patch.object(self.volume.driver, 'delete_volume') as \
                 delete_volume:
             create_volume.side_effect = self._fake_create_volume
@@ -4331,7 +4329,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+                mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume,\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion'),\
@@ -4455,7 +4453,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+                mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume,\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion'),\
@@ -4489,16 +4487,21 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion')\
                 as mock_migrate_compl,\
-                mock.patch.object(self.volume.driver, 'create_export'):
+                mock.patch.object(self.volume.driver, 'create_export'), \
+                mock.patch.object(self.volume, '_attach_volume'), \
+                mock.patch.object(self.volume, '_detach_volume'), \
+                mock.patch.object(os_brick.initiator.connector,
+                                  'get_connector_properties') \
+                as mock_get_connector_properties:
 
             # Exception case at delete_volume
             # source_volume['migration_status'] is 'completing'
             mock_create_volume.side_effect = self._fake_create_volume
             mock_migrate_compl.side_effect = fake_migrate_volume_completion
+            mock_get_connector_properties.return_value = {}
             volume = tests_utils.create_volume(self.context, size=0,
                                                host=CONF.host)
             host_obj = {'host': 'newhost', 'capabilities': {}}
index 9df7f0c95b37d7c1d87af58f8b64a06b7219445f..db93e60859b0215c337d3459d64c0f0a9120f8e2 100644 (file)
@@ -409,3 +409,9 @@ class VolumeRpcAPITestCase(test.TestCase):
                               host='fake_host',
                               discover=True,
                               version='1.29')
+
+    def test_remove_export(self):
+        self._test_volume_api('remove_export',
+                              rpc_method='cast',
+                              volume=self.fake_volume,
+                              version='1.30')
index c7ab1581e8503a803ab1f6ec699912df7996f8a8..8baecd4c3db7e372040b37c871cdafd7726e8a5f 100644 (file)
@@ -17,6 +17,7 @@
 
 
 import datetime
+import io
 import mock
 
 from oslo_concurrency import processutils
@@ -626,6 +627,23 @@ class CopyVolumeTestCase(test.TestCase):
                                           'iflag=direct', 'oflag=direct',
                                           'conv=sparse', run_as_root=True)
 
+    @mock.patch('cinder.volume.utils._copy_volume_with_file')
+    def test_copy_volume_handles(self, mock_copy):
+        handle1 = io.RawIOBase()
+        handle2 = io.RawIOBase()
+        output = volume_utils.copy_volume(handle1, handle2, 1024, 1)
+        self.assertIsNone(output)
+        mock_copy.assert_called_once_with(handle1, handle2, 1024)
+
+    @mock.patch('cinder.volume.utils._transfer_data')
+    @mock.patch('cinder.volume.utils._open_volume_with_path')
+    def test_copy_volume_handle_transfer(self, mock_open, mock_transfer):
+        handle = io.RawIOBase()
+        output = volume_utils.copy_volume('/foo/bar', handle, 1024, 1)
+        self.assertIsNone(output)
+        mock_transfer.assert_called_once_with(mock.ANY, mock.ANY,
+                                              1073741824, mock.ANY)
+
 
 class VolumeUtilsTestCase(test.TestCase):
     def test_null_safe_str(self):
index 71e766d0048713b309f6ab1bea7e28304259dea6..e5eefe8fcfb3ca89e41d6176540a732fc43527d5 100644 (file)
@@ -266,7 +266,7 @@ class RADOSClient(object):
 
 class RBDDriver(driver.TransferVD, driver.ExtendVD,
                 driver.CloneableVD, driver.CloneableImageVD, driver.SnapshotVD,
-                driver.BaseVD):
+                driver.MigrateVD, driver.BaseVD):
     """Implements RADOS block device (RBD) volume commands."""
 
     VERSION = '1.2.0'
@@ -1057,3 +1057,40 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
                                         'size': image_size})
                 raise exception.VolumeBackendAPIException(
                     data=exception_message)
+
+    def update_migrated_volume(self, ctxt, volume, new_volume,
+                               original_volume_status):
+        """Return model update from RBD for migrated volume.
+
+        This method should rename the back-end volume name(id) on the
+        destination host back to its original name(id) on the source host.
+
+        :param ctxt: The context used to run the method update_migrated_volume
+        :param volume: The original volume that was migrated to this backend
+        :param new_volume: The migration volume object that was created on
+                           this backend as part of the migration process
+        :param original_volume_status: The status of the original volume
+        :return model_update to update DB with any needed changes
+        """
+        name_id = None
+        provider_location = None
+
+        existing_name = CONF.volume_name_template % new_volume['id']
+        wanted_name = CONF.volume_name_template % volume['id']
+        with RADOSClient(self) as client:
+            try:
+                self.RBDProxy().rename(client.ioctx,
+                                       utils.convert_str(existing_name),
+                                       utils.convert_str(wanted_name))
+            except self.rbd.ImageNotFound:
+                LOG.error(_LE('Unable to rename the logical volume '
+                              'for volume %s.'), volume['id'])
+                # If the rename fails, _name_id should be set to the new
+                # volume id and provider_location should be set to the
+                # one from the new volume as well.
+                name_id = new_volume['_name_id'] or new_volume['id']
+                provider_location = new_volume['provider_location']
+        return {'_name_id': name_id, 'provider_location': provider_location}
+
+    def migrate_volume(self, context, volume, host):
+        return (False, None)
index 9aaeb41dba2522bdecc2cffdff230730974ec9dd..6eda82eaa31a7abe0481c6132df253446521d582 100644 (file)
@@ -47,6 +47,7 @@ from oslo_service import periodic_task
 from oslo_utils import excutils
 from oslo_utils import importutils
 from oslo_utils import timeutils
+from oslo_utils import units
 from oslo_utils import uuidutils
 from osprofiler import profiler
 import six
@@ -190,7 +191,7 @@ def locked_snapshot_operation(f):
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.29'
+    RPC_API_VERSION = '1.30'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -1332,6 +1333,21 @@ class VolumeManager(manager.SchedulerDependentManager):
         LOG.info(_LI("Terminate volume connection completed successfully."),
                  resource=volume_ref)
 
+    def remove_export(self, context, volume_id):
+        """Removes an export for a volume."""
+
+        utils.require_driver_initialized(self.driver)
+        volume_ref = self.db.volume_get(context, volume_id)
+        try:
+            self.driver.remove_export(context, volume_ref)
+        except Exception:
+            msg = _("Remove volume export failed.")
+            LOG.exception(msg, resource=volume_ref)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        LOG.info(_LI("Remove volume export completed successfully."),
+                 resource=volume_ref)
+
     def accept_transfer(self, context, volume_id, new_user, new_project):
         # NOTE(flaper87): Verify the driver is enabled
         # before going forward. The exception will be caught
@@ -1367,6 +1383,116 @@ class VolumeManager(manager.SchedulerDependentManager):
                  resource=volume_ref)
         return model_update
 
+    def _connect_device(self, conn):
+        use_multipath = self.configuration.use_multipath_for_image_xfer
+        device_scan_attempts = self.configuration.num_volume_device_scan_tries
+        protocol = conn['driver_volume_type']
+        connector = utils.brick_get_connector(
+            protocol,
+            use_multipath=use_multipath,
+            device_scan_attempts=device_scan_attempts,
+            conn=conn)
+        vol_handle = connector.connect_volume(conn['data'])
+
+        root_access = True
+
+        if not connector.check_valid_device(vol_handle['path'], root_access):
+            if isinstance(vol_handle['path'], six.string_types):
+                raise exception.DeviceUnavailable(
+                    path=vol_handle['path'],
+                    reason=(_("Unable to access the backend storage via the "
+                              "path %(path)s.") %
+                            {'path': vol_handle['path']}))
+            else:
+                raise exception.DeviceUnavailable(
+                    path=None,
+                    reason=(_("Unable to access the backend storage via file "
+                              "handle.")))
+
+        return {'conn': conn, 'device': vol_handle, 'connector': connector}
+
+    def _attach_volume(self, ctxt, volume, properties, remote=False):
+        status = volume['status']
+
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            try:
+                conn = rpcapi.initialize_connection(ctxt, volume, properties)
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_LE("Failed to attach volume %(vol)s."),
+                              {'vol': volume['id']})
+                    self.db.volume_update(ctxt, volume['id'],
+                                          {'status': status})
+        else:
+            conn = self.initialize_connection(ctxt, volume['id'], properties)
+
+        return self._connect_device(conn)
+
+    def _detach_volume(self, ctxt, attach_info, volume, properties,
+                       force=False, remote=False):
+        connector = attach_info['connector']
+        connector.disconnect_volume(attach_info['conn']['data'],
+                                    attach_info['device'])
+
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            rpcapi.terminate_connection(ctxt, volume, properties, force=force)
+            rpcapi.remove_export(ctxt, volume)
+        else:
+            try:
+                self.terminate_connection(ctxt, volume['id'], properties,
+                                          force=force)
+                self.remove_export(ctxt, volume['id'])
+            except Exception as err:
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_LE('Unable to terminate volume connection: '
+                                  '%(err)s.') % {'err': err})
+
+    def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None):
+        """Copy data from src_vol to dest_vol."""
+
+        LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.',
+                  {'src': src_vol['name'], 'dest': dest_vol['name']})
+
+        properties = utils.brick_get_connector_properties()
+
+        dest_remote = remote in ['dest', 'both']
+        dest_attach_info = self._attach_volume(ctxt, dest_vol, properties,
+                                               remote=dest_remote)
+
+        try:
+            src_remote = remote in ['src', 'both']
+            src_attach_info = self._attach_volume(ctxt, src_vol, properties,
+                                                  remote=src_remote)
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE("Failed to attach source volume for copy."))
+                self._detach_volume(ctxt, dest_attach_info, dest_vol,
+                                    properties, remote=dest_remote)
+
+        copy_error = True
+        try:
+            size_in_mb = int(src_vol['size']) * units.Ki    # vol size is in GB
+            vol_utils.copy_volume(src_attach_info['device']['path'],
+                                  dest_attach_info['device']['path'],
+                                  size_in_mb,
+                                  self.configuration.volume_dd_blocksize)
+            copy_error = False
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE("Failed to copy volume %(src)s to %(dest)s."),
+                          {'src': src_vol['id'], 'dest': dest_vol['id']})
+        finally:
+            try:
+                self._detach_volume(ctxt, dest_attach_info, dest_vol,
+                                    properties, force=copy_error,
+                                    remote=dest_remote)
+            finally:
+                self._detach_volume(ctxt, src_attach_info, src_vol,
+                                    properties, force=copy_error,
+                                    remote=src_remote)
+
     def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
         rpcapi = volume_rpcapi.VolumeAPI()
 
@@ -1421,8 +1547,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         try:
             attachments = volume['volume_attachment']
             if not attachments:
-                self.driver.copy_volume_data(ctxt, volume, new_volume,
-                                             remote='dest')
+                self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
                 # The above call is synchronous so we complete the migration
                 self.migrate_volume_completion(ctxt, volume['id'],
                                                new_volume['id'],
index a2c748cc0b973b6dcbbee66467e833922723d471..7a5c858af9e6c4466390716a1b54795685ea11a0 100644 (file)
@@ -75,6 +75,7 @@ class VolumeAPI(object):
         1.27 - Adds support for replication V2
         1.28 - Adds manage_existing_snapshot
         1.29 - Adds get_capabilities.
+        1.30 - Adds remove_export
     """
 
     BASE_RPC_API_VERSION = '1.0'
@@ -84,7 +85,7 @@ class VolumeAPI(object):
         target = messaging.Target(topic=CONF.volume_topic,
                                   version=self.BASE_RPC_API_VERSION)
         serializer = objects_base.CinderObjectSerializer()
-        self.client = rpc.get_client(target, '1.29', serializer=serializer)
+        self.client = rpc.get_client(target, '1.30', serializer=serializer)
 
     def create_consistencygroup(self, ctxt, group, host):
         new_host = utils.extract_host(host)
@@ -197,6 +198,11 @@ class VolumeAPI(object):
         return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
                           connector=connector, force=force)
 
+    def remove_export(self, ctxt, volume):
+        new_host = utils.extract_host(volume['host'])
+        cctxt = self.client.prepare(server=new_host, version='1.30')
+        cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
+
     def publish_service_capabilities(self, ctxt):
         cctxt = self.client.prepare(fanout=True, version='1.2')
         cctxt.cast(ctxt, 'publish_service_capabilities')
index b8ce98aac4270af830a08b14acbbf5d849bb17bc..4d83ef4290adc95e38e76d6f045729c95ef5be5b 100644 (file)
 import ast
 import math
 import re
+import time
 import uuid
 
 from Crypto.Random import random
+import eventlet
+from eventlet import tpool
 from oslo_concurrency import processutils
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_utils import strutils
 from oslo_utils import timeutils
 from oslo_utils import units
+import six
 from six.moves import range
 
 from cinder.brick.local_dev import lvm as brick_lvm
 from cinder import context
 from cinder import db
 from cinder import exception
-from cinder.i18n import _LI, _LW
+from cinder.i18n import _, _LI, _LW, _LE
 from cinder import rpc
 from cinder import utils
 from cinder.volume import throttling
@@ -301,8 +305,9 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'):
         return False
 
 
-def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
-                 execute=utils.execute, ionice=None, sparse=False):
+def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize,
+                           sync=False, execute=utils.execute, ionice=None,
+                           sparse=False):
     # Use O_DIRECT to avoid thrashing the system buffer cache
     extra_flags = []
     if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
@@ -354,15 +359,107 @@ def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
              {'size_in_m': size_in_m, 'mbps': mbps})
 
 
-def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
+def _open_volume_with_path(path, mode):
+    try:
+        with utils.temporary_chown(path):
+            handle = open(path, mode)
+            return handle
+    except Exception:
+        LOG.error(_LE("Failed to open volume from %(path)s."), {'path': path})
+
+
+def _transfer_data(src, dest, length, chunk_size):
+    """Transfer data between files (Python IO objects)."""
+
+    chunks = int(math.ceil(length / chunk_size))
+    remaining_length = length
+
+    LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.",
+              {'chunks': chunks, 'bytes': chunk_size})
+
+    for chunk in xrange(0, chunks):
+        before = time.time()
+        data = tpool.execute(src.read, min(chunk_size, remaining_length))
+
+        # If we have reached end of source, discard any extraneous bytes from
+        # destination volume if trim is enabled and stop writing.
+        if data == '':
+            break
+
+        tpool.execute(dest.write, data)
+        remaining_length -= len(data)
+        delta = (time.time() - before)
+        rate = (chunk_size / delta) / units.Ki
+        LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).",
+                  {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate})
+
+        # yield to any other pending operations
+        eventlet.sleep(0)
+
+    tpool.execute(dest.flush)
+
+
+def _copy_volume_with_file(src, dest, size_in_m):
+    src_handle = src
+    if isinstance(src, six.string_types):
+        src_handle = _open_volume_with_path(src, 'rb')
+
+    dest_handle = dest
+    if isinstance(dest, six.string_types):
+        dest_handle = _open_volume_with_path(dest, 'wb')
+
+    if not src_handle:
+        raise exception.DeviceUnavailable(
+            _("Failed to copy volume, source device unavailable."))
+
+    if not dest_handle:
+        raise exception.DeviceUnavailable(
+            _("Failed to copy volume, destination device unavailable."))
+
+    start_time = timeutils.utcnow()
+
+    _transfer_data(src_handle, dest_handle, size_in_m * units.Mi, units.Mi * 4)
+
+    duration = max(1, timeutils.delta_seconds(start_time, timeutils.utcnow()))
+
+    if isinstance(src, six.string_types):
+        src_handle.close()
+    if isinstance(dest, six.string_types):
+        dest_handle.close()
+
+    mbps = (size_in_m / duration)
+    LOG.info(_LI("Volume copy completed (%(size_in_m).2f MB at "
+                 "%(mbps).2f MB/s)."),
+             {'size_in_m': size_in_m, 'mbps': mbps})
+
+
+def copy_volume(src, dest, size_in_m, blocksize, sync=False,
                 execute=utils.execute, ionice=None, throttle=None,
                 sparse=False):
-    if not throttle:
-        throttle = throttling.Throttle.get_default()
-    with throttle.subcommand(srcstr, deststr) as throttle_cmd:
-        _copy_volume(throttle_cmd['prefix'], srcstr, deststr,
-                     size_in_m, blocksize, sync=sync,
-                     execute=execute, ionice=ionice, sparse=sparse)
+    """Copy data from the source volume to the destination volume.
+
+    The parameters 'src' and 'dest' are both typically of type str, which
+    represents the path to each volume on the filesystem.  Connectors can
+    optionally return a volume handle of type RawIOBase for volumes that are
+    not available on the local filesystem for open/close operations.
+
+    If either 'src' or 'dest' are not of type str, then they are assumed to be
+    of type RawIOBase or any derivative that supports file operations such as
+    read and write.  In this case, the handles are treated as file handles
+    instead of file paths and, at present moment, throttling is unavailable.
+    """
+
+    if (isinstance(src, six.string_types) and
+            isinstance(dest, six.string_types)):
+        if not throttle:
+            throttle = throttling.Throttle.get_default()
+        with throttle.subcommand(src, dest) as throttle_cmd:
+            _copy_volume_with_path(throttle_cmd['prefix'], src, dest,
+                                   size_in_m, blocksize, sync=sync,
+                                   execute=execute, ionice=ionice,
+                                   sparse=sparse)
+    else:
+        _copy_volume_with_file(src, dest, size_in_m)
 
 
 def clear_volume(volume_size, volume_path, volume_clear=None,