self.assertGreater(volume_api.availability_zones_last_fetched,
last_fetched)
+ def test_list_availability_zones_enabled_service(self):
+ services = [
+ {'availability_zone': 'ping', 'disabled': 0},
+ {'availability_zone': 'ping', 'disabled': 1},
+ {'availability_zone': 'pong', 'disabled': 0},
+ {'availability_zone': 'pung', 'disabled': 1},
+ ]
+
+ def stub_service_get_all_by_topic(*args, **kwargs):
+ return services
+
+ self.stubs.Set(db, 'service_get_all_by_topic',
+ stub_service_get_all_by_topic)
+
+ volume_api = cinder.volume.api.API()
+ azs = volume_api.list_availability_zones()
+ azs = sorted(azs, key=lambda az: az['name'])
+
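+ # A zone counts as available as long as at least one enabled service
+ # reports it, so 'ping' is still available despite its disabled
+ # service, while 'pung' (disabled only) is not.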
+ expected = sorted([
+ {'name': 'pung', 'available': False},
+ {'name': 'pong', 'available': True},
+ {'name': 'ping', 'available': True},
+ ], key=lambda az: az['name'])
+
+ self.assertEqual(expected, azs)
+
class VolumeTestCase(BaseVolumeTestCase):
self.volume.delete_snapshot(self.context, snapshot)
self.volume.delete_volume(self.context, volume['id'])
- def test_delete_volume_in_consistency_group(self):
- """Test deleting a volume that's tied to a consistency group fails."""
- volume_api = cinder.volume.api.API()
- volume = tests_utils.create_volume(self.context, **self.volume_params)
- consistencygroup_id = '12345678-1234-5678-1234-567812345678'
- volume = db.volume_update(self.context, volume['id'],
- {'status': 'available',
- 'consistencygroup_id': consistencygroup_id})
- self.assertRaises(exception.InvalidVolume,
- volume_api.delete, self.context, volume)
-
def test_can_delete_errored_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.delete_volume(self.context, volume_dst['id'])
self.volume.delete_volume(self.context, volume_src['id'])
- @mock.patch('cinder.db.volume_update')
- def test_update_migrated_volume(self, volume_update):
- fake_host = 'fake_host'
- fake_new_host = 'fake_new_host'
- fake_update = {'_name_id': 'updated_id',
- 'provider_location': 'updated_location'}
- fake_elevated = 'fake_elevated'
- volume = tests_utils.create_volume(self.context, size=1,
- status='available',
- host=fake_host)
- new_volume = tests_utils.create_volume(
- self.context, size=1,
- status='available',
- provider_location='fake_provider_location',
- _name_id='fake_name_id',
- host=fake_new_host)
- fake_update_error = {'_name_id': new_volume['_name_id'],
- 'provider_location':
- new_volume['provider_location']}
- expected_update = {'_name_id': volume['_name_id'],
- 'provider_location': volume['provider_location']}
- with mock.patch.object(self.volume.driver,
- 'update_migrated_volume') as \
- migrate_update,\
- mock.patch.object(self.context, 'elevated') as elevated:
- migrate_update.return_value = fake_update
- elevated.return_value = fake_elevated
- self.volume.update_migrated_volume(self.context, volume,
- new_volume, 'available')
- volume_update.assert_has_calls((
- mock.call(fake_elevated, volume['id'], fake_update),
- mock.call(fake_elevated, new_volume['id'], expected_update)))
+ def test_clean_temporary_volume(self):
+ def fake_delete_volume(ctxt, volume):
+ db.volume_destroy(ctxt, volume['id'])
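+ # Stand-in for the RPC delete call: it only removes the DB record,
+ # which is enough for this test to observe that a delete happened.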
- # Test the case for update_migrated_volume not implemented
- # for the driver.
- migrate_update.reset_mock()
- volume_update.reset_mock()
- migrate_update.side_effect = NotImplementedError
- self.volume.update_migrated_volume(self.context, volume,
- new_volume, 'available')
- volume_update.assert_has_calls((
- mock.call(fake_elevated, volume['id'], fake_update_error),
- mock.call(fake_elevated, new_volume['id'], expected_update)))
+ fake_volume = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ fake_new_volume = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ # Check when the migrated volume is in migration
+ db.volume_update(self.context, fake_volume['id'],
+ {'migration_status': 'migrating'})
+ # 1. Only clean the db
+ self.volume._clean_temporary_volume(self.context, fake_volume['id'],
+ fake_new_volume['id'],
+ clean_db_only=True)
+ self.assertRaises(exception.VolumeNotFound,
+ db.volume_get, self.context,
+ fake_new_volume['id'])
- def test_list_availability_zones_enabled_service(self):
- services = [
- {'availability_zone': 'ping', 'disabled': 0},
- {'availability_zone': 'ping', 'disabled': 1},
- {'availability_zone': 'pong', 'disabled': 0},
- {'availability_zone': 'pung', 'disabled': 1},
- ]
+ # 2. Delete the backend storage
+ fake_new_volume = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ with mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') as \
+ mock_delete_volume:
+ mock_delete_volume.side_effect = fake_delete_volume
+ self.volume._clean_temporary_volume(self.context,
+ fake_volume['id'],
+ fake_new_volume['id'],
+ clean_db_only=False)
+ self.assertRaises(exception.VolumeNotFound,
+ db.volume_get, self.context,
+ fake_new_volume['id'])
- def stub_service_get_all_by_topic(*args, **kwargs):
- return services
+ # Check when the migrated volume is not in migration
+ fake_new_volume = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ db.volume_update(self.context, fake_volume['id'],
+ {'migration_status': 'non-migrating'})
+ self.volume._clean_temporary_volume(self.context, fake_volume['id'],
+ fake_new_volume['id'])
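+ # In this case the temporary volume is kept; only its
+ # migration_status should have been cleared.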
+ volume = db.volume_get(context.get_admin_context(),
+ fake_new_volume['id'])
+ self.assertIsNone(volume['migration_status'])
- self.stubs.Set(db, 'service_get_all_by_topic',
- stub_service_get_all_by_topic)
+ def test_update_volume_readonly_flag(self):
+ """Test volume readonly flag can be updated at API level."""
+ # create a volume and assign to host
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
+ self.volume.create_volume(self.context, volume['id'])
+ volume['status'] = 'in-use'
volume_api = cinder.volume.api.API()
- azs = volume_api.list_availability_zones()
- azs = list(azs).sort()
- expected = [
- {'name': 'pung', 'available': False},
- {'name': 'pong', 'available': True},
- {'name': 'ping', 'available': True},
- ].sort()
-
- self.assertEqual(expected, azs)
-
- @mock.patch.object(utils, 'brick_get_connector_properties')
- @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
- @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
- @mock.patch.object(volutils, 'copy_volume')
- @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
- def test_copy_volume_data(self,
- mock_get_capabilities,
- mock_copy,
- mock_detach,
- mock_attach,
- mock_get_connector):
- """Test function of _copy_volume_data."""
+ # Update fails when status != available
+ self.assertRaises(exception.InvalidVolume,
+ volume_api.update_readonly_flag,
+ self.context,
+ volume,
+ False)
- src_vol = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- dest_vol = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- mock_get_connector.return_value = {}
- self.volume.driver._throttle = mock.MagicMock()
+ volume['status'] = 'available'
- attach_expected = [
- mock.call(self.context, dest_vol, {}, remote=False),
- mock.call(self.context, src_vol, {}, remote=False)]
+ # works when volume in 'available' status
+ volume_api.update_readonly_flag(self.context, volume, False)
- detach_expected = [
- mock.call(self.context, {'device': {'path': 'bar'}},
- dest_vol, {}, force=False, remote=False),
- mock.call(self.context, {'device': {'path': 'foo'}},
- src_vol, {}, force=False, remote=False)]
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertEqual('available', volume['status'])
+ admin_metadata = volume['volume_admin_metadata']
+ self.assertEqual(1, len(admin_metadata))
+ self.assertEqual('readonly', admin_metadata[0]['key'])
+ self.assertEqual('False', admin_metadata[0]['value'])
- attach_volume_returns = [
- {'device': {'path': 'bar'}},
- {'device': {'path': 'foo'}}
- ]
+ # clean up
+ self.volume.delete_volume(self.context, volume['id'])
- # Test case for sparse_copy_volume = False
- mock_attach.side_effect = attach_volume_returns
- mock_get_capabilities.return_value = {}
- self.volume._copy_volume_data(self.context,
- src_vol,
- dest_vol)
+ def test_secure_file_operations_enabled(self):
+ """Test secure file operations setting for base driver.
- self.assertEqual(attach_expected, mock_attach.mock_calls)
- mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
- self.assertEqual(detach_expected, mock_detach.mock_calls)
+ General drivers that are not based on a network file system have
+ nothing to do with "secure_file_operations". This test verifies that
+ calling the method always returns False.
+ """
+ ret_flag = self.volume.driver.secure_file_operations_enabled()
+ self.assertFalse(ret_flag)
- # Test case for sparse_copy_volume = True
- mock_attach.reset_mock()
- mock_detach.reset_mock()
- mock_attach.side_effect = attach_volume_returns
- mock_get_capabilities.return_value = {'sparse_copy_volume': True}
- self.volume._copy_volume_data(self.context,
- src_vol,
- dest_vol)
+ @mock.patch('cinder.volume.flows.common.make_pretty_name',
+ new=mock.MagicMock())
+ @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.create_volume',
+ return_value=None)
+ @mock.patch('cinder.volume.flows.manager.create_volume.'
+ 'CreateVolumeFromSpecTask.execute',
+ side_effect=exception.DriverNotInitialized())
+ def test_create_volume_raise_rescheduled_exception(self, mock_execute,
+ mock_reschedule):
+ # Create source volume
+ test_vol = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ test_vol_id = test_vol['id']
+ self.assertRaises(exception.DriverNotInitialized,
+ self.volume.create_volume,
+ self.context, test_vol_id,
+ {'volume_properties': self.volume_params},
+ {'retry': {'num_attempts': 1, 'host': []}})
+ self.assertTrue(mock_reschedule.called)
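+ # The request was handed back to the scheduler, so the volume should
+ # remain in 'creating' rather than going to 'error'.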
+ volume = db.volume_get(context.get_admin_context(), test_vol_id)
+ self.assertEqual('creating', volume['status'])
- self.assertEqual(attach_expected, mock_attach.mock_calls)
- mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
- self.assertEqual(detach_expected, mock_detach.mock_calls)
+ @mock.patch('cinder.volume.flows.manager.create_volume.'
+ 'CreateVolumeFromSpecTask.execute')
+ def test_create_volume_raise_unrescheduled_exception(self, mock_execute):
+ # create source volume
+ test_vol = tests_utils.create_volume(self.context,
+ **self.volume_params)
+ test_vol_id = test_vol['id']
+ mock_execute.side_effect = exception.VolumeNotFound(
+ volume_id=test_vol_id)
+ self.assertRaises(exception.VolumeNotFound,
+ self.volume.create_volume,
+ self.context, test_vol_id,
+ {'volume_properties': self.volume_params},
+ {'retry': {'num_attempts': 1, 'host': []}})
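+ # VolumeNotFound does not trigger a reschedule, so the volume should
+ # end up in 'error'.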
+ volume = db.volume_get(context.get_admin_context(), test_vol_id)
+ self.assertEqual('error', volume['status'])
- # cleanup resource
- db.volume_destroy(self.context, src_vol['id'])
- db.volume_destroy(self.context, dest_vol['id'])
+class VolumeMigrationTestCase(VolumeTestCase):
def test_migrate_volume_driver(self):
"""Test volume migration done by driver."""
# stub out driver and rpc functions
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
- def test_clean_temporary_volume(self):
- def fake_delete_volume(ctxt, volume):
- db.volume_destroy(ctxt, volume['id'])
-
- fake_volume = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- fake_new_volume = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- # Check when the migrated volume is in migration
- db.volume_update(self.context, fake_volume['id'],
- {'migration_status': 'migrating'})
- # 1. Only clean the db
- self.volume._clean_temporary_volume(self.context, fake_volume['id'],
- fake_new_volume['id'],
- clean_db_only=True)
- self.assertRaises(exception.VolumeNotFound,
- db.volume_get, self.context,
- fake_new_volume['id'])
-
- # 2. Delete the backend storage
- fake_new_volume = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- with mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') as \
- mock_delete_volume:
- mock_delete_volume.side_effect = fake_delete_volume
- self.volume._clean_temporary_volume(self.context,
- fake_volume['id'],
- fake_new_volume['id'],
- clean_db_only=False)
- self.assertRaises(exception.VolumeNotFound,
- db.volume_get, self.context,
- fake_new_volume['id'])
+ @mock.patch('cinder.db.volume_update')
+ def test_update_migrated_volume(self, volume_update):
+ fake_host = 'fake_host'
+ fake_new_host = 'fake_new_host'
+ fake_update = {'_name_id': 'updated_id',
+ 'provider_location': 'updated_location'}
+ fake_elevated = 'fake_elevated'
+ volume = tests_utils.create_volume(self.context, size=1,
+ status='available',
+ host=fake_host)
+ new_volume = tests_utils.create_volume(
+ self.context, size=1,
+ status='available',
+ provider_location='fake_provider_location',
+ _name_id='fake_name_id',
+ host=fake_new_host)
+ new_volume['_name_id'] = 'fake_name_id'
+ new_volume['provider_location'] = 'fake_provider_location'
+ fake_update_error = {'_name_id': new_volume['_name_id'],
+ 'provider_location':
+ new_volume['provider_location']}
+ expected_update = {'_name_id': volume['_name_id'],
+ 'provider_location': volume['provider_location']}
+ with mock.patch.object(self.volume.driver,
+ 'update_migrated_volume') as migrate_update,\
+ mock.patch.object(self.context, 'elevated') as elevated:
+ migrate_update.return_value = fake_update
+ elevated.return_value = fake_elevated
+ self.volume.update_migrated_volume(self.context, volume,
+ new_volume, 'available')
+ volume_update.assert_has_calls((
+ mock.call(fake_elevated, volume['id'], fake_update),
+ mock.call(fake_elevated, new_volume['id'], expected_update)))
- # Check when the migrated volume is not in migration
- fake_new_volume = tests_utils.create_volume(self.context, size=1,
- host=CONF.host)
- db.volume_update(self.context, fake_volume['id'],
- {'migration_status': 'non-migrating'})
- self.volume._clean_temporary_volume(self.context, fake_volume['id'],
- fake_new_volume['id'])
- volume = db.volume_get(context.get_admin_context(),
- fake_new_volume['id'])
- self.assertIsNone(volume['migration_status'])
+ # Test the case for update_migrated_volume not implemented
+ # for the driver.
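+ # The manager is then expected to fall back to the new volume's
+ # _name_id and provider_location (fake_update_error).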
+ migrate_update.reset_mock()
+ volume_update.reset_mock()
+ migrate_update.side_effect = NotImplementedError
+ self.volume.update_migrated_volume(self.context, volume,
+ new_volume, 'available')
+ volume_update.assert_has_calls((
+ mock.call(fake_elevated, volume['id'], fake_update_error),
+ mock.call(fake_elevated, new_volume['id'], expected_update)))
def test_migrate_volume_generic_create_volume_error(self):
self.expected_status = 'error'
+
with mock.patch.object(self.volume.driver, 'migrate_volume'), \
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume') as \
mock_create_volume, \
self.volume.driver._initialized = True
self.volume.delete_volume(self.context, volume['id'])
- def test_update_volume_readonly_flag(self):
- """Test volume readonly flag can be updated at API level."""
- # create a volume and assign to host
- volume = tests_utils.create_volume(self.context,
- admin_metadata={'readonly': 'True'},
- **self.volume_params)
- self.volume.create_volume(self.context, volume['id'])
- volume['status'] = 'in-use'
+class ConsistencyGroupTestCase(BaseVolumeTestCase):
+ def test_delete_volume_in_consistency_group(self):
+ """Test deleting a volume that's tied to a consistency group fails."""
volume_api = cinder.volume.api.API()
-
- # Update fails when status != available
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
+ consistencygroup_id = '12345678-1234-5678-1234-567812345678'
+ volume = db.volume_update(self.context, volume['id'],
+ {'status': 'available',
+ 'consistencygroup_id': consistencygroup_id})
self.assertRaises(exception.InvalidVolume,
- volume_api.update_readonly_flag,
- self.context,
- volume,
- False)
-
- volume['status'] = 'available'
-
- # works when volume in 'available' status
- volume_api.update_readonly_flag(self.context, volume, False)
-
- volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual('available', volume['status'])
- admin_metadata = volume['volume_admin_metadata']
- self.assertEqual(1, len(admin_metadata))
- self.assertEqual('readonly', admin_metadata[0]['key'])
- self.assertEqual('False', admin_metadata[0]['value'])
-
- # clean up
- self.volume.delete_volume(self.context, volume['id'])
+ volume_api.delete, self.context, volume)
@mock.patch.object(CGQUOTAS, "reserve",
return_value=["RESERVATION"])
# Group is not deleted
self.assertEqual('available', cg.status)
- def test_secure_file_operations_enabled(self):
- """Test secure file operations setting for base driver.
-
- General, non network file system based drivers do not have
- anything to do with "secure_file_operations". This test verifies that
- calling the method always returns False.
- """
- ret_flag = self.volume.driver.secure_file_operations_enabled()
- self.assertFalse(ret_flag)
-
- @mock.patch('cinder.volume.flows.common.make_pretty_name',
- new=mock.MagicMock())
- @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.create_volume',
- return_value=None)
- @mock.patch('cinder.volume.flows.manager.create_volume.'
- 'CreateVolumeFromSpecTask.execute',
- side_effect=exception.DriverNotInitialized())
- def test_create_volume_raise_rescheduled_exception(self, mock_execute,
- mock_reschedule):
- # Create source volume
- test_vol = tests_utils.create_volume(self.context,
- **self.volume_params)
- test_vol_id = test_vol['id']
- self.assertRaises(exception.DriverNotInitialized,
- self.volume.create_volume,
- self.context, test_vol_id,
- {'volume_properties': self.volume_params},
- {'retry': {'num_attempts': 1, 'host': []}})
- self.assertTrue(mock_reschedule.called)
- volume = db.volume_get(context.get_admin_context(), test_vol_id)
- self.assertEqual('creating', volume['status'])
-
- @mock.patch('cinder.volume.flows.manager.create_volume.'
- 'CreateVolumeFromSpecTask.execute')
- def test_create_volume_raise_unrescheduled_exception(self, mock_execute):
- # create source volume
- test_vol = tests_utils.create_volume(self.context,
- **self.volume_params)
- test_vol_id = test_vol['id']
- mock_execute.side_effect = exception.VolumeNotFound(
- volume_id=test_vol_id)
- self.assertRaises(exception.VolumeNotFound,
- self.volume.create_volume,
- self.context, test_vol_id,
- {'volume_properties': self.volume_params},
- {'retry': {'num_attempts': 1, 'host': []}})
- volume = db.volume_get(context.get_admin_context(), test_vol_id)
- self.assertEqual('error', volume['status'])
-
def test_create_volume_with_consistencygroup_invalid_type(self):
"""Test volume creation with ConsistencyGroup & invalid volume type."""
vol_type = db.volume_type_create(
db.volume_destroy(self.context, src_vol['id'])
db.volume_destroy(self.context, dest_vol['id'])
+ @mock.patch.object(utils, 'brick_get_connector_properties')
+ @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
+ @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
+ @mock.patch.object(volutils, 'copy_volume')
+ @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
+ def test_copy_volume_data_mgr(self,
+ mock_get_capabilities,
+ mock_copy,
+ mock_detach,
+ mock_attach,
+ mock_get_connector):
+ """Test function of _copy_volume_data."""
+
+ src_vol = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ dest_vol = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host)
+ mock_get_connector.return_value = {}
+ self.volume.driver._throttle = mock.MagicMock()
+
+ attach_expected = [
+ mock.call(self.context, dest_vol, {}, remote=False),
+ mock.call(self.context, src_vol, {}, remote=False)]
+
+ detach_expected = [
+ mock.call(self.context, {'device': {'path': 'bar'}},
+ dest_vol, {}, force=False, remote=False),
+ mock.call(self.context, {'device': {'path': 'foo'}},
+ src_vol, {}, force=False, remote=False)]
+
+ attach_volume_returns = [
+ {'device': {'path': 'bar'}},
+ {'device': {'path': 'foo'}}
+ ]
+
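+ # _attach_volume is expected to be called for the destination first
+ # and the source second, so the data is copied from 'foo' (source)
+ # to 'bar' (destination).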
+ # Test case for sparse_copy_volume = False
+ mock_attach.side_effect = attach_volume_returns
+ mock_get_capabilities.return_value = {}
+ self.volume._copy_volume_data(self.context,
+ src_vol,
+ dest_vol)
+
+ self.assertEqual(attach_expected, mock_attach.mock_calls)
+ mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
+ self.assertEqual(detach_expected, mock_detach.mock_calls)
+
+ # Test case for sparse_copy_volume = True
+ mock_attach.reset_mock()
+ mock_detach.reset_mock()
+ mock_attach.side_effect = attach_volume_returns
+ mock_get_capabilities.return_value = {'sparse_copy_volume': True}
+ self.volume._copy_volume_data(self.context,
+ src_vol,
+ dest_vol)
+
+ self.assertEqual(attach_expected, mock_attach.mock_calls)
+ mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
+ self.assertEqual(detach_expected, mock_detach.mock_calls)
+
+ # cleanup resource
+ db.volume_destroy(self.context, src_vol['id'])
+ db.volume_destroy(self.context, dest_vol['id'])
+
class LVMISCSIVolumeDriverTestCase(DriverTestCase):
"""Test case for VolumeDriver"""