# Sasikanth Eda <sasikanth.eda@in.ibm.com>
"""
-Tests for the IBM NAS family (SONAS, Storwize V7000 Unified).
+Tests for the IBM NAS family (SONAS, Storwize V7000 Unified,
+NAS based IBM GPFS Storage Systems).
"""
import mock
from cinder import context
from cinder import exception
from cinder.openstack.common import log as logging
+from cinder.openstack.common import units
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.ibm import ibmnas
'nas_ssh_port': 22,
'nas_password': 'pass',
'nas_private_key': 'nas.key',
+ 'ibmnas_platform_type': 'v7ku',
'nfs_shares_config': None,
'nfs_sparsed_volumes': True,
'nfs_used_ratio': 0.95,
self._set_flag('nas_password', None)
self._set_flag('nas_private_key', None)
+ self.assertRaises(exception.InvalidInput,
+ self._driver.check_for_setup_error)
+ self._set_flag('ibmnas_platform_type', None)
self.assertRaises(exception.InvalidInput,
self._driver.check_for_setup_error)
self.assertEqual(self.TEST_NFS_EXPORT.split(':')[1],
mock.drv._get_export_path(volume['id']))
- def test_create_ibmnas_snap_mount_point_provided(self):
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ensure_shares_mounted')
+ def test_update_volume_stats(self, mock_ensure):
+ """Check update volume stats."""
+
+ drv = self._driver
+ mock_ensure.return_value = True
+ fake_avail = 80 * units.Gi
+ fake_size = 2 * fake_avail
+ fake_used = 10 * units.Gi
+
+ with mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_capacity_info',
+ return_value=(fake_avail, fake_size, fake_used)):
+ stats = drv.get_volume_stats()
+ self.assertEqual(stats['volume_backend_name'], 'IBMNAS_NFS')
+ self.assertEqual(stats['storage_protocol'], 'nfs')
+ self.assertEqual(stats['driver_version'], '1.1.0')
+ self.assertEqual(stats['vendor_name'], 'IBM')
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver._run_ssh')
+ def test_ssh_operation(self, mock_ssh):
+
+ drv = self._driver
+ mock_ssh.return_value = None
+
+ self.assertEqual(None, drv._ssh_operation('ssh_cmd'))
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver._run_ssh')
+ def test_ssh_operation_exception(self, mock_ssh):
+
+ drv = self._driver
+ mock_ssh.side_effect = (
+ exception.VolumeBackendAPIException(data='Failed'))
+
+ self.assertRaises(exception.VolumeBackendAPIException,
+ drv._ssh_operation, 'ssh_cmd')
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ @mock.patch('cinder.openstack.common.processutils.execute')
+ def test_create_ibmnas_snap_mount_point_provided(self, mock_ssh,
+ mock_execute):
"""Create ibmnas snap if mount point is provided."""
drv = self._driver
- mock = self._mock
+ mock_ssh.return_value = True
+ mock_execute.return_value = True
+
+ self.assertEqual(None, drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH,
+ self.TEST_MNT_POINT))
- drv._create_ibmnas_snap = mock.drv._run_ssh.return_value.\
- drv._execute.return_value.drv._create_ibmnas_snap
- drv._create_ibmnas_snap.return_value = True
- self.assertEqual(True, mock.drv._run_ssh().
- drv._execute().
- drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
- self.TEST_SNAP_PATH,
- self.TEST_MNT_POINT))
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ @mock.patch('cinder.openstack.common.processutils.execute')
+ def test_create_ibmnas_snap_nas_gpfs(self, mock_ssh, mock_execute):
+ """Create ibmnas snap if mount point is provided."""
+
+ drv = self._driver
+ drv.configuration.platform = 'gpfs-nas'
+ mock_ssh.return_value = True
+ mock_execute.return_value = True
+
+ self.assertEqual(None, drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH,
+ self.TEST_MNT_POINT))
- def test_create_ibmnas_snap_no_mount_point_provided(self):
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ def test_create_ibmnas_snap_no_mount_point_provided(self, mock_ssh):
"""Create ibmnas snap if no mount point is provided."""
drv = self._driver
- mock = self._mock
+ mock_ssh.return_value = True
+
+ self.assertEqual(None, drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH,
+ None))
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ def test_create_ibmnas_snap_nas_gpfs_no_mount(self, mock_ssh):
+ """Create ibmnas snap (gpfs-nas) if mount point is provided."""
- drv._create_ibmnas_snap = mock.drv._run_ssh.return_value.\
- drv._execute.return_value.drv._create_ibmnas_snap
- drv._create_ibmnas_snap.return_value = None
- self.assertIsNone(mock.drv._run_ssh().
- drv._execute().
- drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
- self.TEST_SNAP_PATH,
- None))
+ drv = self._driver
+ drv.configuration.platform = 'gpfs-nas'
+ mock_ssh.return_value = True
+
+ drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH, None)
- def test_create_ibmnas_copy(self):
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ def test_create_ibmnas_copy(self, mock_ssh):
"""Create ibmnas copy test case."""
drv = self._driver
- mock = self._mock
+ TEST_DEST_SNAP = '/export/snapshot-123.snap'
+ TEST_DEST_PATH = '/export/snapshot-123'
+ mock_ssh.return_value = True
+
+ drv._create_ibmnas_copy(self.TEST_VOLUME_PATH,
+ TEST_DEST_PATH,
+ TEST_DEST_SNAP)
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_ssh_operation')
+ def test_create_ibmnas_copy_nas_gpfs(self, mock_ssh):
+ """Create ibmnas copy for gpfs-nas platform test case."""
+ drv = self._driver
TEST_DEST_SNAP = '/export/snapshot-123.snap'
TEST_DEST_PATH = '/export/snapshot-123'
+ drv.configuration.platform = 'gpfs-nas'
+ mock_ssh.return_value = True
- drv._create_ibmnas_copy = mock.drv._run_ssh.return_value.\
- drv._create_ibmnas_copy
- drv._create_ibmnas_copy.return_value = None
- self.assertIsNone(mock.drv._run_ssh().
- drv._create_ibmnas_copy(
- self.TEST_VOLUME_PATH,
- TEST_DEST_PATH,
- TEST_DEST_SNAP))
+ drv._create_ibmnas_copy(self.TEST_VOLUME_PATH,
+ TEST_DEST_PATH,
+ TEST_DEST_SNAP)
- def test_resize_volume_file(self):
+ @mock.patch('cinder.image.image_utils.resize_image')
+ def test_resize_volume_file(self, mock_size):
"""Resize volume file test case."""
drv = self._driver
- mock = self._mock
+ mock_size.return_value = True
- drv._resize_volume_file = mock.image_utils.resize_image.return_value.\
- drv._resize_volume_file
- drv._resize_volume_file.return_value = True
- self.assertEqual(True, mock.image_utils.resize_image().
- drv._resize_volume_file(
- self.TEST_LOCAL_PATH,
- self.TEST_EXTEND_SIZE_IN_GB))
+ self.assertEqual(True, drv._resize_volume_file(self.TEST_LOCAL_PATH,
+ self.TEST_EXTEND_SIZE_IN_GB))
+
+ @mock.patch('cinder.image.image_utils.resize_image')
+ def test_resize_volume_exception(self, mock_size):
+ """Resize volume file test case."""
- def test_extend_volume(self):
+ drv = self._driver
+ mock_size.side_effect = (
+ exception.VolumeBackendAPIException(data='Failed'))
+
+ self.assertRaises(exception.VolumeBackendAPIException,
+ drv._resize_volume_file,
+ self.TEST_LOCAL_PATH,
+ self.TEST_EXTEND_SIZE_IN_GB)
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.local_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_resize_volume_file')
+ def test_extend_volume(self, mock_resize, mock_local):
"""Extend volume to greater size test case."""
drv = self._driver
- mock = self._mock
+ mock_resize.return_value = True
+ mock_local.return_value = self.TEST_LOCAL_PATH
+ volume = FakeEnv()
+ volume['name'] = 'vol-123'
- drv.extend_volume = mock.drv.local_path.return_value.\
- drv._resize_volume_file.return_value.\
- drv.extend_volume
- drv.extend_volume.return_value = None
- self.assertIsNone(mock.drv.local_path().
- drv._resize_volume_file().
- drv.extend_volume(
- self.TEST_LOCAL_PATH,
- self.TEST_EXTEND_SIZE_IN_GB))
+ drv.extend_volume(volume,
+ self.TEST_EXTEND_SIZE_IN_GB)
- def test_delete_snapfiles(self):
- """Delete_snapfiles assert test case."""
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver._run_ssh')
+ def test_delete_snapfiles(self, mock_ssh):
+ """Delete_snapfiles test case."""
drv = self._driver
- mock = self._mock
+ mock_ssh.return_value = ('Parent Depth Parent inode'
+ 'File name\n yes 0 /ibm/gpfs0/gshare/\n'
+ 'volume-123\n EFSSG1000I The command'
+ 'completed successfully.', '')
+
+ drv._delete_snapfiles(self.TEST_VOLUME_PATH,
+ self.TEST_MNT_POINT)
- drv._delete_snapfiles = mock.drv._run_ssh.return_value.\
- drv._execute.return_value.\
- drv._delete_snapfiles
- drv._delete_snapfiles.return_value = None
- self.assertIsNone(mock.drv._run_ssh().
- drv._execute().
- drv._delete_snapfiles(
- self.TEST_VOLUME_PATH,
- self.TEST_MNT_POINT))
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver._run_ssh')
+ def test_delete_snapfiles_nas_gpfs(self, mock_ssh):
+ """Delete_snapfiles for gpfs-nas platform test case."""
+
+ drv = self._driver
+ drv.configuration.platform = 'gpfs-nas'
+ mock_ssh.return_value = ('Parent Depth Parent inode'
+ 'File name\n'
+ '------ ----- -------------'
+ '- ---------\n'
+ 'yes 0\n'
+ '/ibm/gpfs0/gshare/volume-123', '')
+
+ drv._delete_snapfiles(self.TEST_VOLUME_PATH,
+ self.TEST_MNT_POINT)
def test_delete_volume_no_provider_location(self):
"""Delete volume with no provider location specified."""
result = drv.delete_volume(volume)
self.assertIsNone(result)
- def test_delete_volume(self):
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_export_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_delete_snapfiles')
+ def test_delete_volume(self, mock_export, mock_snap):
"""Delete volume test case."""
drv = self._driver
- mock = self._mock
+ mock_export.return_value = self.TEST_VOLUME_PATH
+ mock_snap.return_value = True
volume = FakeEnv()
volume['id'] = '123'
- volume['provider_location'] = self.TEST_NFS_EXPORT
-
- drv.delete_volume = mock.drv._get_export_path.return_value.\
- drv._delete_snapfiles.return_value.drv.delete_volume
- drv.delete_volume.return_value = True
- self.assertEqual(True, mock.drv._get_export_path(volume['id']).
- drv._delete_snapfiles(
- self.TEST_VOLUME_PATH,
- self.TEST_MNT_POINT).
- drv.delete_volume(volume))
-
- def test_create_snapshot(self):
+ volume['name'] = '/volume-123'
+ volume['provider_location'] = self.TEST_VOLUME_PATH
+
+ self.assertEqual(None, drv.delete_volume(volume))
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_export_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_provider_location')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_mount_point_for_share')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_create_ibmnas_snap')
+ def test_create_snapshot(self, mock_export,
+ mock_provider,
+ mock_mount,
+ mock_snap):
"""Create snapshot simple test case."""
drv = self._driver
- mock = self._mock
+ mock_export.return_value = self.TEST_LOCAL_PATH
+ mock_provider.return_value = self.TEST_VOLUME_PATH
+ mock_mount.return_value = self.TEST_MNT_POINT
+ mock_snap.return_value = True
volume = FakeEnv()
volume['id'] = '123'
snapshot = FakeEnv()
snapshot['volume_id'] = volume['id']
- snapshot['volume_name'] = 'volume-123'
- snapshot.name = 'snapshot-123'
-
- drv.create_snapshot = mock.drv._get_export_path.return_value.\
- drv._get_provider_location.return_value.\
- drv._get_mount_point_for_share.return_value.\
- drv._create_ibmnas_snap.return_value.\
- drv.create_snapshot
- drv.create_snapshot.return_value = None
- self.assertIsNone(mock.drv._get_export_path(snapshot['volume_id']).
- drv._get_provider_location(snapshot['volume_id']).
- drv._get_mount_point_for_share(self.TEST_NFS_EXPORT).
- drv._create_ibmnas_snap(
- src=self.TEST_VOLUME_PATH,
- dest=self.TEST_SNAP_PATH,
- mount_path=self.TEST_MNT_POINT).
- drv.create_snapshot(snapshot))
-
- def test_delete_snapshot(self):
+ snapshot['volume_name'] = '/volume-123'
+ snapshot['name'] = '/snapshot-123'
+
+ drv.create_snapshot(snapshot)
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_provider_location')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_mount_point_for_share')
+ @mock.patch('cinder.openstack.common.processutils.execute')
+ def test_delete_snapshot(self, mock_mount, mock_provider, mock_execute):
"""Delete snapshot simple test case."""
drv = self._driver
- mock = self._mock
+ mock_mount.return_value = self.TEST_LOCAL_PATH
+ mock_provider.return_value = self.TEST_VOLUME_PATH
+ mock_execute.return_value = True
volume = FakeEnv()
volume['id'] = '123'
snapshot['volume_name'] = 'volume-123'
snapshot['name'] = 'snapshot-123'
- drv.delete_snapshot = mock.drv._get_provider_location.return_value.\
- drv._get_mount_point_for_share.return_value.drv._execute.\
- return_value.drv.delete_snapshot
- drv.delete_snapshot.return_value = None
- self.assertIsNone(mock.drv._get_provider_location(volume['id']).
- drv._get_mount_point_for_share(self.TEST_NFS_EXPORT).
- drv._execute().
- drv.delete_snapshot(snapshot))
-
- def test_create_cloned_volume(self):
+ drv.delete_snapshot(snapshot)
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_export_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_create_ibmnas_copy')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_resize_volume_file')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.local_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_find_share')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_set_rw_permissions_for_all')
+ def test_create_cloned_volume(self, mock_export, mock_copy,
+ mock_resize, mock_local,
+ mock_find, mock_rw):
"""Clone volume with equal size test case."""
drv = self._driver
- mock = self._mock
+ mock_export.return_value = self.TEST_VOLUME_PATH
+ mock_copy.return_value = self.TEST_LOCAL_PATH
volume_src = FakeEnv()
volume_src['id'] = '123'
- volume_src['name'] = 'volume-123'
+ volume_src['name'] = '/volume-123'
volume_src.size = self.TEST_SIZE_IN_GB
volume_dest = FakeEnv()
volume_dest['id'] = '456'
- volume_dest['name'] = 'volume-456'
+ volume_dest['name'] = '/volume-456'
volume_dest['size'] = self.TEST_SIZE_IN_GB
volume_dest.size = self.TEST_SIZE_IN_GB
- drv.create_cloned_volume = mock.drv._get_export_path.\
- return_value.drv._create_ibmnas_copy.return_value.\
- drv._find_share.return_value.\
- drv._set_rw_permissions_for_all.return_value.\
- drv._resize_volume_file.return_value.\
- drv.create_cloned_volume
- drv.create_cloned_volume.return_value = self.TEST_NFS_EXPORT
- self.assertEqual(self.TEST_NFS_EXPORT,
- mock.drv._get_export_path(volume_src['id']).
- drv._create_ibmnas_copy().
- drv._find_share().
- drv._set_rw_permissions_for_all().
- drv._resize_volume_file().
- drv.create_cloned_volume(
- volume_dest,
- volume_src))
-
- def test_create_volume_from_snapshot(self):
+ self.assertEqual({'provider_location': self.TEST_LOCAL_PATH},
+ drv.create_cloned_volume(volume_dest, volume_src))
+
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_get_export_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_create_ibmnas_snap')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_resize_volume_file')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.local_path')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_find_share')
+ @mock.patch('cinder.volume.drivers.ibm.ibmnas.IBMNAS_NFSDriver.'
+ '_set_rw_permissions_for_all')
+ def test_create_volume_from_snapshot(self, mock_export, mock_snap,
+ mock_resize, mock_local,
+ mock_find, mock_rw):
"""Create volume from snapshot test case."""
drv = self._driver
- mock = self._mock
+ mock_export.return_value = '/export'
+ mock_snap.return_value = self.TEST_LOCAL_PATH
+ mock_local.return_value = self.TEST_VOLUME_PATH
+ mock_find.return_value = self.TEST_LOCAL_PATH
volume = FakeEnv()
volume['id'] = '123'
- volume['name'] = 'volume-123'
+ volume['name'] = '/volume-123'
volume['size'] = self.TEST_SIZE_IN_GB
snapshot = FakeEnv()
snapshot['volume_id'] = volume['id']
snapshot['volume_name'] = 'volume-123'
snapshot['volume_size'] = self.TEST_SIZE_IN_GB
- snapshot.name = 'snapshot-123'
-
- drv.create_volume_from_snapshot = mock.drv._get_export_path.\
- return_value.drv._create_ibmnas_snap.return_value.\
- drv._find_share.return_value.\
- drv._set_rw_permissions_for_all.return_value.\
- drv._resize_volume_file.return_value.\
- drv.create_volume_from_snapshot
- drv.create_volume_from_snapshot.return_value = self.TEST_NFS_EXPORT
- self.assertEqual(self.TEST_NFS_EXPORT,
- mock.drv._get_export_path(volume['id']).
- drv._create_ibmnas_snap().
- drv._find_share().
- drv._set_rw_permissions_for_all().
- drv._resize_volume_file().
- drv.create_volume_from_snapshot(snapshot))
+ snapshot.name = '/snapshot-123'
+
+ self.assertEqual({'provider_location': self.TEST_LOCAL_PATH},
+ drv.create_volume_from_snapshot(volume, snapshot))
-# Copyright 2013 IBM Corp.
+# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# Authors:
# Nilesh Bhosale <nilesh.bhosale@in.ibm.com>
# Sasikanth Eda <sasikanth.eda@in.ibm.com>
+
"""
IBM NAS Volume Driver.
Currently, it supports the following IBM Storage Systems:
1. IBM Scale Out NAS (SONAS)
2. IBM Storwize V7000 Unified
+3. NAS based IBM GPFS Storage Systems
Notes:
1. If you specify both a password and a key file, this driver will use the
from cinder.volume.drivers.nfs import nas_opts
from cinder.volume.drivers.san import san
-VERSION = '1.0.0'
+VERSION = '1.1.0'
LOG = logging.getLogger(__name__)
+
+platform_opts = [
+ cfg.StrOpt('ibmnas_platform_type',
+ default='v7ku',
+ help=('IBMNAS platform type to be used as backend storage; '
+ 'valid values are - '
+ 'v7ku : for using IBM Storwize V7000 Unified, '
+ 'sonas : for using IBM Scale Out NAS, '
+ 'gpfs-nas : for using NFS based IBM GPFS deployments.')),
+]
+
CONF = cfg.CONF
+CONF.register_opts(platform_opts)
class IBMNAS_NFSDriver(nfs.NfsDriver, san.SanDriver):
- """IBM NAS NFS based cinder driver.
+ """IBMNAS NFS based cinder driver.
Creates file on NFS share for using it as block device on hypervisor.
Version history:
1.0.0 - Initial driver
+ 1.1.0 - Support for NFS based GPFS storage backend
"""
driver_volume_type = 'nfs'
self._context = None
super(IBMNAS_NFSDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(nas_opts)
+ self.configuration.append_config_values(platform_opts)
self.configuration.san_ip = self.configuration.nas_ip
self.configuration.san_login = self.configuration.nas_login
self.configuration.san_password = self.configuration.nas_password
self.configuration.san_private_key = \
self.configuration.nas_private_key
self.configuration.san_ssh_port = self.configuration.nas_ssh_port
+ self.configuration.ibmnas_platform_type = \
+ self.configuration.ibmnas_platform_type.lower()
+ LOG.info(_('Initialized driver for IBMNAS Platform: %s.'),
+ self.configuration.ibmnas_platform_type)
def set_execute(self, execute):
self._execute = utils.execute
def check_for_setup_error(self):
"""Ensure that the flags are set properly."""
- required_flags = ['nas_ip', 'nas_ssh_port', 'nas_login']
+ required_flags = ['nas_ip', 'nas_ssh_port', 'nas_login',
+ 'ibmnas_platform_type']
+ valid_platforms = ['v7ku', 'sonas', 'gpfs-nas']
for flag in required_flags:
if not self.configuration.safe_get(flag):
'authentication: set either nas_password or '
'nas_private_key option'))
+ # Ensure whether ibmnas platform type is set to appropriate value
+ if self.configuration.ibmnas_platform_type not in valid_platforms:
+ raise exception.InvalidInput(
+ reason = (_("Unsupported ibmnas_platform_type: %(given)s."
+ " Supported platforms: %(valid)s")
+ % {'given': self.configuration.ibmnas_platform_type,
+ 'valid': (', '.join(valid_platforms))}))
+
def _get_provider_location(self, volume_id):
"""Returns provider location for given volume."""
LOG.debug("Enter _get_provider_location: volume_id %s" % volume_id)
self._stats = data
LOG.debug("Exit _update_volume_stats")
+ def _ssh_operation(self, ssh_cmd):
+ try:
+ self._run_ssh(ssh_cmd)
+ except processutils.ProcessExecutionError as e:
+ msg = (_('Failed in _ssh_operation while execution of ssh_cmd:'
+ '%(cmd)s. Error: %(error)s') % {'cmd': ssh_cmd, 'error': e})
+ LOG.exception(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
def _create_ibmnas_snap(self, src, dest, mount_path):
"""Create volume clones and snapshots."""
LOG.debug("Enter _create_ibmnas_snap: src %(src)s, dest %(dest)s"
% {'src': src, 'dest': dest})
- if mount_path is not None:
- tmp_file_path = dest + '.snap'
- ssh_cmd = ['mkclone', '-p', dest, '-s', src, '-t', tmp_file_path]
- try:
- self._run_ssh(ssh_cmd)
- except processutils.ProcessExecutionError as e:
- msg = (_("Failed in _create_ibmnas_snap during "
- "create_snapshot. Error: %s") % e.stderr)
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
-
- #Now remove the tmp file
- tmp_file_local_path = os.path.join(mount_path,
- os.path.basename(tmp_file_path))
- self._execute('rm', '-f', tmp_file_local_path, run_as_root=True)
+ if self.configuration.ibmnas_platform_type == 'gpfs-nas':
+ ssh_cmd = ['mmclone', 'snap', src, dest]
+ self._ssh_operation(ssh_cmd)
else:
- ssh_cmd = ['mkclone', '-s', src, '-t', dest]
- try:
- self._run_ssh(ssh_cmd)
- except processutils.ProcessExecutionError as e:
- msg = (_("Failed in _create_ibmnas_snap during "
- "create_volume_from_snapshot. Error: %s") % e.stderr)
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
+ if mount_path is not None:
+ tmp_file_path = dest + '.snap'
+ ssh_cmd = ['mkclone', '-p', dest, '-s', src, '-t',
+ tmp_file_path]
+ try:
+ self._ssh_operation(ssh_cmd)
+ finally:
+ # Now remove the tmp file
+ tmp_file_local_path = os.path.join(mount_path, os.path.
+ basename(tmp_file_path))
+ self._execute('rm', '-f', tmp_file_local_path,
+ run_as_root=True)
+ else:
+ ssh_cmd = ['mkclone', '-s', src, '-t', dest]
+ self._ssh_operation(ssh_cmd)
LOG.debug("Exit _create_ibmnas_snap")
def _create_ibmnas_copy(self, src, dest, snap):
'snap %(snap)s' % {'src': src,
'dest': dest,
'snap': snap})
- ssh_cmd = ['mkclone', '-p', snap, '-s', src, '-t', dest]
- try:
- self._run_ssh(ssh_cmd)
- except processutils.ProcessExecutionError as e:
- msg = (_("Failed in _create_ibmnas_copy. Error: %s") % e.stderr)
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
+ if self.configuration.ibmnas_platform_type == 'gpfs-nas':
+ ssh_cmd = ['mmclone', 'snap', src, snap]
+ self._ssh_operation(ssh_cmd)
+ ssh_cmd = ['mmclone', 'copy', snap, dest]
+ self._ssh_operation(ssh_cmd)
+ else:
+ ssh_cmd = ['mkclone', '-p', snap, '-s', src, '-t', dest]
+ self._ssh_operation(ssh_cmd)
LOG.debug("Exit _create_ibmnas_copy")
def _resize_volume_file(self, path, new_size):
"""Resize the image file on share to new size."""
- LOG.info(_('Resizing file to %sG'), new_size)
+ LOG.debug("Resizing file to %sG." % new_size)
try:
image_utils.resize_image(path, new_size, run_as_root=True)
except processutils.ProcessExecutionError as e:
def extend_volume(self, volume, new_size):
"""Extend an existing volume to the new size."""
- LOG.info(_('Extending volume %s.'), volume['name'])
+ LOG.debug("Extending volume %s" % volume['name'])
path = self.local_path(volume)
self._resize_volume_file(path, new_size)
'mount_point %(mount_point)s'
% {'fchild': fchild,
'mount_point': mount_point})
- ssh_cmd = ['lsclone', fchild]
+ if self.configuration.ibmnas_platform_type == 'gpfs-nas':
+ ssh_cmd = ['mmclone', 'show', fchild]
+ else:
+ ssh_cmd = ['lsclone', fchild]
try:
(out, _err) = self._run_ssh(ssh_cmd, check_exit_code=False)
except processutils.ProcessExecutionError as e:
export_path = self._get_export_path(snapshot['volume_id'])
snapshot_path = os.path.join(export_path, snapshot.name)
volume_path = os.path.join(export_path, volume['name'])
- self._create_ibmnas_snap(snapshot_path, volume_path, None)
+
+ if self.configuration.ibmnas_platform_type == 'gpfs-nas':
+ ssh_cmd = ['mmclone', 'copy', snapshot_path, volume_path]
+ self._ssh_operation(ssh_cmd)
+ else:
+ self._create_ibmnas_snap(snapshot_path, volume_path, None)
volume['provider_location'] = self._find_share(volume['size'])
volume_path = self.local_path(volume)
self._set_rw_permissions_for_all(volume_path)
- #Extend the volume if required
+ # Extend the volume if required
self._resize_volume_file(volume_path, volume['size'])
return {'provider_location': volume['provider_location']}
self._set_rw_permissions_for_all(volume_path)
- #Extend the volume if required
+ # Extend the volume if required
self._resize_volume_file(volume_path, volume['size'])
return {'provider_location': volume['provider_location']}