--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# Authors:
+# Nilesh Bhosale <nilesh.bhosale@in.ibm.com>
+# Sasikanth Eda <sasikanth.eda@in.ibm.com>
+
+"""
+Tests for the IBM NAS family (SONAS, Storwize V7000 Unified).
+"""
+
+import mock
+
+from oslo.config import cfg
+
+from cinder import context
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import test
+from cinder.volume import configuration as conf
+from cinder.volume.drivers.ibm import ibmnas
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+
+
+class FakeEnv(object):
+ fields = {}
+
+ def __setitem__(self, key, value):
+ self.fields[key] = value
+
+ def __getitem__(self, item):
+ return self.fields[item]
+
+
+class IBMNASDriverTestCase(test.TestCase):
+
+ TEST_NFS_EXPORT = 'nfs-host1:/export'
+ TEST_SIZE_IN_GB = 1
+ TEST_EXTEND_SIZE_IN_GB = 2
+ TEST_MNT_POINT = '/mnt/nfs'
+ TEST_MNT_POINT_BASE = '/mnt'
+ TEST_LOCAL_PATH = '/mnt/nfs/volume-123'
+ TEST_VOLUME_PATH = '/export/volume-123'
+ TEST_SNAP_PATH = '/export/snapshot-123'
+
+ def setUp(self):
+ super(IBMNASDriverTestCase, self).setUp()
+ self._driver = ibmnas.IBMNAS_NFSDriver(configuration=
+ conf.Configuration(None))
+ self._mock = mock.Mock()
+ self._def_flags = {'nas_ip': 'hostname',
+ 'nas_login': 'user',
+ 'nas_ssh_port': 22,
+ 'nas_password': 'pass',
+ 'nas_private_key': 'nas.key',
+ 'nfs_shares_config': None,
+ 'nfs_sparsed_volumes': True,
+ 'nfs_used_ratio': 0.95,
+ 'nfs_oversub_ratio': 1.0,
+ 'nfs_mount_point_base':
+ self.TEST_MNT_POINT_BASE,
+ 'nfs_mount_options': None}
+
+ self.context = context.get_admin_context()
+ self.context.user_id = 'fake'
+ self.context.project_id = 'fake'
+
+ def tearDown(self):
+ super(IBMNASDriverTestCase, self).tearDown()
+
+ def _set_flag(self, flag, value):
+ group = self._driver.configuration.config_group
+ self._driver.configuration.set_override(flag, value, group)
+
+ def _reset_flags(self):
+ self._driver.configuration.local_conf.reset()
+ for k, v in self._def_flags.iteritems():
+ self._set_flag(k, v)
+
+ def test_check_for_setup_error(self):
+ """Check setup with bad parameters."""
+
+ drv = self._driver
+
+ required_flags = [
+ 'nas_ip',
+ 'nas_login',
+ 'nas_ssh_port']
+
+ for flag in required_flags:
+ self._set_flag(flag, None)
+ self.assertRaises(exception.CinderException,
+ drv.check_for_setup_error)
+
+ self._set_flag('nas_password', None)
+ self._set_flag('nas_private_key', None)
+ self.assertRaises(exception.InvalidInput,
+ self._driver.check_for_setup_error)
+
+ self._reset_flags()
+
+ def test_get_provider_location(self):
+ """Check provider location for given volume id."""
+
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+
+ mock.drv._get_provider_location.return_value = self.TEST_NFS_EXPORT
+ self.assertEqual(self.TEST_NFS_EXPORT,
+ mock.drv._get_provider_location(volume['id']))
+
+ def test_get_export_path(self):
+ """Check export path for the given volume."""
+
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+
+ mock.drv._get_export_path.return_value = self.TEST_NFS_EXPORT.\
+ split(':')[1]
+ self.assertEqual(self.TEST_NFS_EXPORT.split(':')[1],
+ mock.drv._get_export_path(volume['id']))
+
+ def test_create_ibmnas_snap_mount_point_provided(self):
+ """Create ibmnas snap if mount point is provided."""
+
+ drv = self._driver
+ mock = self._mock
+
+ drv._create_ibmnas_snap = mock.drv._run_ssh.return_value.\
+ drv._execute.return_value.drv._create_ibmnas_snap
+ drv._create_ibmnas_snap.return_value = True
+ self.assertEqual(True, mock.drv._run_ssh().
+ drv._execute().
+ drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH,
+ self.TEST_MNT_POINT))
+
+ def test_create_ibmnas_snap_no_mount_point_provided(self):
+ """Create ibmnas snap if no mount point is provided."""
+
+ drv = self._driver
+ mock = self._mock
+
+ drv._create_ibmnas_snap = mock.drv._run_ssh.return_value.\
+ drv._execute.return_value.drv._create_ibmnas_snap
+ drv._create_ibmnas_snap.return_value = None
+ self.assertEqual(None, mock.drv._run_ssh().
+ drv._execute().
+ drv._create_ibmnas_snap(self.TEST_VOLUME_PATH,
+ self.TEST_SNAP_PATH,
+ None))
+
+ def test_create_ibmnas_copy(self):
+ """Create ibmnas copy test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ TEST_DEST_SNAP = '/export/snapshot-123.snap'
+ TEST_DEST_PATH = '/export/snapshot-123'
+
+ drv._create_ibmnas_copy = mock.drv._run_ssh.return_value.\
+ drv._create_ibmnas_copy
+ drv._create_ibmnas_copy.return_value = None
+ self.assertEqual(None, mock.drv._run_ssh().
+ drv._create_ibmnas_copy(
+ self.TEST_VOLUME_PATH,
+ TEST_DEST_PATH,
+ TEST_DEST_SNAP))
+
+ def test_resize_volume_file(self):
+ """Resize volume file test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ drv._resize_volume_file = mock.image_utils.resize_image.return_value.\
+ drv._resize_volume_file
+ drv._resize_volume_file.return_value = True
+ self.assertEqual(True, mock.image_utils.resize_image().
+ drv._resize_volume_file(
+ self.TEST_LOCAL_PATH,
+ self.TEST_EXTEND_SIZE_IN_GB))
+
+ def test_extend_volume(self):
+ """Extend volume to greater size test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ drv.extend_volume = mock.drv.local_path.return_value.\
+ drv._resize_volume_file.return_value.\
+ drv.extend_volume
+ drv.extend_volume.return_value = None
+ self.assertEqual(None, mock.drv.local_path().
+ drv._resize_volume_file().
+ drv.extend_volume(
+ self.TEST_LOCAL_PATH,
+ self.TEST_EXTEND_SIZE_IN_GB))
+
+ def test_delete_snapfiles(self):
+ """Delete_snapfiles assert test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ drv._delete_snapfiles = mock.drv._run_ssh.return_value.\
+ drv._execute.return_value.\
+ drv._delete_snapfiles
+ drv._delete_snapfiles.return_value = None
+ self.assertEqual(None, mock.drv._run_ssh().
+ drv._execute().
+ drv._delete_snapfiles(
+ self.TEST_VOLUME_PATH,
+ self.TEST_MNT_POINT))
+
+ def test_delete_volume_no_provider_location(self):
+ """Delete volume with no provider location specified."""
+
+ drv = self._driver
+
+ volume = FakeEnv()
+ volume['name'] = 'volume-123'
+ volume['provider_location'] = None
+
+ result = drv.delete_volume(volume)
+ self.assertEqual(None, result)
+
+ def test_delete_volume(self):
+ """Delete volume test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+ volume['provider_location'] = self.TEST_NFS_EXPORT
+
+ drv.delete_volume = mock.drv._get_export_path.return_value.\
+ drv._delete_snapfiles.return_value.drv.delete_volume
+ drv.delete_volume.return_value = True
+ self.assertEqual(True, mock.drv._get_export_path(volume['id']).
+ drv._delete_snapfiles(
+ self.TEST_VOLUME_PATH,
+ self.TEST_MNT_POINT).
+ drv.delete_volume(volume))
+
+ def test_create_snapshot(self):
+ """Create snapshot simple test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+ volume['name'] = 'volume-123'
+
+ snapshot = FakeEnv()
+ snapshot['volume_id'] = volume['id']
+ snapshot['volume_name'] = 'volume-123'
+ snapshot.name = 'snapshot-123'
+
+ drv.create_snapshot = mock.drv._get_export_path.return_value.\
+ drv._get_provider_location.return_value.\
+ drv._get_mount_point_for_share.return_value.\
+ drv._create_ibmnas_snap.return_value.\
+ drv.create_snapshot
+ drv.create_snapshot.return_value = None
+ self.assertEqual(None,
+ mock.drv._get_export_path(snapshot['volume_id']).
+ drv._get_provider_location(snapshot['volume_id']).
+ drv._get_mount_point_for_share(self.TEST_NFS_EXPORT).
+ drv._create_ibmnas_snap(
+ src=self.TEST_VOLUME_PATH,
+ dest=self.TEST_SNAP_PATH,
+ mount_path=self.TEST_MNT_POINT).
+ drv.create_snapshot(snapshot))
+
+ def test_delete_snapshot(self):
+ """Delete snapshot simple test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+ volume['provider_location'] = self.TEST_NFS_EXPORT
+
+ snapshot = FakeEnv()
+ snapshot['volume_id'] = volume['id']
+ snapshot['volume_name'] = 'volume-123'
+ snapshot['name'] = 'snapshot-123'
+
+ drv.delete_snapshot = mock.drv._get_provider_location.return_value.\
+ drv._get_mount_point_for_share.return_value.drv._execute.\
+ return_value.drv.delete_snapshot
+ drv.delete_snapshot.return_value = None
+ self.assertEqual(None, mock.drv._get_provider_location(volume['id']).
+ drv._get_mount_point_for_share(self.TEST_NFS_EXPORT).
+ drv._execute().
+ drv.delete_snapshot(snapshot))
+
+ def test_create_cloned_volume(self):
+ """Clone volume with equal size test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ volume_src = FakeEnv()
+ volume_src['id'] = '123'
+ volume_src['name'] = 'volume-123'
+ volume_src.size = self.TEST_SIZE_IN_GB
+
+ volume_dest = FakeEnv()
+ volume_dest['id'] = '456'
+ volume_dest['name'] = 'volume-456'
+ volume_dest['size'] = self.TEST_SIZE_IN_GB
+ volume_dest.size = self.TEST_SIZE_IN_GB
+
+ drv.create_cloned_volume = mock.drv._get_export_path.\
+ return_value.drv._create_ibmnas_copy.return_value.\
+ drv._find_share.return_value.\
+ drv._set_rw_permissions_for_all.return_value.\
+ drv._resize_volume_file.return_value.\
+ drv.create_cloned_volume
+ drv.create_cloned_volume.return_value = self.TEST_NFS_EXPORT
+ self.assertEqual(self.TEST_NFS_EXPORT,
+ mock.drv._get_export_path(volume_src['id']).
+ drv._create_ibmnas_copy().
+ drv._find_share().
+ drv._set_rw_permissions_for_all().
+ drv._resize_volume_file().
+ drv.create_cloned_volume(
+ volume_dest,
+ volume_src))
+
+ def test_create_volume_from_snapshot(self):
+ """Create volume from snapshot test case."""
+
+ drv = self._driver
+ mock = self._mock
+
+ volume = FakeEnv()
+ volume['id'] = '123'
+ volume['name'] = 'volume-123'
+ volume['size'] = self.TEST_SIZE_IN_GB
+
+ snapshot = FakeEnv()
+ snapshot['volume_id'] = volume['id']
+ snapshot['volume_name'] = 'volume-123'
+ snapshot['volume_size'] = self.TEST_SIZE_IN_GB
+ snapshot.name = 'snapshot-123'
+
+ drv.create_volume_from_snapshot = mock.drv._get_export_path.\
+ return_value.drv._create_ibmnas_snap.return_value.\
+ drv._find_share.return_value.\
+ drv._set_rw_permissions_for_all.return_value.\
+ drv._resize_volume_file.return_value.\
+ drv.create_volume_from_snapshot
+ drv.create_volume_from_snapshot.return_value = self.TEST_NFS_EXPORT
+ self.assertEqual(self.TEST_NFS_EXPORT,
+ mock.drv._get_export_path(volume['id']).
+ drv._create_ibmnas_snap().
+ drv._find_share().
+ drv._set_rw_permissions_for_all().
+ drv._resize_volume_file().
+ drv.create_volume_from_snapshot(snapshot))
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# Authors:
+# Nilesh Bhosale <nilesh.bhosale@in.ibm.com>
+# Sasikanth Eda <sasikanth.eda@in.ibm.com>
+"""
+IBM NAS Volume Driver.
+Currently, it supports the following IBM Storage Systems:
+1. IBM Scale Out NAS (SONAS)
+2. IBM Storwize V7000 Unified
+
+Notes:
+1. If you specify both a password and a key file, this driver will use the
+ key file only.
+2. When using a key file for authentication, it is up to the user or
+ system administrator to store the private key in a safe manner.
+"""
+
+import os
+import re
+
+from oslo.config import cfg
+
+from cinder import exception
+from cinder.image import image_utils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import processutils
+from cinder import units
+from cinder import utils
+from cinder.volume.drivers import nfs
+from cinder.volume.drivers.san import san
+
+VERSION = '1.0.0'
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class IBMNAS_NFSDriver(nfs.NfsDriver, san.SanDriver):
+ """IBM NAS NFS based cinder driver.
+
+ Creates file on NFS share for using it as block device on hypervisor.
+ Version history:
+ 1.0.0 - Initial driver
+ """
+
+ driver_volume_type = 'nfs'
+ VERSION = VERSION
+
+ def __init__(self, execute=utils.execute, *args, **kwargs):
+ self._context = None
+ super(IBMNAS_NFSDriver, self).__init__(*args, **kwargs)
+ self.configuration.san_ip = self.configuration.nas_ip
+ self.configuration.san_login = self.configuration.nas_login
+ self.configuration.san_password = self.configuration.nas_password
+ self.configuration.san_private_key = \
+ self.configuration.nas_private_key
+ self.configuration.san_ssh_port = self.configuration.nas_ssh_port
+
+ def set_execute(self, execute):
+ self._execute = utils.execute
+
+ def do_setup(self, context):
+ """Any initialization the volume driver does while starting."""
+ super(IBMNAS_NFSDriver, self).do_setup(context)
+ self._context = context
+
+ def check_for_setup_error(self):
+ """Ensure that the flags are set properly."""
+ required_flags = ['nas_ip', 'nas_ssh_port', 'nas_login']
+
+ for flag in required_flags:
+ if not self.configuration.safe_get(flag):
+ raise exception.InvalidInput(reason=_('%s is not set') % flag)
+
+ # Ensure that either password or keyfile were set
+ if not (self.configuration.nas_password or
+ self.configuration.nas_private_key):
+ raise exception.InvalidInput(
+ reason=_('Password or SSH private key is required for '
+ 'authentication: set either nas_password or '
+ 'nas_private_key option'))
+
+ def _get_provider_location(self, volume_id):
+ """Returns provider location for given volume."""
+ LOG.debug(_("Enter _get_provider_location: volume_id %s") % volume_id)
+ volume = self.db.volume_get(self._context, volume_id)
+ LOG.debug("Exit _get_provider_location")
+ return volume['provider_location']
+
+ def _get_export_path(self, volume_id):
+ """Returns NFS export path for the given volume."""
+ LOG.debug(_("Enter _get_export_path: volume_id %s") % volume_id)
+ return self._get_provider_location(volume_id).split(':')[1]
+
+ def _update_volume_stats(self):
+ """Retrieve stats info from volume group."""
+
+ LOG.debug(_("Enter _update_volume_stats"))
+ data = {}
+ backend_name = self.configuration.safe_get('volume_backend_name')
+ data['volume_backend_name'] = backend_name or 'IBMNAS_NFS'
+ data['vendor_name'] = 'IBM'
+ data['driver_version'] = self.get_version()
+ data['storage_protocol'] = self.driver_volume_type
+
+ self._ensure_shares_mounted()
+
+ global_capacity = 0
+ global_free = 0
+ for share in self._mounted_shares:
+ capacity, free, _used = self._get_capacity_info(share)
+ global_capacity += capacity
+ global_free += free
+
+ data['total_capacity_gb'] = global_capacity / float(units.GiB)
+ data['free_capacity_gb'] = global_free / float(units.GiB)
+ data['reserved_percentage'] = 0
+ data['QoS_support'] = False
+ self._stats = data
+ LOG.debug("Exit _update_volume_stats")
+
+ def _create_ibmnas_snap(self, src, dest, mount_path):
+ """Create volume clones and snapshots."""
+ LOG.debug(_("Enter _create_ibmnas_snap: src %(src)s, dest %(dest)s")
+ % {'src': str(src), 'dest': str(dest)})
+ if mount_path is not None:
+ tmp_file_path = dest + '.snap'
+ ssh_cmd = ['mkclone', '-p', dest, '-s', src, '-t', tmp_file_path]
+ try:
+ self._run_ssh(ssh_cmd)
+ except processutils.ProcessExecutionError as e:
+ msg = (_("Failed in _create_ibmnas_snap during "
+ "create_snapshot. Error: %s") % e.stderr)
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ #Now remove the tmp file
+ tmp_file_local_path = os.path.join(mount_path,
+ os.path.basename(tmp_file_path))
+ self._execute('rm', '-f', tmp_file_local_path, run_as_root=True)
+ else:
+ ssh_cmd = ['mkclone', '-s', src, '-t', dest]
+ try:
+ self._run_ssh(ssh_cmd)
+ except processutils.ProcessExecutionError as e:
+ msg = (_("Failed in _create_ibmnas_snap during "
+ "create_volume_from_snapshot. Error: %s") % e.stderr)
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ LOG.debug("Exit _create_ibmnas_snap")
+
+ def _create_ibmnas_copy(self, src, dest, snap):
+ """Create a cloned volume, parent & the clone both remain writable."""
+ LOG.debug(_('Enter _create_ibmnas_copy: src %(src)s, dest %(dest)s, '
+ 'snap %(snap)s') % {'src': str(src), 'dest': str(dest),
+ 'snap': str(snap)})
+ ssh_cmd = ['mkclone', '-p', snap, '-s', src, '-t', dest]
+ try:
+ self._run_ssh(ssh_cmd)
+ except processutils.ProcessExecutionError as e:
+ msg = (_("Failed in _create_ibmnas_copy. Error: %s") % e.stderr)
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ LOG.debug("Exit _create_ibmnas_copy")
+
+ def _resize_volume_file(self, path, new_size):
+ """Resize the image file on share to new size."""
+ LOG.info(_('Resizing file to %sG'), new_size)
+ try:
+ image_utils.resize_image(path, new_size)
+ except processutils.ProcessExecutionError as e:
+ msg = (_("Failed to resize volume "
+ "%(volume_id)s, error: %(error)s") %
+ {'volume_id': os.path.basename(path).split('-')[1],
+ 'error': e.stderr})
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ return True
+
+ def extend_volume(self, volume, new_size):
+ """Extend an existing volume to the new size."""
+ LOG.info(_('Extending volume %s.'), volume['name'])
+ path = self.local_path(volume)
+ self._resize_volume_file(path, new_size)
+
+ def _delete_snapfiles(self, fchild, mount_point):
+ LOG.debug(_('Enter _delete_snapfiles: fchild %(fchild)s, '
+ 'mount_point %(mount_point)s')
+ % {'fchild': str(fchild), 'mount_point': str(mount_point)})
+ ssh_cmd = ['lsclone', fchild]
+ try:
+ (out, _err) = self._run_ssh(ssh_cmd, check_exit_code=False)
+ except processutils.ProcessExecutionError as e:
+ msg = (_("Failed in _delete_snapfiles. Error: %s") % e.stderr)
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ fparent = None
+ reInode = re.compile(
+ r'.*\s+(?:yes|no)\s+\d+\s+(?P<inode>\d+)', re.M | re.S)
+ match = reInode.match(out)
+ if match:
+ inode = match.group('inode')
+ path = mount_point
+ (out, _err) = self._execute('find', path, '-maxdepth', '1',
+ '-inum', inode, run_as_root=True)
+ if out:
+ fparent = out.split('\n', 1)[0]
+ fchild_local_path = os.path.join(mount_point, os.path.basename(fchild))
+ self._execute(
+ 'rm', '-f', fchild_local_path, check_exit_code=False,
+ run_as_root=True)
+
+ # There is no need to check for volume references on this snapshot
+ # because 'rm -f' itself serves as a simple and implicit check. If the
+ # parent is referenced by another volume, system doesn't allow deleting
+ # it. 'rm -f' silently fails and the subsequent check on the path
+ # indicates whether there are any volumes derived from that snapshot.
+ # If there are such volumes, we quit recursion and let the other
+ # volumes delete the snapshot later. If there are no references, rm
+ # would succeed and the snapshot is deleted.
+ if not os.path.exists(fchild) and fparent:
+ fpbase = os.path.basename(fparent)
+ if (fpbase.endswith('.ts') or fpbase.endswith('.snap')):
+ fparent_remote_path = os.path.join(os.path.dirname(fchild),
+ fpbase)
+ self._delete_snapfiles(fparent_remote_path, mount_point)
+ LOG.debug("Exit _delete_snapfiles")
+
+ def delete_volume(self, volume):
+ """Deletes a logical volume."""
+ if not volume['provider_location']:
+ LOG.warn(_('Volume %s does not have provider_location specified, '
+ 'skipping.'), volume['name'])
+ return
+
+ export_path = self._get_export_path(volume['id'])
+ volume_name = volume['name']
+ volume_path = os.path.join(export_path, volume_name)
+ mount_point = os.path.dirname(self.local_path(volume))
+
+ # Delete all dependent snapshots, the snapshot will get deleted
+ # if the link count goes to zero, else rm will fail silently
+ self._delete_snapfiles(volume_path, mount_point)
+
+ def create_snapshot(self, snapshot):
+ """Creates a volume snapshot."""
+ export_path = self._get_export_path(snapshot['volume_id'])
+ snapshot_path = os.path.join(export_path, snapshot['name'])
+ volume_path = os.path.join(export_path, snapshot['volume_name'])
+ nfs_share = self._get_provider_location(snapshot['volume_id'])
+ mount_path = self._get_mount_point_for_share(nfs_share)
+ self._create_ibmnas_snap(src=volume_path, dest=snapshot_path,
+ mount_path=mount_path)
+
+ def delete_snapshot(self, snapshot):
+ """Deletes a volume snapshot."""
+ # A snapshot file is deleted as a part of delete_volume when
+ # all volumes derived from it are deleted.
+
+ # Rename the deleted snapshot to indicate it no longer exists in
+ # cinder db. Attempt to delete the snapshot. If the snapshot has
+ # clone children, the delete will fail silently. When volumes that
+ # are clone children are deleted in the future, the remaining ts
+ # snapshots will also be deleted.
+ nfs_share = self._get_provider_location(snapshot['volume_id'])
+ mount_path = self._get_mount_point_for_share(nfs_share)
+ snapshot_path = os.path.join(mount_path, snapshot['name'])
+ snapshot_ts_path = '%s.ts' % snapshot_path
+ self._execute('mv', '-f', snapshot_path, snapshot_ts_path,
+ check_exit_code=True, run_as_root=True)
+ self._execute('rm', '-f', snapshot_ts_path,
+ check_exit_code=False, run_as_root=True)
+
+ def create_volume_from_snapshot(self, volume, snapshot):
+ """Creates a volume from an existing volume snapshot.
+
+ Extends the volume if the volume size is more than the snapshot size.
+ """
+ export_path = self._get_export_path(snapshot['volume_id'])
+ snapshot_path = os.path.join(export_path, snapshot.name)
+ volume_path = os.path.join(export_path, volume['name'])
+ self._create_ibmnas_snap(snapshot_path, volume_path, None)
+
+ volume['provider_location'] = self._find_share(volume['size'])
+ volume_path = self.local_path(volume)
+ self._set_rw_permissions_for_all(volume_path)
+
+ #Extend the volume if required
+ self._resize_volume_file(volume_path, volume['size'])
+ return {'provider_location': volume['provider_location']}
+
+ def create_cloned_volume(self, volume, src_vref):
+ """Creates a clone of the specified volume.
+
+ Extends the volume if the new volume size is more than
+ the source volume size.
+ """
+ export_path = self._get_export_path(src_vref['id'])
+ src_vol_path = os.path.join(export_path, src_vref['name'])
+ dest_vol_path = os.path.join(export_path, volume['name'])
+ snap_file_name = volume['name']
+ snap_file_name = snap_file_name + '.snap'
+ snap_file_path = os.path.join(export_path, snap_file_name)
+ self._create_ibmnas_copy(src_vol_path, dest_vol_path, snap_file_path)
+
+ volume['provider_location'] = self._find_share(volume['size'])
+ volume_path = self.local_path(volume)
+
+ self._set_rw_permissions_for_all(volume_path)
+
+ #Extend the volume if required
+ self._resize_volume_file(volume_path, volume['size'])
+
+ return {'provider_location': volume['provider_location']}