# under the License.
"""Unit tests for the NetApp-specific NFS driver module."""
+from itertools import chain
import os
from lxml import etree
'port': 443,
'username': 'admin',
'password': 'passw0rd'}
+SEVEN_MODE_CONNECTION_INFO = dict(chain(CONNECTION_INFO.items(),
+ {'vfiler': 'test_vfiler'}.items()))
FAKE_VSERVER = 'fake_vserver'
configuration.netapp_server_port = CONNECTION_INFO['port']
configuration.netapp_login = CONNECTION_INFO['username']
configuration.netapp_password = CONNECTION_INFO['password']
+ configuration.netapp_vfiler = SEVEN_MODE_CONNECTION_INFO['vfiler']
return configuration
def test_do_setup(self, mock_client_init, mock_super_do_setup):
context = mock.Mock()
self._driver.do_setup(context)
- mock_client_init.assert_called_once_with(**CONNECTION_INFO)
+ mock_client_init.assert_called_once_with(**SEVEN_MODE_CONNECTION_INFO)
mock_super_do_setup.assert_called_once_with(context)
@mock.patch.object(nfs_base.NetAppNfsDriver, 'check_for_setup_error')
actual_pathname = self.client.get_actual_path_for_export(
fake_export_path)
+ __, __, _kwargs = self.connection.invoke_successfully.mock_calls[0]
+ enable_tunneling = _kwargs['enable_tunneling']
+
self.assertEqual(expected_actual_pathname, actual_pathname)
+ self.assertTrue(enable_tunneling)
def test_clone_file(self):
expected_src_path = "fake_src_path"
self.client.clone_file(expected_src_path, expected_dest_path)
- __, _args, __ = self.connection.invoke_successfully.mock_calls[0]
+ __, _args, _kwargs = self.connection.invoke_successfully.mock_calls[0]
actual_request = _args[0]
+ enable_tunneling = _kwargs['enable_tunneling']
actual_src_path = actual_request \
.get_child_by_name('source-path').get_content()
actual_dest_path = actual_request.get_child_by_name(
self.assertEqual(expected_dest_path, actual_dest_path)
self.assertEqual(actual_request.get_child_by_name(
'destination-exists'), None)
+ self.assertTrue(enable_tunneling)
def test_clone_file_when_clone_fails(self):
"""Ensure clone is cleaned up on failure."""
expected_src_path,
expected_dest_path)
- __, _args, __ = self.connection.invoke_successfully.mock_calls[0]
+ __, _args, _kwargs = self.connection.invoke_successfully.mock_calls[0]
actual_request = _args[0]
+ enable_tunneling = _kwargs['enable_tunneling']
actual_src_path = actual_request \
.get_child_by_name('source-path').get_content()
actual_dest_path = actual_request.get_child_by_name(
self.assertEqual(expected_dest_path, actual_dest_path)
self.assertEqual(actual_request.get_child_by_name(
'destination-exists'), None)
+ self.assertTrue(enable_tunneling)
- __, _args, __ = self.connection.invoke_successfully.mock_calls[1]
+ __, _args, _kwargs = self.connection.invoke_successfully.mock_calls[1]
actual_request = _args[0]
+ enable_tunneling = _kwargs['enable_tunneling']
actual_clone_id = actual_request.get_child_by_name('clone-id')
actual_clone_id_info = actual_clone_id.get_child_by_name(
'clone-id-info')
self.assertEqual(fake_clone_op_id, actual_clone_op_id)
self.assertEqual(fake_volume_id, actual_volume_uuid)
+ self.assertTrue(enable_tunneling)
# Ensure that the clone-clear call is made upon error
- __, _args, __ = self.connection.invoke_successfully.mock_calls[2]
+ __, _args, _kwargs = self.connection.invoke_successfully.mock_calls[2]
actual_request = _args[0]
+ enable_tunneling = _kwargs['enable_tunneling']
actual_clone_id = actual_request \
.get_child_by_name('clone-id').get_content()
self.assertEqual(fake_clone_op_id, actual_clone_id)
+ self.assertTrue(enable_tunneling)
def test_get_file_usage(self):
expected_bytes = "2048"
"""Gets the actual path on the filer for export path."""
storage_path = netapp_api.NaElement.create_node_with_children(
'nfs-exportfs-storage-path', **{'pathname': export_path})
- result = self.connection.invoke_successfully(storage_path)
+ result = self.connection.invoke_successfully(storage_path,
+ enable_tunneling=True)
if result.get_child_content('actual-pathname'):
return result.get_child_content('actual-pathname')
raise exception.NotFound(_('No storage path found for export path %s')
**{'source-path': src_path,
'destination-path': dest_path,
'no-snap': 'true'})
- result = self.connection.invoke_successfully(clone_start)
+ result = self.connection.invoke_successfully(clone_start,
+ enable_tunneling=True)
clone_id_el = result.get_child_by_name('clone-id')
cl_id_info = clone_id_el.get_child_by_name('clone-id-info')
vol_uuid = cl_id_info.get_child_content('volume-uuid')
'volume-uuid': vol_uuid})
task_running = True
while task_running:
- result = self.connection.invoke_successfully(clone_ls_st)
+ result = self.connection.invoke_successfully(clone_ls_st,
+ enable_tunneling=True)
status = result.get_child_by_name('status')
ops_info = status.get_children()
if ops_info:
retry = 3
while retry:
try:
- self.connection.invoke_successfully(clone_clear)
+ self.connection.invoke_successfully(clone_clear,
+ enable_tunneling=True)
break
except netapp_api.NaApiError:
# Filer might be rebooting
from cinder.openstack.common import log as logging
from cinder.volume.drivers.netapp.dataontap.client import client_7mode
from cinder.volume.drivers.netapp.dataontap import nfs_base
+from cinder.volume.drivers.netapp import options as na_opts
from cinder.volume.drivers.netapp import utils as na_utils
from cinder.volume import utils as volume_utils
def __init__(self, *args, **kwargs):
super(NetApp7modeNfsDriver, self).__init__(*args, **kwargs)
+ self.configuration.append_config_values(na_opts.netapp_7mode_opts)
def do_setup(self, context):
"""Do the customized set up on client if any for 7 mode."""
username=self.configuration.netapp_login,
password=self.configuration.netapp_password,
hostname=self.configuration.netapp_server_hostname,
- port=self.configuration.netapp_server_port)
+ port=self.configuration.netapp_server_port,
+ vfiler=self.configuration.netapp_vfiler)
def check_for_setup_error(self):
"""Checks if setup occurred properly."""
self.configuration.append_config_values(na_opts.netapp_basicauth_opts)
self.configuration.append_config_values(na_opts.netapp_transport_opts)
self.configuration.append_config_values(na_opts.netapp_img_cache_opts)
+ self.configuration.append_config_values(na_opts.netapp_nfs_extra_opts)
def set_execute(self, execute):
self._execute = execute
def __init__(self, *args, **kwargs):
super(NetAppCmodeNfsDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(na_opts.netapp_cluster_opts)
- self.configuration.append_config_values(na_opts.netapp_nfs_extra_opts)
def do_setup(self, context):
"""Do the customized set up on client for cluster mode."""
help=('The vFiler unit on which provisioning of block storage '
'volumes will be done. This option is only used by the '
'driver when connecting to an instance with a storage '
- 'family of Data ONTAP operating in 7-Mode and the '
- 'storage protocol selected is iSCSI. Only use this '
+ 'family of Data ONTAP operating in 7-Mode. Only use this '
'option when utilizing the MultiStore feature on the '
'NetApp storage system.')), ]
# The vFiler unit on which provisioning of block storage
# volumes will be done. This option is only used by the driver
# when connecting to an instance with a storage family of Data
-# ONTAP operating in 7-Mode and the storage protocol selected
-# is iSCSI. Only use this option when utilizing the MultiStore
-# feature on the NetApp storage system. (string value)
+# ONTAP operating in 7-Mode. Only use this option when
+# utilizing the MultiStore feature on the NetApp storage
+# system. (string value)
#netapp_vfiler=<None>
# Administrative user account name used to access the storage