+++ /dev/null
-# Copyright (c) 2014 Symantec Corporation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import os
-
-import mock
-from oslo_config import cfg
-from oslo_log import log as logging
-
-from cinder import context
-from cinder import exception
-from cinder import test
-from cinder.volume import configuration as conf
-from cinder.volume.drivers import symantec_cnfs as cnfs
-
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-
-
-class SymantecCNFSDriverTestCase(test.TestCase):
-
- """Test case for SymantecCNFS driver."""
- TEST_CNFS_SHARE = 'cnfs-host1:/share'
- TEST_VOL_NM = 'volume-a6707cd3-348c-45cd-9524-255be0939b60'
- TEST_SNAP_NM = 'snapshot-73368c68-1c0b-4027-ba8a-14629918945e'
- TEST_VOL_SIZE = 1
- TEST_MNT_BASE = '/cnfs/share'
- TEST_LOCAL_PATH = '/cnfs/share/mnt'
- TEST_VOL_LOCAL_PATH = TEST_LOCAL_PATH + '/' + TEST_VOL_NM
- TEST_SNAP_LOCAL_PATH = TEST_LOCAL_PATH + '/' + TEST_SNAP_NM
- TEST_SPL_SNAP_LOCAL_PATH = TEST_SNAP_LOCAL_PATH + "::snap:vxfs:"
- TEST_NFS_SHARES_CONFIG = '/etc/cinder/symc_nfs_share'
- TEST_NFS_MOUNT_OPTIONS_FAIL_NONE = ''
- TEST_NFS_MOUNT_OPTIONS_FAIL_V4 = 'nfsvers=4'
- TEST_NFS_MOUNT_OPTIONS_FAIL_V2 = 'nfsvers=2'
- TEST_NFS_MOUNT_OPTIONS_PASS_V3 = 'nfsvers=3'
- TEST_VOL_ID = 'a6707cd3-348c-45cd-9524-255be0939b60'
- SNAPSHOT_ID = '73368c68-1c0b-4027-ba8a-14629918945e'
- TEST_VOL = {'name': TEST_VOL_NM, 'size': TEST_VOL_SIZE,
- 'id': TEST_VOL_ID, 'provider_location': TEST_CNFS_SHARE}
- TEST_SNAP = {'name': TEST_SNAP_NM, 'volume_name': TEST_VOL_NM,
- 'volume_id': TEST_VOL_ID, 'provider_location': None}
-
- def setUp(self):
- super(SymantecCNFSDriverTestCase, self).setUp()
- self.configuration = mock.Mock(conf.Configuration)
- self.configuration.nfs_shares_config = self.TEST_NFS_SHARES_CONFIG
- self.configuration.nfs_sparsed_volumes = True
- self.configuration.nfs_mount_point_base = self.TEST_MNT_BASE
- self.configuration.nfs_mount_options = (self.
- TEST_NFS_MOUNT_OPTIONS_PASS_V3)
- self.configuration.nfs_oversub_ratio = 1.0
- self.configuration.nfs_used_ratio = 0.95
- self.configuration.nfs_disk_util = 'df'
- self.configuration.nas_secure_file_operations = 'false'
- self.configuration.nas_secure_file_permissions = 'false'
- self.driver = cnfs.SymantecCNFSDriver(configuration=self.configuration)
-
- def test_throw_error_if_nfs_mount_options_not_configured(self):
- """Fail if no nfs mount options are configured"""
- drv = self.driver
- drv._mounted_shares = [self.TEST_CNFS_SHARE]
- drv._ensure_shares_mounted = mock.Mock()
- none_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_NONE
- self.configuration.nfs_mount_options = none_opts
- self.assertRaises(
- exception.NfsException, drv.do_setup, context.RequestContext)
-
- def test_throw_error_if_nfs_mount_options_configured_with_NFSV2(self):
- """Fail if nfs mount options is not nfsv4 """
- drv = self.driver
- drv._mounted_shares = [self.TEST_CNFS_SHARE]
- drv._ensure_shares_mounted = mock.Mock()
- nfs_v2_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_V2
- self.configuration.nfs_mount_options = nfs_v2_opts
- self.assertRaises(
- exception.NfsException, drv.do_setup, context.RequestContext)
-
- def test_throw_error_if_nfs_mount_options_configured_with_NFSV4(self):
- """Fail if nfs mount options is not nfsv4 """
- drv = self.driver
- drv._ensure_shares_mounted = mock.Mock()
- drv._mounted_shares = [self.TEST_CNFS_SHARE]
- nfs_v4_opts = self.TEST_NFS_MOUNT_OPTIONS_FAIL_V4
- self.configuration.nfs_mount_options = nfs_v4_opts
- self.assertRaises(
- exception.NfsException, drv.do_setup, context.RequestContext)
-
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_get_local_volume_path')
- @mock.patch.object(os.path, 'exists')
- def test_do_clone_volume_success(self, m_exists, m_get_local_volume_path):
- """test _do_clone_volume() when filesnap over nfs is supported"""
- drv = self.driver
- m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_SNAP_NM).\
- return_value = self.TEST_SNAP_LOCAL_PATH
- m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_VOL_NM).\
- return_value = self.TEST_VOL_LOCAL_PATH
- with mock.patch.object(drv, '_execute'):
- m_exists.return_value = True
- drv._do_clone_volume(self.TEST_VOL, self.TEST_VOL_NM,
- self.TEST_SNAP)
- self.assertEqual(self.TEST_VOL['provider_location'],
- self.TEST_SNAP['provider_location'])
-
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_get_local_volume_path')
- @mock.patch.object(os.path, 'exists')
- def test_do_clone_volume_fail(self, m_exists, m_get_local_volume_path):
- """test _do_clone_volume() when filesnap over nfs is supported"""
- drv = self.driver
- m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_SNAP_NM).\
- return_value = self.TEST_SNAP_LOCAL_PATH
- m_get_local_volume_path(self.TEST_CNFS_SHARE, self.TEST_VOL_NM).\
- return_value = self.TEST_VOL_LOCAL_PATH
- with mock.patch.object(drv, '_execute'):
- m_exists.return_value = False
- self.assertRaises(exception.NfsException, drv._do_clone_volume,
- self.TEST_VOL, self.TEST_VOL_NM,
- self.TEST_SNAP)
-
- def assign_provider_loc(self, src_vol, tgt_vol):
- tgt_vol['provider_location'] = src_vol['provider_location']
-
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_do_clone_volume')
- def test_create_volume_from_snapshot(self, m_do_clone_volume):
- """test create volume from snapshot"""
- drv = self.driver
- self.TEST_SNAP['provider_location'] = self.TEST_CNFS_SHARE
- self.TEST_VOL['provider_location'] = None
- loc = self.assign_provider_loc(self.TEST_SNAP, self.TEST_VOL)
- m_do_clone_volume(self.TEST_SNAP, self.TEST_SNAP_NM,
- self.TEST_VOL).return_value = loc
- loc = drv.create_volume_from_snapshot(self.TEST_VOL, self.TEST_SNAP)
- self.assertIsNotNone(loc)
-
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_volid_to_vol')
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_do_clone_volume')
- def test_create_snapshot(self, m_do_clone_volume, m_volid_to_vol):
- """test create snapshot"""
- drv = self.driver
- self.TEST_SNAP['provider_location'] = None
- self.TEST_VOL['provider_location'] = self.TEST_CNFS_SHARE
- m_volid_to_vol(self.TEST_SNAP['volume_id']).AndReturn(self.TEST_VOL)
- m_do_clone_volume(self.TEST_VOL, self.TEST_VOL_NM, self.TEST_SNAP).\
- AndReturn(self.assign_provider_loc(self.TEST_VOL, self.TEST_SNAP))
- loc = drv.create_snapshot(self.TEST_SNAP)
- self.assertIsNotNone(loc)
-
- @mock.patch.object(cnfs.SymantecCNFSDriver, '_ensure_share_mounted')
- @mock.patch.object(cnfs.SymantecCNFSDriver, 'local_path')
- def test_delete_snapshot(self, m_local_path, m_ensure_share_mounted):
- """test delete snapshot"""
- drv = self.driver
- self.TEST_SNAP['provider_location'] = self.TEST_CNFS_SHARE
- m_ensure_share_mounted(self.TEST_CNFS_SHARE).AndReturn(None)
- m_local_path(self.TEST_SNAP).AndReturn(self.TEST_SNAP_LOCAL_PATH)
- with mock.patch.object(drv, '_execute'):
- drv.delete_snapshot(self.TEST_SNAP)
+++ /dev/null
-# Copyright (c) 2014 Symantec Corporation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-
-from oslo_log import log as logging
-
-from cinder import exception
-from cinder.i18n import _, _LW
-from cinder.volume.drivers import nfs
-
-LOG = logging.getLogger(__name__)
-
-
-class SymantecCNFSDriver(nfs.NfsDriver):
-
- """Symantec Clustered NFS based cinder driver.
-
- Executes commands relating to Volumes.
- """
-
- VERSION = "1.0.1"
- driver_volume_type = 'nfs'
-
- def __init__(self, *args, **kwargs):
- self._execute = None
- self._context = None
- super(SymantecCNFSDriver, self).__init__(*args, **kwargs)
-
- def do_setup(self, context):
- self._context = context
- super(SymantecCNFSDriver, self).do_setup(context)
- opts = self.configuration.nfs_mount_options
- if not opts or opts.find('vers=3') == -1 or (
- opts.find('nfsvers=3')) == -1:
- msg = _("NFS is not configured to use NFSv3.")
- LOG.error(msg)
- raise exception.NfsException(msg)
-
- def create_volume_from_snapshot(self, volume, snapshot):
- """Creates a volume from snapshot."""
- vol_name = volume['name']
- snap_name = snapshot['name']
- LOG.debug("SymantecNFSDriver create_volume_from_snapshot called "
- "vol_name %r snap_name %r", vol_name, snap_name)
- self._do_clone_volume(snapshot, snap_name, volume)
- LOG.debug("SymantecNFSDriver create_volume_from_snapshot-2 %r",
- volume['provider_location'])
- return {'provider_location': volume['provider_location']}
-
- def _volid_to_vol(self, volid):
- vol = self.db.volume_get(self._context, volid)
- return vol
-
- def create_snapshot(self, snapshot):
- """Create a snapshot of the volume."""
- src_vol_id = snapshot['volume_id']
- src_vol_name = snapshot['volume_name']
- src_vol = self._volid_to_vol(src_vol_id)
- self._do_clone_volume(src_vol, src_vol_name, snapshot)
- LOG.debug("SymantecNFSDriver create_snapshot %r",
- snapshot['provider_location'])
- return {'provider_location': snapshot['provider_location']}
-
- def delete_snapshot(self, snapshot):
- """Delete a snapshot."""
- if not snapshot['provider_location']:
- LOG.warn(_LW('Snapshot %s does not have provider_location '
- 'specified, skipping.'), snapshot['name'])
- return
- self._ensure_share_mounted(snapshot['provider_location'])
- snap_path = self.local_path(snapshot)
- self._execute('rm', '-f', snap_path, run_as_root=True)
-
- def create_cloned_volume(self, volume, src_vref):
- """Create a clone of the volume."""
- self.create_volume_from_snapshot(volume, src_vref)
-
- def _get_local_volume_path(self, provider_loc, vol_name):
- mnt_path = self._get_mount_point_for_share(provider_loc)
- vol_path = os.path.join(mnt_path, vol_name)
- return vol_path
-
- def _do_clone_volume(self, src_vol, src_vol_name, tgt_vol):
- cnfs_share = src_vol['provider_location']
- tgt_vol_name = tgt_vol['name']
- tgt_vol_path = self._get_local_volume_path(cnfs_share, tgt_vol_name)
- src_vol_path = self._get_local_volume_path(cnfs_share, src_vol_name)
- tgt_vol_path_spl = tgt_vol_path + "::snap:vxfs:"
- self._execute('ln', src_vol_path, tgt_vol_path_spl, run_as_root=True)
- LOG.debug("SymantecNFSDrivers: do_clone_volume src_vol_path %r "
- "tgt_vol_path %r tgt_vol_path_spl %r",
- src_vol_path, tgt_vol_path, tgt_vol_path_spl)
- if not os.path.exists(tgt_vol_path):
- self._execute('rm', '-f', tgt_vol_path_spl, run_as_root=True)
- msg = _("Filesnap over NFS is not supported, "
- "removing the ::snap:vxfs: file.")
- LOG.error(msg)
- raise exception.NfsException(msg)
- tgt_vol['provider_location'] = src_vol['provider_location']
-
- def _update_volume_stats(self):
- super(SymantecCNFSDriver, self)._update_volume_stats()
- backend_name = self.configuration.safe_get('volume_backend_name')
- self._stats["volume_backend_name"] = backend_name or 'SymantecCNFS'
- self._stats["vendor_name"] = 'Symantec'
- self._stats["driver_version"] = self.VERSION
- self._stats["storage_protocol"] = self.driver_volume_type