self.assertTrue(self.driver.retype(context, fake_volume,
fake_type, diff, host))
+ @common_mocks
+ def test_update_migrated_volume(self):
+ client = self.mock_client.return_value
+ client.__enter__.return_value = client
+
+ with mock.patch.object(self.driver.rbd.RBD(), 'rename') as mock_rename:
+ context = {}
+ current_volume = {'id': 'curr_id',
+ 'name': 'curr_name',
+ 'provider_location': 'curr_provider_location'}
+ original_volume = {'id': 'orig_id',
+ 'name': 'orig_name',
+ 'provider_location': 'orig_provider_location'}
+ mock_rename.return_value = 0
+ model_update = self.driver.update_migrated_volume(context,
+ original_volume,
+ current_volume,
+ 'available')
+ mock_rename.assert_called_with(client.ioctx,
+ 'volume-%s' % current_volume['id'],
+ 'volume-%s' % original_volume['id'])
+ self.assertEqual({'_name_id': None,
+ 'provider_location': None}, model_update)
+
def test_rbd_volume_proxy_init(self):
mock_driver = mock.Mock(name='driver')
mock_driver._connect_to_rados.return_value = (None, None)
from cinder.api import common
from cinder.brick.local_dev import lvm as brick_lvm
-from cinder.compute import nova
from cinder import context
from cinder import db
from cinder import exception
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
- @mock.patch.object(nova.API, 'update_server_volume')
+ @mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
'migrate_volume_completion')
@mock.patch('cinder.db.volume_get')
def test_migrate_volume_generic(self, volume_get,
migrate_volume_completion,
- update_server_volume):
+ nova_api):
fake_volume_id = 'fake_volume_id'
fake_new_volume = {'status': 'available', 'id': fake_volume_id}
host_obj = {'host': 'newhost', 'capabilities': {}}
volume_get.return_value = fake_new_volume
+ update_server_volume = nova_api.return_value.update_server_volume
volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
- with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+ with mock.patch.object(self.volume, '_copy_volume_data') as \
mock_copy_volume:
self.volume._migrate_volume_generic(self.context, volume,
host_obj, None)
volume['id'],
fake_new_volume['id'],
error=False)
+ self.assertFalse(update_server_volume.called)
- @mock.patch.object(nova.API, 'update_server_volume')
+ @mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
'migrate_volume_completion')
@mock.patch('cinder.db.volume_get')
def test_migrate_volume_generic_attached_volume(self, volume_get,
migrate_volume_completion,
- update_server_volume):
+ nova_api):
attached_host = 'some-host'
fake_volume_id = 'fake_volume_id'
fake_new_volume = {'status': 'available', 'id': fake_volume_id}
host_obj = {'host': 'newhost', 'capabilities': {}}
fake_uuid = fakes.get_fake_uuid()
+ update_server_volume = nova_api.return_value.update_server_volume
volume_get.return_value = fake_new_volume
volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
self.volume._migrate_volume_generic(self.context, volume,
host_obj, None)
self.assertFalse(migrate_volume_completion.called)
- with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
- mock_copy_volume:
- self.volume._migrate_volume_generic(self.context, volume,
- host_obj, None)
- self.assertFalse(mock_copy_volume.called)
- self.assertFalse(migrate_volume_completion.called)
+ update_server_volume.assert_called_with(self.context, fake_uuid,
+ volume['id'], fake_volume_id)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'update_migrated_volume')
@mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
host_obj = {'host': 'newhost', 'capabilities': {}}
with mock.patch.object(self.volume.driver, 'migrate_volume') as \
mock_migrate_volume,\
- mock.patch.object(self.volume.driver, 'copy_volume_data'), \
+ mock.patch.object(self.volume, '_copy_volume_data'),\
mock.patch.object(self.volume.driver, 'delete_volume') as \
delete_volume:
create_volume.side_effect = self._fake_create_volume
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
as mock_create_volume,\
- mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+ mock.patch.object(self.volume, '_copy_volume_data') as \
mock_copy_volume,\
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
mock.patch.object(self.volume, 'migrate_volume_completion'),\
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
as mock_create_volume,\
- mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+ mock.patch.object(self.volume, '_copy_volume_data') as \
mock_copy_volume,\
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
mock.patch.object(self.volume, 'migrate_volume_completion'),\
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
as mock_create_volume,\
- mock.patch.object(self.volume.driver, 'copy_volume_data'),\
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
mock.patch.object(self.volume, 'migrate_volume_completion')\
as mock_migrate_compl,\
- mock.patch.object(self.volume.driver, 'create_export'):
+ mock.patch.object(self.volume.driver, 'create_export'), \
+ mock.patch.object(self.volume, '_attach_volume'), \
+ mock.patch.object(self.volume, '_detach_volume'), \
+ mock.patch.object(os_brick.initiator.connector,
+ 'get_connector_properties') \
+ as mock_get_connector_properties:
# Exception case at delete_volume
# source_volume['migration_status'] is 'completing'
mock_create_volume.side_effect = self._fake_create_volume
mock_migrate_compl.side_effect = fake_migrate_volume_completion
+ mock_get_connector_properties.return_value = {}
volume = tests_utils.create_volume(self.context, size=0,
host=CONF.host)
host_obj = {'host': 'newhost', 'capabilities': {}}
host='fake_host',
discover=True,
version='1.29')
+
+ def test_remove_export(self):
+ self._test_volume_api('remove_export',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ version='1.30')
import datetime
+import io
import mock
from oslo_concurrency import processutils
'iflag=direct', 'oflag=direct',
'conv=sparse', run_as_root=True)
+ @mock.patch('cinder.volume.utils._copy_volume_with_file')
+ def test_copy_volume_handles(self, mock_copy):
+ handle1 = io.RawIOBase()
+ handle2 = io.RawIOBase()
+ output = volume_utils.copy_volume(handle1, handle2, 1024, 1)
+ self.assertIsNone(output)
+ mock_copy.assert_called_once_with(handle1, handle2, 1024)
+
+ @mock.patch('cinder.volume.utils._transfer_data')
+ @mock.patch('cinder.volume.utils._open_volume_with_path')
+ def test_copy_volume_handle_transfer(self, mock_open, mock_transfer):
+ handle = io.RawIOBase()
+ output = volume_utils.copy_volume('/foo/bar', handle, 1024, 1)
+ self.assertIsNone(output)
+ mock_transfer.assert_called_once_with(mock.ANY, mock.ANY,
+ 1073741824, mock.ANY)
+
class VolumeUtilsTestCase(test.TestCase):
def test_null_safe_str(self):
class RBDDriver(driver.TransferVD, driver.ExtendVD,
driver.CloneableVD, driver.CloneableImageVD, driver.SnapshotVD,
- driver.BaseVD):
+ driver.MigrateVD, driver.BaseVD):
"""Implements RADOS block device (RBD) volume commands."""
VERSION = '1.2.0'
'size': image_size})
raise exception.VolumeBackendAPIException(
data=exception_message)
+
+ def update_migrated_volume(self, ctxt, volume, new_volume,
+ original_volume_status):
+ """Return model update from RBD for migrated volume.
+
+ This method should rename the back-end volume name(id) on the
+ destination host back to its original name(id) on the source host.
+
+ :param ctxt: The context used to run the method update_migrated_volume
+ :param volume: The original volume that was migrated to this backend
+ :param new_volume: The migration volume object that was created on
+ this backend as part of the migration process
+ :param original_volume_status: The status of the original volume
+ :return model_update to update DB with any needed changes
+ """
+ name_id = None
+ provider_location = None
+
+ existing_name = CONF.volume_name_template % new_volume['id']
+ wanted_name = CONF.volume_name_template % volume['id']
+ with RADOSClient(self) as client:
+ try:
+ self.RBDProxy().rename(client.ioctx,
+ utils.convert_str(existing_name),
+ utils.convert_str(wanted_name))
+ except self.rbd.ImageNotFound:
+ LOG.error(_LE('Unable to rename the logical volume '
+ 'for volume %s.'), volume['id'])
+ # If the rename fails, _name_id should be set to the new
+ # volume id and provider_location should be set to the
+ # one from the new volume as well.
+ name_id = new_volume['_name_id'] or new_volume['id']
+ provider_location = new_volume['provider_location']
+ return {'_name_id': name_id, 'provider_location': provider_location}
+
+ def migrate_volume(self, context, volume, host):
+ return (False, None)
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
+from oslo_utils import units
from oslo_utils import uuidutils
from osprofiler import profiler
import six
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.29'
+ RPC_API_VERSION = '1.30'
target = messaging.Target(version=RPC_API_VERSION)
LOG.info(_LI("Terminate volume connection completed successfully."),
resource=volume_ref)
+ def remove_export(self, context, volume_id):
+ """Removes an export for a volume."""
+
+ utils.require_driver_initialized(self.driver)
+ volume_ref = self.db.volume_get(context, volume_id)
+ try:
+ self.driver.remove_export(context, volume_ref)
+ except Exception:
+ msg = _("Remove volume export failed.")
+ LOG.exception(msg, resource=volume_ref)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ LOG.info(_LI("Remove volume export completed successfully."),
+ resource=volume_ref)
+
def accept_transfer(self, context, volume_id, new_user, new_project):
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
resource=volume_ref)
return model_update
+ def _connect_device(self, conn):
+ use_multipath = self.configuration.use_multipath_for_image_xfer
+ device_scan_attempts = self.configuration.num_volume_device_scan_tries
+ protocol = conn['driver_volume_type']
+ connector = utils.brick_get_connector(
+ protocol,
+ use_multipath=use_multipath,
+ device_scan_attempts=device_scan_attempts,
+ conn=conn)
+ vol_handle = connector.connect_volume(conn['data'])
+
+ root_access = True
+
+ if not connector.check_valid_device(vol_handle['path'], root_access):
+ if isinstance(vol_handle['path'], six.string_types):
+ raise exception.DeviceUnavailable(
+ path=vol_handle['path'],
+ reason=(_("Unable to access the backend storage via the "
+ "path %(path)s.") %
+ {'path': vol_handle['path']}))
+ else:
+ raise exception.DeviceUnavailable(
+ path=None,
+ reason=(_("Unable to access the backend storage via file "
+ "handle.")))
+
+ return {'conn': conn, 'device': vol_handle, 'connector': connector}
+
+ def _attach_volume(self, ctxt, volume, properties, remote=False):
+ status = volume['status']
+
+ if remote:
+ rpcapi = volume_rpcapi.VolumeAPI()
+ try:
+ conn = rpcapi.initialize_connection(ctxt, volume, properties)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Failed to attach volume %(vol)s."),
+ {'vol': volume['id']})
+ self.db.volume_update(ctxt, volume['id'],
+ {'status': status})
+ else:
+ conn = self.initialize_connection(ctxt, volume['id'], properties)
+
+ return self._connect_device(conn)
+
+ def _detach_volume(self, ctxt, attach_info, volume, properties,
+ force=False, remote=False):
+ connector = attach_info['connector']
+ connector.disconnect_volume(attach_info['conn']['data'],
+ attach_info['device'])
+
+ if remote:
+ rpcapi = volume_rpcapi.VolumeAPI()
+ rpcapi.terminate_connection(ctxt, volume, properties, force=force)
+ rpcapi.remove_export(ctxt, volume)
+ else:
+ try:
+ self.terminate_connection(ctxt, volume['id'], properties,
+ force=force)
+ self.remove_export(ctxt, volume['id'])
+ except Exception as err:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE('Unable to terminate volume connection: '
+ '%(err)s.') % {'err': err})
+
+ def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None):
+ """Copy data from src_vol to dest_vol."""
+
+ LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.',
+ {'src': src_vol['name'], 'dest': dest_vol['name']})
+
+ properties = utils.brick_get_connector_properties()
+
+ dest_remote = remote in ['dest', 'both']
+ dest_attach_info = self._attach_volume(ctxt, dest_vol, properties,
+ remote=dest_remote)
+
+ try:
+ src_remote = remote in ['src', 'both']
+ src_attach_info = self._attach_volume(ctxt, src_vol, properties,
+ remote=src_remote)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Failed to attach source volume for copy."))
+ self._detach_volume(ctxt, dest_attach_info, dest_vol,
+ properties, remote=dest_remote)
+
+ copy_error = True
+ try:
+ size_in_mb = int(src_vol['size']) * units.Ki # vol size is in GB
+ vol_utils.copy_volume(src_attach_info['device']['path'],
+ dest_attach_info['device']['path'],
+ size_in_mb,
+ self.configuration.volume_dd_blocksize)
+ copy_error = False
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Failed to copy volume %(src)s to %(dest)s."),
+ {'src': src_vol['id'], 'dest': dest_vol['id']})
+ finally:
+ try:
+ self._detach_volume(ctxt, dest_attach_info, dest_vol,
+ properties, force=copy_error,
+ remote=dest_remote)
+ finally:
+ self._detach_volume(ctxt, src_attach_info, src_vol,
+ properties, force=copy_error,
+ remote=src_remote)
+
def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
rpcapi = volume_rpcapi.VolumeAPI()
try:
attachments = volume['volume_attachment']
if not attachments:
- self.driver.copy_volume_data(ctxt, volume, new_volume,
- remote='dest')
+ self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
# The above call is synchronous so we complete the migration
self.migrate_volume_completion(ctxt, volume['id'],
new_volume['id'],
1.27 - Adds support for replication V2
1.28 - Adds manage_existing_snapshot
1.29 - Adds get_capabilities.
+ 1.30 - Adds remove_export
"""
BASE_RPC_API_VERSION = '1.0'
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
- self.client = rpc.get_client(target, '1.29', serializer=serializer)
+ self.client = rpc.get_client(target, '1.30', serializer=serializer)
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force)
+ def remove_export(self, ctxt, volume):
+ new_host = utils.extract_host(volume['host'])
+ cctxt = self.client.prepare(server=new_host, version='1.30')
+ cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
+
def publish_service_capabilities(self, ctxt):
cctxt = self.client.prepare(fanout=True, version='1.2')
cctxt.cast(ctxt, 'publish_service_capabilities')
import ast
import math
import re
+import time
import uuid
from Crypto.Random import random
+import eventlet
+from eventlet import tpool
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units
+import six
from six.moves import range
from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
-from cinder.i18n import _LI, _LW
+from cinder.i18n import _, _LI, _LW, _LE
from cinder import rpc
from cinder import utils
from cinder.volume import throttling
return False
-def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
- execute=utils.execute, ionice=None, sparse=False):
+def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize,
+ sync=False, execute=utils.execute, ionice=None,
+ sparse=False):
# Use O_DIRECT to avoid thrashing the system buffer cache
extra_flags = []
if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
{'size_in_m': size_in_m, 'mbps': mbps})
-def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
+def _open_volume_with_path(path, mode):
+ try:
+ with utils.temporary_chown(path):
+ handle = open(path, mode)
+ return handle
+ except Exception:
+ LOG.error(_LE("Failed to open volume from %(path)s."), {'path': path})
+
+
+def _transfer_data(src, dest, length, chunk_size):
+ """Transfer data between files (Python IO objects)."""
+
+ chunks = int(math.ceil(length / chunk_size))
+ remaining_length = length
+
+ LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.",
+ {'chunks': chunks, 'bytes': chunk_size})
+
+ for chunk in xrange(0, chunks):
+ before = time.time()
+ data = tpool.execute(src.read, min(chunk_size, remaining_length))
+
+ # If we have reached end of source, discard any extraneous bytes from
+ # destination volume if trim is enabled and stop writing.
+ if data == '':
+ break
+
+ tpool.execute(dest.write, data)
+ remaining_length -= len(data)
+ delta = (time.time() - before)
+ rate = (chunk_size / delta) / units.Ki
+ LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).",
+ {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate})
+
+ # yield to any other pending operations
+ eventlet.sleep(0)
+
+ tpool.execute(dest.flush)
+
+
+def _copy_volume_with_file(src, dest, size_in_m):
+ src_handle = src
+ if isinstance(src, six.string_types):
+ src_handle = _open_volume_with_path(src, 'rb')
+
+ dest_handle = dest
+ if isinstance(dest, six.string_types):
+ dest_handle = _open_volume_with_path(dest, 'wb')
+
+ if not src_handle:
+ raise exception.DeviceUnavailable(
+ _("Failed to copy volume, source device unavailable."))
+
+ if not dest_handle:
+ raise exception.DeviceUnavailable(
+ _("Failed to copy volume, destination device unavailable."))
+
+ start_time = timeutils.utcnow()
+
+ _transfer_data(src_handle, dest_handle, size_in_m * units.Mi, units.Mi * 4)
+
+ duration = max(1, timeutils.delta_seconds(start_time, timeutils.utcnow()))
+
+ if isinstance(src, six.string_types):
+ src_handle.close()
+ if isinstance(dest, six.string_types):
+ dest_handle.close()
+
+ mbps = (size_in_m / duration)
+ LOG.info(_LI("Volume copy completed (%(size_in_m).2f MB at "
+ "%(mbps).2f MB/s)."),
+ {'size_in_m': size_in_m, 'mbps': mbps})
+
+
+def copy_volume(src, dest, size_in_m, blocksize, sync=False,
execute=utils.execute, ionice=None, throttle=None,
sparse=False):
- if not throttle:
- throttle = throttling.Throttle.get_default()
- with throttle.subcommand(srcstr, deststr) as throttle_cmd:
- _copy_volume(throttle_cmd['prefix'], srcstr, deststr,
- size_in_m, blocksize, sync=sync,
- execute=execute, ionice=ionice, sparse=sparse)
+ """Copy data from the source volume to the destination volume.
+
+ The parameters 'src' and 'dest' are both typically of type str, which
+ represents the path to each volume on the filesystem. Connectors can
+ optionally return a volume handle of type RawIOBase for volumes that are
+ not available on the local filesystem for open/close operations.
+
+ If either 'src' or 'dest' are not of type str, then they are assumed to be
+ of type RawIOBase or any derivative that supports file operations such as
+ read and write. In this case, the handles are treated as file handles
+ instead of file paths and, at present moment, throttling is unavailable.
+ """
+
+ if (isinstance(src, six.string_types) and
+ isinstance(dest, six.string_types)):
+ if not throttle:
+ throttle = throttling.Throttle.get_default()
+ with throttle.subcommand(src, dest) as throttle_cmd:
+ _copy_volume_with_path(throttle_cmd['prefix'], src, dest,
+ size_in_m, blocksize, sync=sync,
+ execute=execute, ionice=ionice,
+ sparse=sparse)
+ else:
+ _copy_volume_with_file(src, dest, size_in_m)
def clear_volume(volume_size, volume_path, volume_clear=None,