FAKE_HTTP_OPENER = urllib.request.build_opener()
+NO_RECORDS_RESPONSE = etree.XML("""
+ <results status="passed">
+ <num-records>0</num-records>
+ </results>
+""")
+
GET_OPERATIONAL_NETWORK_INTERFACE_ADDRESSES_RESPONSE = etree.XML("""
<results status="passed">
<num-records>2</num-records>
</results>
""" % {"address1": "1.2.3.4", "address2": "99.98.97.96"})
+QOS_POLICY_GROUP_GET_ITER_RESPONSE = etree.XML("""
+ <results status="passed">
+ <attributes-list>
+ <qos-policy-group-info>
+ <max-throughput>30KB/S</max-throughput>
+ <num-workloads>1</num-workloads>
+ <pgid>53</pgid>
+ <policy-group>fake_qos_policy_group_name</policy-group>
+ <policy-group-class>user_defined</policy-group-class>
+ <uuid>12496028-b641-11e5-abbd-123478563412</uuid>
+ <vserver>cinder-iscsi</vserver>
+ </qos-policy-group-info>
+ </attributes-list>
+ <num-records>1</num-records>
+ </results>
+""")
+
VOLUME_LIST_INFO_RESPONSE = etree.XML("""
<results status="passed">
<volumes>
def tearDown(self):
super(NetAppCmodeClientTestCase, self).tearDown()
+ def test_has_records(self):
+
+ result = self.client._has_records(netapp_api.NaElement(
+ fake_client.QOS_POLICY_GROUP_GET_ITER_RESPONSE))
+
+ self.assertTrue(result)
+
+ def test_has_records_not_found(self):
+
+ result = self.client._has_records(
+ netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE))
+
+ self.assertFalse(result)
+
def test_get_iscsi_target_details_no_targets(self):
response = netapp_api.NaElement(
etree.XML("""<results status="passed">
self.assertEqual(0, self.connection.qos_policy_group_create.call_count)
- def test_provision_qos_policy_group_with_qos_spec(self):
+ def test_provision_qos_policy_group_with_qos_spec_create(self):
+ self.mock_object(self.client,
+ 'qos_policy_group_exists',
+ mock.Mock(return_value=False))
self.mock_object(self.client, 'qos_policy_group_create')
+ self.mock_object(self.client, 'qos_policy_group_modify')
self.client.provision_qos_policy_group(fake.QOS_POLICY_GROUP_INFO)
self.client.qos_policy_group_create.assert_has_calls([
mock.call(fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT)])
+ self.assertFalse(self.client.qos_policy_group_modify.called)
+
+ def test_provision_qos_policy_group_with_qos_spec_modify(self):
+
+ self.mock_object(self.client,
+ 'qos_policy_group_exists',
+ mock.Mock(return_value=True))
+ self.mock_object(self.client, 'qos_policy_group_create')
+ self.mock_object(self.client, 'qos_policy_group_modify')
+
+ self.client.provision_qos_policy_group(fake.QOS_POLICY_GROUP_INFO)
+
+ self.assertFalse(self.client.qos_policy_group_create.called)
+ self.client.qos_policy_group_modify.assert_has_calls([
+ mock.call(fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT)])
+
+ def test_qos_policy_group_exists(self):
+
+ self.mock_send_request.return_value = netapp_api.NaElement(
+ fake_client.QOS_POLICY_GROUP_GET_ITER_RESPONSE)
+
+ result = self.client.qos_policy_group_exists(
+ fake.QOS_POLICY_GROUP_NAME)
+
+ api_args = {
+ 'query': {
+ 'qos-policy-group-info': {
+ 'policy-group': fake.QOS_POLICY_GROUP_NAME,
+ },
+ },
+ 'desired-attributes': {
+ 'qos-policy-group-info': {
+ 'policy-group': None,
+ },
+ },
+ }
+ self.mock_send_request.assert_has_calls([
+ mock.call('qos-policy-group-get-iter', api_args, False)])
+ self.assertTrue(result)
+
+ def test_qos_policy_group_exists_not_found(self):
+
+ self.mock_send_request.return_value = netapp_api.NaElement(
+ fake_client.NO_RECORDS_RESPONSE)
+
+ result = self.client.qos_policy_group_exists(
+ fake.QOS_POLICY_GROUP_NAME)
+
+ self.assertFalse(result)
def test_qos_policy_group_create(self):
self.mock_send_request.assert_has_calls([
mock.call('qos-policy-group-create', api_args, False)])
+ def test_qos_policy_group_modify(self):
+
+ api_args = {
+ 'policy-group': fake.QOS_POLICY_GROUP_NAME,
+ 'max-throughput': fake.MAX_THROUGHPUT,
+ }
+
+ self.client.qos_policy_group_modify(
+ fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT)
+
+ self.mock_send_request.assert_has_calls([
+ mock.call('qos-policy-group-modify', api_args, False)])
+
def test_qos_policy_group_delete(self):
api_args = {
'id': CLONE_DESTINATION_ID,
}
+SNAPSHOT_NAME = 'fake_snapshot_name'
+SNAPSHOT_LUN_HANDLE = 'fake_snapshot_lun_handle'
+
SNAPSHOT = {
- 'name': 'fake_snapshot_name',
+ 'name': SNAPSHOT_NAME,
'volume_size': SIZE,
'volume_id': 'fake_volume_id',
}
pools = self.library._get_pool_stats()
self.assertListEqual([], pools)
+
+ def test_delete_volume(self):
+ self.library.vol_refresh_voluntary = False
+ mock_super_delete_volume = self.mock_object(
+ block_base.NetAppBlockStorageLibrary, 'delete_volume')
+
+ self.library.delete_volume(fake.VOLUME)
+
+ mock_super_delete_volume.assert_called_once_with(fake.VOLUME)
+ self.assertTrue(self.library.vol_refresh_voluntary)
+
+ def test_delete_snapshot(self):
+ self.library.vol_refresh_voluntary = False
+ mock_super_delete_snapshot = self.mock_object(
+ block_base.NetAppBlockStorageLibrary, 'delete_snapshot')
+
+ self.library.delete_snapshot(fake.SNAPSHOT)
+
+ mock_super_delete_snapshot.assert_called_once_with(fake.SNAPSHOT)
+ self.assertTrue(self.library.vol_refresh_voluntary)
import mock
from oslo_utils import units
+import six
from cinder import exception
from cinder.i18n import _
['lun1'])
def test_delete_volume(self):
+ mock_delete_lun = self.mock_object(self.library, '_delete_lun')
+
+ self.library.delete_volume(fake.VOLUME)
+
+ mock_delete_lun.assert_called_once_with(fake.LUN_NAME)
+
+ def test_delete_lun(self):
mock_get_lun_attr = self.mock_object(self.library, '_get_lun_attr')
mock_get_lun_attr.return_value = fake.LUN_METADATA
self.library.zapi_client = mock.Mock()
self.library.lun_table = fake.LUN_TABLE
- self.library.delete_volume(fake.VOLUME)
+ self.library._delete_lun(fake.LUN_NAME)
mock_get_lun_attr.assert_called_once_with(
fake.LUN_NAME, 'metadata')
self.library.zapi_client.destroy_lun.assert_called_once_with(fake.PATH)
- def test_delete_volume_no_metadata(self):
+ def test_delete_lun_no_metadata(self):
self.mock_object(self.library, '_get_lun_attr', mock.Mock(
return_value=None))
self.library.zapi_client = mock.Mock()
self.mock_object(self.library, 'zapi_client')
- self.library.delete_volume(fake.VOLUME)
+ self.library._delete_lun(fake.LUN_NAME)
self.library._get_lun_attr.assert_called_once_with(
fake.LUN_NAME, 'metadata')
self.zapi_client.
mark_qos_policy_group_for_deletion.call_count)
+ def test_delete_snapshot(self):
+ mock_delete_lun = self.mock_object(self.library, '_delete_lun')
+
+ self.library.delete_snapshot(fake.SNAPSHOT)
+
+ mock_delete_lun.assert_called_once_with(fake.SNAPSHOT_NAME)
+
def test_clone_source_to_destination(self):
self.mock_object(na_utils, 'get_volume_extra_specs', mock.Mock(
return_value=fake.EXTRA_SPECS))
self.mock_object(self.library, '_setup_qos_for_volume', mock.Mock(
return_value=fake.QOS_POLICY_GROUP_INFO))
self.mock_object(self.library, '_clone_lun')
- self.mock_object(self.library, 'extend_volume')
+ self.mock_object(self.library, '_extend_volume')
self.mock_object(self.library, 'delete_volume')
self.mock_object(self.library, '_mark_qos_policy_group_for_deletion')
self.library.lun_space_reservation = 'false'
fake.CLONE_SOURCE_NAME, fake.CLONE_DESTINATION_NAME,
space_reserved='false',
qos_policy_group_name=fake.QOS_POLICY_GROUP_NAME)
- self.library.extend_volume.assert_called_once_with(
+ self.library._extend_volume.assert_called_once_with(
fake.CLONE_DESTINATION, fake.CLONE_DESTINATION_SIZE,
- qos_policy_group_name=fake.QOS_POLICY_GROUP_NAME)
+ fake.QOS_POLICY_GROUP_NAME)
self.assertEqual(0, self.library.delete_volume.call_count)
self.assertEqual(0, self.library.
_mark_qos_policy_group_for_deletion.call_count)
self.mock_object(self.library, '_setup_qos_for_volume', mock.Mock(
return_value=fake.QOS_POLICY_GROUP_INFO))
self.mock_object(self.library, '_clone_lun')
- self.mock_object(self.library, 'extend_volume', mock.Mock(
+ self.mock_object(self.library, '_extend_volume', mock.Mock(
side_effect=Exception))
self.mock_object(self.library, 'delete_volume')
self.mock_object(self.library, '_mark_qos_policy_group_for_deletion')
fake.CLONE_SOURCE_NAME, fake.CLONE_DESTINATION_NAME,
space_reserved='true',
qos_policy_group_name=fake.QOS_POLICY_GROUP_NAME)
- self.library.extend_volume.assert_called_once_with(
+ self.library._extend_volume.assert_called_once_with(
fake.CLONE_DESTINATION, fake.CLONE_DESTINATION_SIZE,
- qos_policy_group_name=fake.QOS_POLICY_GROUP_NAME)
+ fake.QOS_POLICY_GROUP_NAME)
self.assertEqual(1, self.library.delete_volume.call_count)
self.assertEqual(1, self.library.
_mark_qos_policy_group_for_deletion.call_count)
mock_do_clone.assert_has_calls([
mock.call(source, fake.VOLUME)])
+
+ def test_extend_volume(self):
+
+ new_size = 100
+ volume_copy = copy.copy(fake.VOLUME)
+ volume_copy['size'] = new_size
+
+ mock_get_volume_extra_specs = self.mock_object(
+ na_utils, 'get_volume_extra_specs',
+ mock.Mock(return_value=fake.EXTRA_SPECS))
+ mock_setup_qos_for_volume = self.mock_object(
+ self.library, '_setup_qos_for_volume',
+ mock.Mock(return_value=fake.QOS_POLICY_GROUP_INFO))
+ mock_extend_volume = self.mock_object(self.library, '_extend_volume')
+
+ self.library.extend_volume(fake.VOLUME, new_size)
+
+ mock_get_volume_extra_specs.assert_called_once_with(fake.VOLUME)
+ mock_setup_qos_for_volume.assert_called_once_with(volume_copy,
+ fake.EXTRA_SPECS)
+ mock_extend_volume.assert_called_once_with(fake.VOLUME,
+ new_size,
+ fake.QOS_POLICY_GROUP_NAME)
+
+ def test_extend_volume_api_error(self):
+
+ new_size = 100
+ volume_copy = copy.copy(fake.VOLUME)
+ volume_copy['size'] = new_size
+
+ mock_get_volume_extra_specs = self.mock_object(
+ na_utils, 'get_volume_extra_specs',
+ mock.Mock(return_value=fake.EXTRA_SPECS))
+ mock_setup_qos_for_volume = self.mock_object(
+ self.library, '_setup_qos_for_volume',
+ mock.Mock(return_value=fake.QOS_POLICY_GROUP_INFO))
+ mock_extend_volume = self.mock_object(
+ self.library, '_extend_volume',
+ mock.Mock(side_effect=netapp_api.NaApiError))
+
+ self.assertRaises(netapp_api.NaApiError,
+ self.library.extend_volume,
+ fake.VOLUME,
+ new_size)
+
+ mock_get_volume_extra_specs.assert_called_once_with(fake.VOLUME)
+ mock_setup_qos_for_volume.assert_has_calls([
+ mock.call(volume_copy, fake.EXTRA_SPECS),
+ mock.call(fake.VOLUME, fake.EXTRA_SPECS)])
+ mock_extend_volume.assert_called_once_with(
+ fake.VOLUME, new_size, fake.QOS_POLICY_GROUP_NAME)
+
+ def test__extend_volume_direct(self):
+
+ current_size = fake.LUN_SIZE
+ current_size_bytes = current_size * units.Gi
+ new_size = fake.LUN_SIZE * 2
+ new_size_bytes = new_size * units.Gi
+ max_size = fake.LUN_SIZE * 10
+ max_size_bytes = max_size * units.Gi
+ fake_volume = copy.copy(fake.VOLUME)
+ fake_volume['size'] = new_size
+
+ fake_lun = block_base.NetAppLun(fake.LUN_HANDLE,
+ fake.LUN_ID,
+ current_size_bytes,
+ fake.LUN_METADATA)
+ mock_get_lun_from_table = self.mock_object(
+ self.library, '_get_lun_from_table',
+ mock.Mock(return_value=fake_lun))
+ fake_lun_geometry = {'max_resize': six.text_type(max_size_bytes)}
+ mock_get_lun_geometry = self.mock_object(
+ self.library.zapi_client, 'get_lun_geometry',
+ mock.Mock(return_value=fake_lun_geometry))
+ mock_do_direct_resize = self.mock_object(self.library.zapi_client,
+ 'do_direct_resize')
+ mock_do_sub_clone_resize = self.mock_object(self.library,
+ '_do_sub_clone_resize')
+ self.library.lun_table = {fake.VOLUME['name']: fake_lun}
+
+ self.library._extend_volume(fake.VOLUME, new_size, 'fake_qos_policy')
+
+ mock_get_lun_from_table.assert_called_once_with(fake.VOLUME['name'])
+ mock_get_lun_geometry.assert_called_once_with(
+ fake.LUN_METADATA['Path'])
+ mock_do_direct_resize.assert_called_once_with(
+ fake.LUN_METADATA['Path'], six.text_type(new_size_bytes))
+ self.assertFalse(mock_do_sub_clone_resize.called)
+ self.assertEqual(six.text_type(new_size_bytes),
+ self.library.lun_table[fake.VOLUME['name']].size)
+
+ def test__extend_volume_clone(self):
+
+ current_size = fake.LUN_SIZE
+ current_size_bytes = current_size * units.Gi
+ new_size = fake.LUN_SIZE * 20
+ new_size_bytes = new_size * units.Gi
+ max_size = fake.LUN_SIZE * 10
+ max_size_bytes = max_size * units.Gi
+ fake_volume = copy.copy(fake.VOLUME)
+ fake_volume['size'] = new_size
+
+ fake_lun = block_base.NetAppLun(fake.LUN_HANDLE,
+ fake.LUN_ID,
+ current_size_bytes,
+ fake.LUN_METADATA)
+ mock_get_lun_from_table = self.mock_object(
+ self.library, '_get_lun_from_table',
+ mock.Mock(return_value=fake_lun))
+ fake_lun_geometry = {'max_resize': six.text_type(max_size_bytes)}
+ mock_get_lun_geometry = self.mock_object(
+ self.library.zapi_client, 'get_lun_geometry',
+ mock.Mock(return_value=fake_lun_geometry))
+ mock_do_direct_resize = self.mock_object(self.library.zapi_client,
+ 'do_direct_resize')
+ mock_do_sub_clone_resize = self.mock_object(self.library,
+ '_do_sub_clone_resize')
+ self.library.lun_table = {fake.VOLUME['name']: fake_lun}
+
+ self.library._extend_volume(fake.VOLUME, new_size, 'fake_qos_policy')
+
+ mock_get_lun_from_table.assert_called_once_with(fake.VOLUME['name'])
+ mock_get_lun_geometry.assert_called_once_with(
+ fake.LUN_METADATA['Path'])
+ self.assertFalse(mock_do_direct_resize.called)
+ mock_do_sub_clone_resize.assert_called_once_with(
+ fake.LUN_METADATA['Path'], six.text_type(new_size_bytes),
+ qos_policy_group_name='fake_qos_policy')
+ self.assertEqual(six.text_type(new_size_bytes),
+ self.library.lun_table[fake.VOLUME['name']].size)
+
+ def test__extend_volume_no_change(self):
+
+ current_size = fake.LUN_SIZE
+ current_size_bytes = current_size * units.Gi
+ new_size = fake.LUN_SIZE
+ max_size = fake.LUN_SIZE * 10
+ max_size_bytes = max_size * units.Gi
+ fake_volume = copy.copy(fake.VOLUME)
+ fake_volume['size'] = new_size
+
+ fake_lun = block_base.NetAppLun(fake.LUN_HANDLE,
+ fake.LUN_ID,
+ current_size_bytes,
+ fake.LUN_METADATA)
+ mock_get_lun_from_table = self.mock_object(
+ self.library, '_get_lun_from_table',
+ mock.Mock(return_value=fake_lun))
+ fake_lun_geometry = {'max_resize': six.text_type(max_size_bytes)}
+ mock_get_lun_geometry = self.mock_object(
+ self.library.zapi_client, 'get_lun_geometry',
+ mock.Mock(return_value=fake_lun_geometry))
+ mock_do_direct_resize = self.mock_object(self.library.zapi_client,
+ 'do_direct_resize')
+ mock_do_sub_clone_resize = self.mock_object(self.library,
+ '_do_sub_clone_resize')
+ self.library.lun_table = {fake_volume['name']: fake_lun}
+
+ self.library._extend_volume(fake_volume, new_size, 'fake_qos_policy')
+
+ mock_get_lun_from_table.assert_called_once_with(fake_volume['name'])
+ self.assertFalse(mock_get_lun_geometry.called)
+ self.assertFalse(mock_do_direct_resize.called)
+ self.assertFalse(mock_do_sub_clone_resize.called)
self.library.ssc_vols = None
self.fake_lun = block_base.NetAppLun(fake.LUN_HANDLE, fake.LUN_NAME,
fake.SIZE, None)
+ self.fake_snapshot_lun = block_base.NetAppLun(
+ fake.SNAPSHOT_LUN_HANDLE, fake.SNAPSHOT_NAME, fake.SIZE, None)
self.mock_object(self.library, 'lun_table')
- self.library.lun_table = {fake.LUN_NAME: self.fake_lun}
+ self.library.lun_table = {
+ fake.LUN_NAME: self.fake_lun,
+ fake.SNAPSHOT_NAME: self.fake_snapshot_lun,
+ }
self.mock_object(block_base.NetAppBlockStorageLibrary, 'delete_volume')
def tearDown(self):
.assert_called_once_with(None)
self.assertEqual(1, self.library._update_stale_vols.call_count)
+ def test_delete_snapshot(self):
+ self.mock_object(block_base.NetAppLun, 'get_metadata_property',
+ mock.Mock(return_value=fake.NETAPP_VOLUME))
+ mock_super_delete_snapshot = self.mock_object(
+ block_base.NetAppBlockStorageLibrary, 'delete_snapshot')
+ mock_update_stale_vols = self.mock_object(self.library,
+ '_update_stale_vols')
+ self.library.delete_snapshot(fake.SNAPSHOT)
+
+ mock_super_delete_snapshot.assert_called_once_with(fake.SNAPSHOT)
+ self.assertTrue(mock_update_stale_vols.called)
+
+ def test_delete_snapshot_no_netapp_vol(self):
+ self.mock_object(block_base.NetAppLun, 'get_metadata_property',
+ mock.Mock(return_value=None))
+ mock_super_delete_snapshot = self.mock_object(
+ block_base.NetAppBlockStorageLibrary, 'delete_snapshot')
+ mock_update_stale_vols = self.mock_object(self.library,
+ '_update_stale_vols')
+ self.library.delete_snapshot(fake.SNAPSHOT)
+
+ mock_super_delete_snapshot.assert_called_once_with(fake.SNAPSHOT)
+ self.assertFalse(mock_update_stale_vols.called)
+
def test_setup_qos_for_volume(self):
self.mock_object(na_utils, 'get_valid_qos_policy_group_info',
mock.Mock(
from cinder import test
from cinder.tests.unit.volume.drivers.netapp.dataontap import fakes as fake
from cinder import utils
+from cinder.volume.drivers.netapp.dataontap.client import api as netapp_api
from cinder.volume.drivers.netapp.dataontap import nfs_base
from cinder.volume.drivers.netapp import utils as na_utils
from cinder.volume.drivers import nfs
self.assertEqual(expected, result)
+ def test_extend_volume(self):
+
+ new_size = 100
+ volume_copy = copy.copy(fake.VOLUME)
+ volume_copy['size'] = new_size
+
+ path = '%s/%s' % (fake.NFS_SHARE, fake.NFS_VOLUME['name'])
+ self.mock_object(self.driver,
+ 'local_path',
+ mock.Mock(return_value=path))
+ mock_resize_image_file = self.mock_object(self.driver,
+ '_resize_image_file')
+ mock_get_volume_extra_specs = self.mock_object(
+ na_utils, 'get_volume_extra_specs',
+ mock.Mock(return_value=fake.EXTRA_SPECS))
+ mock_do_qos_for_volume = self.mock_object(self.driver,
+ '_do_qos_for_volume')
+
+ self.driver.extend_volume(fake.VOLUME, new_size)
+
+ mock_resize_image_file.assert_called_once_with(path, new_size)
+ mock_get_volume_extra_specs.assert_called_once_with(fake.VOLUME)
+ mock_do_qos_for_volume.assert_called_once_with(volume_copy,
+ fake.EXTRA_SPECS,
+ cleanup=False)
+
+ def test_extend_volume_resize_error(self):
+
+ new_size = 100
+ volume_copy = copy.copy(fake.VOLUME)
+ volume_copy['size'] = new_size
+
+ path = '%s/%s' % (fake.NFS_SHARE, fake.NFS_VOLUME['name'])
+ self.mock_object(self.driver,
+ 'local_path',
+ mock.Mock(return_value=path))
+ mock_resize_image_file = self.mock_object(
+ self.driver, '_resize_image_file',
+ mock.Mock(side_effect=netapp_api.NaApiError))
+ mock_get_volume_extra_specs = self.mock_object(
+ na_utils, 'get_volume_extra_specs',
+ mock.Mock(return_value=fake.EXTRA_SPECS))
+ mock_do_qos_for_volume = self.mock_object(self.driver,
+ '_do_qos_for_volume')
+
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.driver.extend_volume,
+ fake.VOLUME,
+ new_size)
+
+ mock_resize_image_file.assert_called_once_with(path, new_size)
+ self.assertFalse(mock_get_volume_extra_specs.called)
+ self.assertFalse(mock_do_qos_for_volume.called)
+
+ def test_extend_volume_qos_error(self):
+
+ new_size = 100
+ volume_copy = copy.copy(fake.VOLUME)
+ volume_copy['size'] = new_size
+
+ path = '%s/%s' % (fake.NFS_SHARE, fake.NFS_VOLUME['name'])
+ self.mock_object(self.driver,
+ 'local_path',
+ mock.Mock(return_value=path))
+ mock_resize_image_file = self.mock_object(self.driver,
+ '_resize_image_file')
+ mock_get_volume_extra_specs = self.mock_object(
+ na_utils, 'get_volume_extra_specs',
+ mock.Mock(return_value=fake.EXTRA_SPECS))
+ mock_do_qos_for_volume = self.mock_object(
+ self.driver, '_do_qos_for_volume',
+ mock.Mock(side_effect=netapp_api.NaApiError))
+
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.driver.extend_volume,
+ fake.VOLUME,
+ new_size)
+
+ mock_resize_image_file.assert_called_once_with(path, new_size)
+ mock_get_volume_extra_specs.assert_called_once_with(fake.VOLUME)
+ mock_do_qos_for_volume.assert_called_once_with(volume_copy,
+ fake.EXTRA_SPECS,
+ cleanup=False)
+
def test_is_share_clone_compatible(self):
self.assertRaises(NotImplementedError,
self.driver._is_share_clone_compatible,
'volume_type_id': VOLUME_TYPE_ID,
}
+SNAPSHOT_NAME = 'fake_snapshot_name'
+SNAPSHOT_ID = 'fake_snapshot_id'
+
+SNAPSHOT = {
+ 'name': SNAPSHOT_NAME,
+ 'id': SNAPSHOT_ID,
+ 'volume_id': VOLUME_ID,
+ 'volume_name': VOLUME_NAME,
+ 'volume_size': 42,
+}
QOS_SPECS = {}
self.assertEqual(expected, result)
+ def test_map_qos_spec_maxiopspergib(self):
+ qos_spec = {'maxIOPSperGiB': 1000}
+ mock_get_name = self.mock_object(na_utils, 'get_qos_policy_group_name')
+ mock_get_name.return_value = 'fake_qos_policy'
+ expected = {
+ 'policy_name': 'fake_qos_policy',
+ 'max_throughput': '42000iops',
+ }
+
+ result = na_utils.map_qos_spec(qos_spec, fake.VOLUME)
+
+ self.assertEqual(expected, result)
+
def test_map_qos_spec_maxbps(self):
qos_spec = {'maxBPS': 1000000}
mock_get_name = self.mock_object(na_utils, 'get_qos_policy_group_name')
self.assertEqual(expected, result)
+ def test_map_qos_spec_maxbpspergib(self):
+ qos_spec = {'maxBPSperGiB': 100000}
+ mock_get_name = self.mock_object(na_utils, 'get_qos_policy_group_name')
+ mock_get_name.return_value = 'fake_qos_policy'
+ expected = {
+ 'policy_name': 'fake_qos_policy',
+ 'max_throughput': '4200000B/s',
+ }
+
+ result = na_utils.map_qos_spec(qos_spec, fake.VOLUME)
+
+ self.assertEqual(expected, result)
+
def test_map_qos_spec_no_key_present(self):
qos_spec = {}
mock_get_name = self.mock_object(na_utils, 'get_qos_policy_group_name')
self.vol_refresh_voluntary = True
LOG.debug('Deleted LUN with name %s', volume['name'])
+ def delete_snapshot(self, snapshot):
+ """Driver entry point for deleting a snapshot."""
+ super(NetAppBlockStorage7modeLibrary, self).delete_snapshot(snapshot)
+ self.vol_refresh_voluntary = True
+
def _is_lun_valid_on_storage(self, lun):
"""Validate LUN specific to storage system."""
if self.volume_list:
Volume driver library for NetApp 7/C-mode block storage systems.
"""
+import copy
import math
import sys
import uuid
def delete_volume(self, volume):
"""Driver entry point for destroying existing volumes."""
- name = volume['name']
- metadata = self._get_lun_attr(name, 'metadata')
- if not metadata:
+ self._delete_lun(volume['name'])
+
+ def _delete_lun(self, lun_name):
+ """Helper method to delete LUN backing a volume or snapshot."""
+
+ metadata = self._get_lun_attr(lun_name, 'metadata')
+ if metadata:
+ self.zapi_client.destroy_lun(metadata['Path'])
+ self.lun_table.pop(lun_name)
+ else:
LOG.warning(_LW("No entry in LUN table for volume/snapshot"
- " %(name)s."), {'name': name})
- return
- self.zapi_client.destroy_lun(metadata['Path'])
- self.lun_table.pop(name)
+ " %(name)s."), {'name': lun_name})
def ensure_export(self, context, volume):
"""Driver entry point to get the export info for an existing volume."""
def delete_snapshot(self, snapshot):
"""Driver entry point for deleting a snapshot."""
- self.delete_volume(snapshot)
+ self._delete_lun(snapshot['name'])
LOG.debug("Snapshot %s deletion successful", snapshot['name'])
def create_volume_from_snapshot(self, volume, snapshot):
if destination_size != source_size:
try:
- self.extend_volume(
- destination_volume, destination_size,
- qos_policy_group_name=qos_policy_group_name)
+ self._extend_volume(destination_volume,
+ destination_size,
+ qos_policy_group_name)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(
def _update_volume_stats(self):
raise NotImplementedError()
- def extend_volume(self, volume, new_size, qos_policy_group_name=None):
+ def extend_volume(self, volume, new_size):
+ """Driver entry point to increase the size of a volume."""
+
+ extra_specs = na_utils.get_volume_extra_specs(volume)
+
+ # Create volume copy with new size for size-dependent QOS specs
+ volume_copy = copy.copy(volume)
+ volume_copy['size'] = new_size
+
+ qos_policy_group_info = self._setup_qos_for_volume(volume_copy,
+ extra_specs)
+ qos_policy_group_name = (
+ na_utils.get_qos_policy_group_name_from_info(
+ qos_policy_group_info))
+
+ try:
+ self._extend_volume(volume, new_size, qos_policy_group_name)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ # If anything went wrong, revert QoS settings
+ self._setup_qos_for_volume(volume, extra_specs)
+
+ def _extend_volume(self, volume, new_size, qos_policy_group_name):
"""Extend an existing volume to the new size."""
name = volume['name']
lun = self._get_lun_from_table(name)
msg = 'Deleted LUN with name %(name)s and QoS info %(qos)s'
LOG.debug(msg, {'name': volume['name'], 'qos': qos_policy_group_info})
+ def delete_snapshot(self, snapshot):
+ """Driver entry point for deleting a snapshot."""
+ lun = self.lun_table.get(snapshot['name'])
+ netapp_vol = lun.get_metadata_property('Volume') if lun else None
+
+ super(NetAppBlockStorageCmodeLibrary, self).delete_snapshot(snapshot)
+
+ if netapp_vol:
+ self._update_stale_vols(
+ volume=ssc_cmode.NetAppVolume(netapp_vol, self.vserver))
+
def _check_volume_type_for_lun(self, volume, lun, existing_ref,
extra_specs):
"""Check if LUN satisfies volume type."""
result = server.invoke_successfully(na_element, True)
return result
+ def _has_records(self, api_result_element):
+ num_records = api_result_element.get_child_content('num-records')
+ return bool(num_records and '0' != num_records)
+
def set_vserver(self, vserver):
self.connection.set_vserver(vserver)
spec = qos_policy_group_info.get('spec')
if spec is not None:
- self.qos_policy_group_create(spec['policy_name'],
- spec['max_throughput'])
+ if not self.qos_policy_group_exists(spec['policy_name']):
+ self.qos_policy_group_create(spec['policy_name'],
+ spec['max_throughput'])
+ else:
+ self.qos_policy_group_modify(spec['policy_name'],
+ spec['max_throughput'])
+
+ def qos_policy_group_exists(self, qos_policy_group_name):
+ """Checks if a QOS policy group exists."""
+ api_args = {
+ 'query': {
+ 'qos-policy-group-info': {
+ 'policy-group': qos_policy_group_name,
+ },
+ },
+ 'desired-attributes': {
+ 'qos-policy-group-info': {
+ 'policy-group': None,
+ },
+ },
+ }
+ result = self.send_request('qos-policy-group-get-iter',
+ api_args,
+ False)
+ return self._has_records(result)
def qos_policy_group_create(self, qos_policy_group_name, max_throughput):
"""Creates a QOS policy group."""
}
return self.send_request('qos-policy-group-create', api_args, False)
- def qos_policy_group_delete(self, qos_policy_group_name):
- """Attempts to delete a QOS policy group."""
+ def qos_policy_group_modify(self, qos_policy_group_name, max_throughput):
+ """Modifies a QOS policy group."""
api_args = {
'policy-group': qos_policy_group_name,
+ 'max-throughput': max_throughput,
}
+ return self.send_request('qos-policy-group-modify', api_args, False)
+
+ def qos_policy_group_delete(self, qos_policy_group_name):
+ """Attempts to delete a QOS policy group."""
+ api_args = {'policy-group': qos_policy_group_name}
return self.send_request('qos-policy-group-delete', api_args, False)
def qos_policy_group_rename(self, qos_policy_group_name, new_name):
Volume driver for NetApp NFS storage.
"""
+import copy
import math
import os
import re
def extend_volume(self, volume, new_size):
"""Extend an existing volume to the new size."""
+
LOG.info(_LI('Extending volume %s.'), volume['name'])
- path = self.local_path(volume)
- self._resize_image_file(path, new_size)
+
+ try:
+ path = self.local_path(volume)
+ self._resize_image_file(path, new_size)
+ except Exception as err:
+ exception_msg = (_("Failed to extend volume "
+ "%(name)s, Error msg: %(msg)s.") %
+ {'name': volume['name'],
+ 'msg': six.text_type(err)})
+ raise exception.VolumeBackendAPIException(data=exception_msg)
+
+ try:
+ extra_specs = na_utils.get_volume_extra_specs(volume)
+
+ # Create volume copy with new size for size-dependent QOS specs
+ volume_copy = copy.copy(volume)
+ volume_copy['size'] = new_size
+
+ self._do_qos_for_volume(volume_copy, extra_specs, cleanup=False)
+ except Exception as err:
+ exception_msg = (_("Failed to set QoS for existing volume "
+ "%(name)s, Error msg: %(msg)s.") %
+ {'name': volume['name'],
+ 'msg': six.text_type(err)})
+ raise exception.VolumeBackendAPIException(data=exception_msg)
def _is_share_clone_compatible(self, volume, share):
"""Checks if share is compatible with volume to host its clone."""
'netapp_nodedup': 'netapp_dedup',
'netapp_nocompression': 'netapp_compression',
'netapp_thick_provisioned': 'netapp_thin_provisioned'}
-QOS_KEYS = frozenset(['maxIOPS', 'maxBPS'])
+QOS_KEYS = frozenset(['maxIOPS', 'maxIOPSperGiB', 'maxBPS', 'maxBPSperGiB'])
BACKEND_QOS_CONSUMERS = frozenset(['back-end', 'both'])
"""Map Cinder QOS spec to limit/throughput-value as used in client API."""
if qos_spec is None:
return None
+
qos_spec = map_dict_to_lower(qos_spec)
spec = dict(policy_name=get_qos_policy_group_name(volume),
max_throughput=None)
- # IOPS and BPS specifications are exclusive of one another.
+
+ # QoS specs are exclusive of one another.
if 'maxiops' in qos_spec:
spec['max_throughput'] = '%siops' % qos_spec['maxiops']
+ elif 'maxiopspergib' in qos_spec:
+ spec['max_throughput'] = '%siops' % six.text_type(
+ int(qos_spec['maxiopspergib']) * int(volume['size']))
elif 'maxbps' in qos_spec:
spec['max_throughput'] = '%sB/s' % qos_spec['maxbps']
+ elif 'maxbpspergib' in qos_spec:
+ spec['max_throughput'] = '%sB/s' % six.text_type(
+ int(qos_spec['maxbpspergib']) * int(volume['size']))
+
return spec