"""Tests for Volume Code."""
import datetime
+import ddt
import os
import shutil
import socket
from cinder.tests.unit import utils as tests_utils
from cinder import utils
import cinder.volume
+from cinder.volume import api as volume_api
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
self.assertEqual('error', volume['status'])
+@ddt.ddt
class VolumeMigrationTestCase(VolumeTestCase):
def test_migrate_volume_driver(self):
"""Test volume migration done by driver."""
mock_copy.assert_called_once_with('foo', 'bar', 0, '1M',
sparse=True)
+ def fake_attach_volume(self, ctxt, volume, instance_uuid, host_name,
+ mountpoint, mode):
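+        # side_effect stub for the mocked attach_volume calls: record the
+        # attachment via the test utilities instead of calling a driver.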
+ tests_utils.attach_volume(ctxt, volume.id,
+ instance_uuid, host_name,
+ '/dev/vda')
+
def _test_migrate_volume_completion(self, status='available',
instance_uuid=None, attached_host=None,
retyping=False,
previous_status='available'):
- def fake_attach_volume(ctxt, volume, instance_uuid, host_name,
- mountpoint, mode):
- tests_utils.attach_volume(ctxt, volume.id,
- instance_uuid, host_name,
- '/dev/vda')
initial_status = retyping and 'retyping' or status
old_volume = tests_utils.create_volume(self.context, size=0,
mock.patch.object(volume_rpcapi.VolumeAPI,
'update_migrated_volume'),\
mock.patch.object(self.volume.driver, 'attach_volume'):
- mock_attach_volume.side_effect = fake_attach_volume
+ mock_attach_volume.side_effect = self.fake_attach_volume
self.volume.migrate_volume_completion(self.context, old_volume.id,
new_volume.id)
after_new_volume = objects.Volume.get_by_id(self.context,
retyping=False,
previous_status='in-use')
+ @ddt.data(False, True)
+ def test_api_migrate_volume_completion_from_swap_with_no_migration(
+ self, swap_error):
+        # Validate that Cinder properly finishes the swap-volume status
+        # updates when no migration has occurred.
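+        # ddt runs this test once with swap_error=False and once with
+        # swap_error=True.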
+ instance_uuid = '83c969d5-065e-4c9c-907d-5394bc2e98e2'
+ attached_host = 'attached-host'
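+        # Set up an attached source volume and an unattached destination
+        # volume, mirroring the volumes involved in a Nova-initiated swap.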
+ orig_attached_vol = tests_utils.create_volume(self.context, size=0)
+ orig_attached_vol = tests_utils.attach_volume(
+ self.context, orig_attached_vol['id'], instance_uuid,
+ attached_host, '/dev/vda')
+ new_volume = tests_utils.create_volume(self.context, size=0)
+
+ @mock.patch.object(volume_rpcapi.VolumeAPI, 'detach_volume')
+ @mock.patch.object(volume_rpcapi.VolumeAPI, 'attach_volume')
+ def _run_migration_completion(rpc_attach_volume,
+ rpc_detach_volume):
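+            # The rpcapi attach/detach calls are mocked out so the
+            # assertions below can verify whether swap completion issues them.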
+ attachment = orig_attached_vol['volume_attachment'][0]
+ attachment_id = attachment['id']
+ rpc_attach_volume.side_effect = self.fake_attach_volume
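+            # Complete the swap through the public volume API.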
+ vol_id = volume_api.API().migrate_volume_completion(
+ self.context, orig_attached_vol, new_volume, swap_error)
+ if swap_error:
+                # When the swap failed, we don't want to finish the attachment
+ self.assertFalse(rpc_detach_volume.called)
+ self.assertFalse(rpc_attach_volume.called)
+ else:
+                # When there was no error, we should finish the attachment
+ rpc_detach_volume.assert_called_with(self.context,
+ orig_attached_vol,
+ attachment_id)
+ rpc_attach_volume.assert_called_with(
+ self.context, new_volume, attachment['instance_uuid'],
+ attachment['attached_host'], attachment['mountpoint'],
+ 'rw')
+ self.assertEqual(new_volume['id'], vol_id)
+
+ _run_migration_completion()
+
def test_retype_setup_fail_volume_is_available(self):
"""Verify volume is still available if retype prepare failed."""
elevated = context.get_admin_context()
# This is a volume swap initiated by Nova, not Cinder. Nova expects
# us to return the new_volume_id.
if not (volume.migration_status or new_volume.migration_status):
- # Don't need to do migration, but still need to finish the
- # volume attach and detach so volumes don't end in 'attaching'
- # and 'detaching' state
- attachments = volume.volume_attachment
- for attachment in attachments:
- self.detach(context, volume, attachment.id)
-
- self.attach(context, new_volume,
- attachment.instance_uuid,
- attachment.attached_host,
- attachment.mountpoint,
- 'rw')
+            # When we're not migrating and haven't hit any errors, we issue
+            # volume attach and detach requests so the volumes don't end up
+            # stuck in the 'attaching' and 'detaching' states
+ if not error:
+ attachments = volume.volume_attachment
+ for attachment in attachments:
+ self.detach(context, volume, attachment.id)
+
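+                    # Re-attach the new volume using the original
+                    # attachment's instance, host and mountpoint.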
+ self.attach(context, new_volume,
+ attachment.instance_uuid,
+ attachment.attached_host,
+ attachment.mountpoint,
+ 'rw')
            return new_volume.id