return (volume, self._get_vdisk_uid(volume['name']))
- def test_manage_existing_bad_ref(self):
+ def test_manage_existing_get_size_bad_ref(self):
"""Error on manage with bad reference.
This test case attempts to manage an existing volume but passes in
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_get_size, volume, ref)
- def test_manage_existing_bad_uid(self):
+ def test_manage_existing_get_size_bad_uid(self):
"""Error when the specified UUID does not exist."""
volume = self._generate_vol_info(None, None)
ref = {'source-id': 'bad_uid'}
self.driver.manage_existing_get_size, volume, ref)
pass
+ def test_manage_existing_get_size_bad_name(self):
+ """Error when the specified name does not exist."""
+ volume = self._generate_vol_info(None, None)
+ ref = {'source-name': 'bad_name'}
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing_get_size, volume, ref)
+
+ def test_manage_existing_bad_ref(self):
+ """Error on manage with bad reference.
+
+ This test case attempts to manage an existing volume but passes in
+ a bad reference that the Storwize driver doesn't understand. We
+ expect an exception to be raised.
+ """
+
+ # Error when neither UUID nor name are specified.
+ volume = self._generate_vol_info(None, None)
+ ref = {}
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing, volume, ref)
+
+ # Error when the specified UUID does not exist.
+ volume = self._generate_vol_info(None, None)
+ ref = {'source-id': 'bad_uid'}
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing, volume, ref)
+
+ # Error when the specified name does not exist.
+ volume = self._generate_vol_info(None, None)
+ ref = {'source-name': 'bad_name'}
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing, volume, ref)
+
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_vdisk_copy_attrs')
+ def test_manage_existing_mismatch(self,
+ get_vdisk_copy_attrs):
+ ctxt = testutils.get_test_admin_context()
+ _volume, uid = self._create_volume_and_return_uid('manage_test')
+
+ opts = {'rsize': -1}
+ type_thick_ref = volume_types.create(ctxt, 'testtype1', opts)
+
+ opts = {'rsize': 2}
+ type_thin_ref = volume_types.create(ctxt, 'testtype2', opts)
+
+ opts = {'rsize': 2, 'compression': True}
+ type_comp_ref = volume_types.create(ctxt, 'testtype3', opts)
+
+ opts = {'rsize': -1, 'iogrp': 1}
+ type_iogrp_ref = volume_types.create(ctxt, 'testtype4', opts)
+
+ new_volume = self._generate_vol_info(None, None)
+ ref = {'source-name': _volume['name']}
+
+ fake_copy_thin = self._get_default_opts()
+ fake_copy_thin['autoexpand'] = 'on'
+
+ fake_copy_comp = self._get_default_opts()
+ fake_copy_comp['autoexpand'] = 'on'
+ fake_copy_comp['compressed_copy'] = 'yes'
+
+ fake_copy_thick = self._get_default_opts()
+ fake_copy_thick['autoexpand'] = ''
+ fake_copy_thick['compressed_copy'] = 'no'
+
+ fake_copy_no_comp = self._get_default_opts()
+ fake_copy_no_comp['compressed_copy'] = 'no'
+
+ valid_iogrp = self.driver._state['available_iogrps']
+ self.driver._state['available_iogrps'] = [9999]
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+ self.driver._state['available_iogrps'] = valid_iogrp
+
+ get_vdisk_copy_attrs.side_effect = [fake_copy_thin,
+ fake_copy_thick,
+ fake_copy_no_comp,
+ fake_copy_comp,
+ fake_copy_thick,
+ fake_copy_thick
+ ]
+ new_volume['volume_type_id'] = type_thick_ref['id']
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ new_volume['volume_type_id'] = type_thin_ref['id']
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ new_volume['volume_type_id'] = type_comp_ref['id']
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ new_volume['volume_type_id'] = type_thin_ref['id']
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ new_volume['volume_type_id'] = type_iogrp_ref['id']
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ new_volume['volume_type_id'] = type_thick_ref['id']
+ no_exist_pool = 'i-dont-exist-%s' % random.randint(10000, 99999)
+ self._set_flag('storwize_svc_volpool_name', no_exist_pool)
+ self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
+ self.driver.manage_existing, new_volume, ref)
+
+ self._reset_flags()
+ volume_types.destroy(ctxt, type_thick_ref['id'])
+ volume_types.destroy(ctxt, type_comp_ref['id'])
+ volume_types.destroy(ctxt, type_iogrp_ref['id'])
+
def test_manage_existing_good_uid_not_mapped(self):
"""Tests managing a volume with no mappings.
uid_of_new_volume = self._get_vdisk_uid(new_volume['name'])
self.assertEqual(uid, uid_of_new_volume)
- def test_manage_existing_good_uid_mapped(self):
+ def test_manage_existing_good_name_not_mapped(self):
+ """Tests managing a volume with no mappings.
+
+ This test case attempts to manage an existing volume by name, and
+ we expect it to succeed. We verify that the backend volume was
+ renamed to have the name of the Cinder volume that we asked for it to
+ be associated with.
+ """
+
+ # Create a volume as a way of getting a vdisk created, and find out the
+ # UID of that vdisk.
+ _volume, uid = self._create_volume_and_return_uid('manage_test')
+
+ # Descriptor of the Cinder volume that we want to own the vdisk
+ # referenced by uid.
+ new_volume = self._generate_vol_info(None, None)
+
+ # Submit the request to manage it.
+ ref = {'source-name': _volume['name']}
+ size = self.driver.manage_existing_get_size(new_volume, ref)
+ self.assertEqual(10, size)
+ self.driver.manage_existing(new_volume, ref)
+
+ # Assert that there is a disk named after the new volume that has the
+ # ID that we passed in, indicating that the disk has been renamed.
+ uid_of_new_volume = self._get_vdisk_uid(new_volume['name'])
+ self.assertEqual(uid, uid_of_new_volume)
+
+ def test_manage_existing_mapped(self):
"""Tests managing a mapped volume with no override.
This test case attempts to manage an existing volume by UID, but
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_get_size, volume, ref)
+ ref = {'source-name': volume['name']}
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing_get_size, volume, ref)
+
def test_manage_existing_good_uid_mapped_with_override(self):
"""Tests managing a mapped volume with override.
uid_of_new_volume = self._get_vdisk_uid(new_volume['name'])
self.assertEqual(uid, uid_of_new_volume)
+ def test_manage_existing_good_name_mapped_with_override(self):
+ """Tests managing a mapped volume with override.
+
+ This test case attempts to manage an existing volume by name, when it
+ already mapped to a host, but the ref specifies that this is OK.
+ We verify that the backend volume was renamed to have the name of the
+ Cinder volume that we asked for it to be associated with.
+ """
+ # Create a volume as a way of getting a vdisk created, and find out the
+ # UUID of that vdisk.
+ volume, uid = self._create_volume_and_return_uid('manage_test')
+
+ # Map a host to the disk
+ conn = {'initiator': u'unicode:initiator3',
+ 'ip': '10.10.10.12',
+ 'host': u'unicode.foo}.bar}.baz'}
+ self.driver.initialize_connection(volume, conn)
+
+ # Descriptor of the Cinder volume that we want to own the vdisk
+ # referenced by uid.
+ new_volume = self._generate_vol_info(None, None)
+
+ # Submit the request to manage it, specifying that it is OK to
+ # manage a volume that is already attached.
+ ref = {'source-name': volume['name'], 'manage_if_in_use': True}
+ size = self.driver.manage_existing_get_size(new_volume, ref)
+ self.assertEqual(10, size)
+ self.driver.manage_existing(new_volume, ref)
+
+ # Assert that there is a disk named after the new volume that has the
+ # ID that we passed in, indicating that the disk has been renamed.
+ uid_of_new_volume = self._get_vdisk_uid(new_volume['name'])
+ self.assertEqual(uid, uid_of_new_volume)
+
class CLIResponseTestCase(test.TestCase):
def test_empty(self):
raise exception.VolumeBackendAPIException(data=msg)
return iogrps
+ def get_volume_io_group(self, vol_name):
+ vdisk = self.ssh.lsvdisk(vol_name)
+ if vdisk:
+ resp = self.ssh.lsiogrp()
+ for iogrp in resp:
+ if iogrp['name'] == vdisk['IO_group_name']:
+ return int(iogrp['id'])
+ return None
+
def get_node_info(self):
"""Return dictionary containing information on system's nodes."""
nodes = {}
if we got here then we have a vdisk that isn't in use (or we don't
care if it is in use.
"""
- vdisk = self._helpers.vdisk_by_uid(ref['source-id'])
- if vdisk is None:
- reason = (_('No vdisk with the UID specified by source-id %s.')
- % ref['source-id'])
- raise exception.ManageExistingInvalidReference(existing_ref=ref,
- reason=reason)
+ # Check that the reference is valid
+ vdisk = self._manage_input_check(ref)
+ vdisk_io_grp = self._helpers.get_volume_io_group(vdisk['name'])
+ if vdisk_io_grp not in self._state['available_iogrps']:
+ msg = (_("Failed to manage existing volume due to "
+ "the volume to be managed is not in a valid "
+ "I/O group."))
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+ if volume['volume_type_id']:
+ opts = self._get_vdisk_params(volume['volume_type_id'],
+ volume_metadata=
+ volume.get('volume_metadata'))
+ vdisk_copy = self._helpers.get_vdisk_copy_attrs(vdisk['name'], '0')
+
+ if vdisk_copy['autoexpand'] == 'on' and opts['rsize'] == -1:
+ msg = (_("Failed to manage existing volume due to "
+ "the volume to be managed is thin, but "
+ "the volume type chosen is thick."))
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+
+ if not vdisk_copy['autoexpand'] and opts['rsize'] != -1:
+ msg = (_("Failed to manage existing volume due to "
+ "the volume to be managed is thick, but "
+ "the volume type chosen is thin."))
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+
+ if (vdisk_copy['compressed_copy'] == 'no' and
+ opts['compression']):
+ msg = (_("Failed to manage existing volume due to the "
+ "volume to be managed is not compress, but "
+ "the volume type chosen is compress."))
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+
+ if (vdisk_copy['compressed_copy'] == 'yes' and
+ not opts['compression']):
+ msg = (_("Failed to manage existing volume due to the "
+ "volume to be managed is compress, but "
+ "the volume type chosen is not compress."))
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+
+ if vdisk_io_grp != opts['iogrp']:
+ msg = (_("Failed to manage existing volume due to "
+ "I/O group mismatch. The I/O group of the "
+ "volume to be managed is %(vdisk_iogrp)s. I/O group"
+ "of the chosen type is %(opt_iogrp)s.") %
+ {'vdisk_iogrp': vdisk['IO_group_name'],
+ 'opt_iogrp': opts['iogrp']})
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
+
+ if (vdisk['mdisk_grp_name'] !=
+ self.configuration.storwize_svc_volpool_name):
+ msg = (_("Failed to manage existing volume due to the "
+ "pool of the volume to be managed does not "
+ "match the backend pool. Pool of the "
+ "volume to be managed is %(vdisk_pool)s. Pool "
+ "of the backend is %(backend_pool)s.") %
+ {'vdisk_pool': vdisk['mdisk_grp_name'],
+ 'backend_pool':
+ self.configuration.storwize_svc_volpool_name})
+ raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
self._helpers.rename_vdisk(vdisk['name'], volume['name'])
def manage_existing_get_size(self, volume, ref):
"""Return size of an existing Vdisk for manage_existing.
existing_ref is a dictionary of the form:
- {'source-id': <uid of disk>}
+ {'source-id': <uid of disk>} or
+ {'source-name': <name of the disk>}
Optional elements are:
'manage_if_in_use': True/False (default is False)
"""
# Check that the reference is valid
- if 'source-id' not in ref:
- reason = _('Reference must contain source-id element.')
- raise exception.ManageExistingInvalidReference(existing_ref=ref,
- reason=reason)
-
- # Check for existence of the vdisk
- vdisk = self._helpers.vdisk_by_uid(ref['source-id'])
- if vdisk is None:
- reason = (_('No vdisk with the UID specified by source-id %s.')
- % (ref['source-id']))
- raise exception.ManageExistingInvalidReference(existing_ref=ref,
- reason=reason)
+ vdisk = self._manage_input_check(ref)
# Check if the disk is in use, if we need to.
manage_if_in_use = ref.get('manage_if_in_use', False)
data.update(self.replication.get_replication_info())
self._stats = data
+
+ def _manage_input_check(self, ref):
+ """Verify the input of manage function."""
+ # Check that the reference is valid
+ if 'source-name' in ref:
+ manage_source = ref['source-name']
+ vdisk = self._helpers.get_vdisk_attributes(manage_source)
+ elif 'source-id' in ref:
+ manage_source = ref['source-id']
+ vdisk = self._helpers.vdisk_by_uid(manage_source)
+ else:
+ reason = _('Reference must contain source-id or '
+ 'source-name element.')
+ raise exception.ManageExistingInvalidReference(existing_ref=ref,
+ reason=reason)
+
+ if vdisk is None:
+ reason = (_('No vdisk with the UID specified by ref %s.')
+ % manage_source)
+ raise exception.ManageExistingInvalidReference(existing_ref=ref,
+ reason=reason)
+ return vdisk