--- /dev/null
+# Copyright (c) 2014 Symantec Corporation
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import mock
+from oslo.config import cfg
+
+from cinder import context
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import test
+from cinder.volume import configuration as conf
+from cinder.volume.drivers import symantec_cnfs as cnfs
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class SymantecCNFSDriverTestCase(test.TestCase):
+
+ """Test case for SymantecCNFS driver."""
+ TEST_CNFS_SHARE = 'cnfs-host1:/share'
+ TEST_VOL_NM = 'volume-a6707cd3-348c-45cd-9524-255be0939b60'
+ TEST_SNAP_NM = 'snapshot-73368c68-1c0b-4027-ba8a-14629918945e'
+ TEST_VOL_SIZE = 1
+ TEST_MNT_BASE = '/cnfs/share'
+ TEST_LOCAL_PATH = '/cnfs/share/mnt'
+ TEST_VOL_LOCAL_PATH = TEST_LOCAL_PATH + '/' + TEST_VOL_NM
+ TEST_SNAP_LOCAL_PATH = TEST_LOCAL_PATH + '/' + TEST_SNAP_NM
+ TEST_SPL_SNAP_LOCAL_PATH = TEST_SNAP_LOCAL_PATH + "::snap:vxfs:"
+ TEST_NFS_SHARES_CONFIG = '/etc/cinder/symc_nfs_share'
+ TEST_NFS_MOUNT_OPTIONS_FAIL_NONE = ''
+ TEST_NFS_MOUNT_OPTIONS_FAIL_V4 = 'nfsvers=4'
+ TEST_NFS_MOUNT_OPTIONS_FAIL_V2 = 'nfsvers=2'
+ TEST_NFS_MOUNT_OPTIONS_PASS_V3 = 'nfsvers=3'
+ TEST_VOL_ID = 'a6707cd3-348c-45cd-9524-255be0939b60'
+ SNAPSHOT_ID = '73368c68-1c0b-4027-ba8a-14629918945e'
+ TEST_VOL = {'name': TEST_VOL_NM, 'size': TEST_VOL_SIZE,
+ 'id': TEST_VOL_ID, 'provider_location': TEST_CNFS_SHARE}
+ TEST_SNAP = {'name': TEST_SNAP_NM, 'volume_name': TEST_VOL_NM,
+ 'volume_id': TEST_VOL_ID, 'provider_location': None}
+
+ def setUp(self):
+ super(SymantecCNFSDriverTestCase, self).setUp()
+ self.configuration = mock.Mock(conf.Configuration)
+ self.configuration.nfs_shares_config = self.TEST_NFS_SHARES_CONFIG
+ self.configuration.nfs_sparsed_volumes = True
+ self.configuration.nfs_mount_point_base = self.TEST_MNT_BASE
+ self.configuration.nfs_mount_options = (self.
+ TEST_NFS_MOUNT_OPTIONS_PASS_V3)
+ self.configuration.nfs_oversub_ratio = 1.0
+ self.configuration.nfs_used_ratio = 0.95
+ self.configuration.nfs_disk_util = 'df'
+ self.configuration.nas_secure_file_operations = 'false'
+ self.configuration.nas_secure_file_permissions = 'false'
+ self.driver = cnfs.SymantecCNFSDriver(configuration=self.configuration)
+
+ def test_throw_error_if_nfs_mount_options_not_configured(self):
+ """Fail if no nfs mount options are configured"""
+ drv = self.driver
+ drv._mounted_shares = [self.TEST_CNFS_SHARE]
+ drv._ensure_shares_mounted = mock.Mock()
+ none_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_NONE
+ self.configuration.nfs_mount_options = none_opts
+ self.assertRaises(
+ exception.NfsException, drv.do_setup, context.RequestContext)
+
+ def test_throw_error_if_nfs_mount_options_configured_with_NFSV2(self):
+ """Fail if nfs mount options is not nfsv4 """
+ drv = self.driver
+ drv._mounted_shares = [self.TEST_CNFS_SHARE]
+ drv._ensure_shares_mounted = mock.Mock()
+ nfs_v2_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_V2
+ self.configuration.nfs_mount_options = nfs_v2_opts
+ self.assertRaises(
+ exception.NfsException, drv.do_setup, context.RequestContext)
+
+ def test_throw_error_if_nfs_mount_options_configured_with_NFSV4(self):
+ """Fail if nfs mount options is not nfsv4 """
+ drv = self.driver
+ drv._ensure_shares_mounted = mock.Mock()
+ drv._mounted_shares = [self.TEST_CNFS_SHARE]
+ nfs_v4_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_V4
+ self.configuration.nfs_mount_options = nfs_v4_opts
+ self.assertRaises(
+ exception.NfsException, drv.do_setup, context.RequestContext)
+
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_get_local_volume_path')
+ @mock.patch.object(os.path, 'exists')
+ def test_do_clone_volume_success(self, m_exists, m_get_local_volume_path):
+ """test _do_clone_volume() when filesnap over nfs is supported"""
+ drv = self.driver
+ m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_SNAP_NM).\
+ return_value = self.TEST_SNAP_LOCAL_PATH
+ m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_VOL_NM).\
+ return_value = self.TEST_VOL_LOCAL_PATH
+ with mock.patch.object(drv, '_execute'):
+ m_exists.return_value = True
+ drv._do_clone_volume(self.TEST_VOL, self.TEST_VOL_NM,
+ self.TEST_SNAP)
+ self.assertEqual(self.TEST_VOL['provider_location'],
+ self.TEST_SNAP['provider_location'])
+
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_get_local_volume_path')
+ @mock.patch.object(os.path, 'exists')
+ def test_do_clone_volume_fail(self, m_exists, m_get_local_volume_path):
+ """test _do_clone_volume() when filesnap over nfs is supported"""
+ drv = self.driver
+ m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_SNAP_NM).\
+ return_value = self.TEST_SNAP_LOCAL_PATH
+ m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_VOL_NM).\
+ return_value = self.TEST_VOL_LOCAL_PATH
+ with mock.patch.object(drv, '_execute'):
+ m_exists.return_value = False
+ self.assertRaises(exception.NfsException, drv._do_clone_volume,
+ self.TEST_VOL, self.TEST_VOL_NM,
+ self.TEST_SNAP)
+
+ def assign_provider_loc(self, src_vol, tgt_vol):
+ tgt_vol['provider_location'] = src_vol['provider_location']
+
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_do_clone_volume')
+ def test_create_volume_from_snapshot(self, m_do_clone_volume):
+ """test create volume from snapshot"""
+ drv = self.driver
+ self.TEST_SNAP['provider_location'] = self.TEST_CNFS_SHARE
+ self.TEST_VOL['provider_location'] = None
+ loc = self.assign_provider_loc(self.TEST_SNAP, self.TEST_VOL)
+ m_do_clone_volume(self.TEST_SNAP, self.TEST_SNAP_NM,
+ self.TEST_VOL).return_value = loc
+ loc = drv.create_volume_from_snapshot(self.TEST_VOL, self.TEST_SNAP)
+ self.assertIsNotNone(loc)
+
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_volid_to_vol')
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_do_clone_volume')
+ def test_create_snapshot(self, m_do_clone_volume, m_volid_to_vol):
+ """test create snapshot"""
+ drv = self.driver
+ self.TEST_SNAP['provider_location'] = None
+ self.TEST_VOL['provider_location'] = self.TEST_CNFS_SHARE
+ m_volid_to_vol(self.TEST_SNAP['volume_id']).AndReturn(self.TEST_VOL)
+ m_do_clone_volume(self.TEST_VOL, self.TEST_VOL_NM, self.TEST_SNAP).\
+ AndReturn(self.assign_provider_loc(self.TEST_VOL, self.TEST_SNAP))
+ loc = drv.create_snapshot(self.TEST_SNAP)
+ self.assertIsNotNone(loc)
+
+ @mock.patch.object(cnfs.SymantecCNFSDriver, '_ensure_share_mounted')
+ @mock.patch.object(cnfs.SymantecCNFSDriver, 'local_path')
+ def test_delete_snapshot(self, m_local_path, m_ensure_share_mounted):
+ """test delete snapshot"""
+ drv = self.driver
+ self.TEST_SNAP['provider_location'] = self.TEST_CNFS_SHARE
+ m_ensure_share_mounted(self.TEST_CNFS_SHARE).AndReturn(None)
+ m_local_path(self.TEST_SNAP).AndReturn(self.TEST_SNAP_LOCAL_PATH)
+ with mock.patch.object(drv, '_execute'):
+ drv.delete_snapshot(self.TEST_SNAP)
--- /dev/null
+# Copyright (c) 2014 Symantec Corporation
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from cinder import exception
+from cinder.i18n import _, _LW
+from cinder.openstack.common import log as logging
+from cinder.volume.drivers import nfs
+
+LOG = logging.getLogger(__name__)
+
+
+class SymantecCNFSDriver(nfs.NfsDriver):
+
+ """Symantec Clustered NFS based cinder driver.
+
+ Executes commands relating to Volumes.
+ """
+
+ VERSION = "1.0.1"
+ driver_volume_type = 'nfs'
+
+ def __init__(self, *args, **kwargs):
+ self._execute = None
+ self._context = None
+ super(SymantecCNFSDriver, self).__init__(*args, **kwargs)
+
+ def do_setup(self, context):
+ self._context = context
+ super(SymantecCNFSDriver, self).do_setup(context)
+ opts = self.configuration.nfs_mount_options
+ if not opts or opts.find('vers=3') == -1 or (
+ opts.find('nfsvers=3')) == -1:
+ msg = _("NFS is not configured to use NFSv3")
+ LOG.error(msg)
+ raise exception.NfsException(msg)
+
+ def create_volume_from_snapshot(self, volume, snapshot):
+ """Creates a volume from snapshot."""
+ vol_name = volume['name']
+ snap_name = snapshot['name']
+ LOG.debug("SymantecNFSDriver create_volume_from_snapshot called "
+ "vol_name %r snap_name %r", vol_name, snap_name)
+ self._do_clone_volume(snapshot, snap_name, volume)
+ LOG.debug("SymantecNFSDriver create_volume_from_snapshot-2 %r",
+ volume['provider_location'])
+ return {'provider_location': volume['provider_location']}
+
+ def _volid_to_vol(self, volid):
+ vol = self.db.volume_get(self._context, volid)
+ return vol
+
+ def create_snapshot(self, snapshot):
+ """Create a snapshot of the volume."""
+ src_vol_id = snapshot['volume_id']
+ src_vol_name = snapshot['volume_name']
+ src_vol = self._volid_to_vol(src_vol_id)
+ self._do_clone_volume(src_vol, src_vol_name, snapshot)
+ LOG.debug("SymantecNFSDriver create_snapshot %r",
+ snapshot['provider_location'])
+ return {'provider_location': snapshot['provider_location']}
+
+ def delete_snapshot(self, snapshot):
+ """Delete a snapshot."""
+ if not snapshot['provider_location']:
+ LOG.warn(_LW('Snapshot %s does not have provider_location '
+ 'specified, skipping'), snapshot['name'])
+ return
+ self._ensure_share_mounted(snapshot['provider_location'])
+ snap_path = self.local_path(snapshot)
+ self._execute('rm', '-f', snap_path, run_as_root=True)
+
+ def create_cloned_volume(self, volume, src_vref):
+ """Create a clone of the volume."""
+ self.create_volume_from_snapshot(volume, src_vref)
+
+ def _get_local_volume_path(self, provider_loc, vol_name):
+ mnt_path = self._get_mount_point_for_share(provider_loc)
+ vol_path = os.path.join(mnt_path, vol_name)
+ return vol_path
+
+ def _do_clone_volume(self, src_vol, src_vol_name, tgt_vol):
+ cnfs_share = src_vol['provider_location']
+ tgt_vol_name = tgt_vol['name']
+ tgt_vol_path = self._get_local_volume_path(cnfs_share, tgt_vol_name)
+ src_vol_path = self._get_local_volume_path(cnfs_share, src_vol_name)
+ tgt_vol_path_spl = tgt_vol_path + "::snap:vxfs:"
+ self._execute('ln', src_vol_path, tgt_vol_path_spl, run_as_root=True)
+ LOG.debug("SymantecNFSDrivers: do_clone_volume src_vol_path %r "
+ "tgt_vol_path %r tgt_vol_path_spl %r",
+ src_vol_path, tgt_vol_path, tgt_vol_path_spl)
+ if not os.path.exists(tgt_vol_path):
+ self._execute('rm', '-f', tgt_vol_path_spl, run_as_root=True)
+ msg = _("Filesnap over NFS is not supported, "
+ "removing the ::snap:vxfs: file")
+ LOG.error(msg)
+ raise exception.NfsException(msg)
+ tgt_vol['provider_location'] = src_vol['provider_location']
+
+ def _update_volume_stats(self):
+ super(SymantecCNFSDriver, self)._update_volume_stats()
+ backend_name = self.configuration.safe_get('volume_backend_name')
+ self._stats["volume_backend_name"] = backend_name or 'SymantecCNFS'
+ self._stats["vendor_name"] = 'Symantec'
+ self._stats["driver_version"] = self.VERSION
+ self._stats["storage_protocol"] = self.driver_volume_type