# under the License.
import os
-import sys
import mock
+from oslo_utils import units
from cinder import exception
from cinder.image import image_utils
_FAKE_VOLUME_PATH = os.path.join(_FAKE_MNT_POINT,
_FAKE_VOLUME_NAME + '.vhdx')
- def setUp(self):
+ @mock.patch.object(smbfs, 'utilsfactory')
+ @mock.patch.object(smbfs, 'remotefs')
+ def setUp(self, mock_remotefs, mock_utilsfactory):
super(WindowsSmbFsTestCase, self).setUp()
- self._mock_wmi = mock.MagicMock()
-
- mock.patch('sys.platform', 'win32').start()
- mock.patch.dict(sys.modules, ctypes=mock.DEFAULT).start()
- mock.patch('six.moves.builtins.wmi', create=True).start()
-
- self.addCleanup(mock.patch.stopall)
self._smbfs_driver = smbfs.WindowsSmbfsDriver(
configuration=mock.Mock())
- self._smbfs_driver._remotefsclient = mock.Mock()
self._smbfs_driver._delete = mock.Mock()
self._smbfs_driver.local_path = mock.Mock(
return_value=self._FAKE_VOLUME_PATH)
- self._smbfs_driver.vhdutils = mock.Mock()
def _test_create_volume(self, volume_exists=False, volume_format='vhdx'):
self._smbfs_driver.create_dynamic_vhd = mock.MagicMock()
- fake_create = self._smbfs_driver.vhdutils.create_dynamic_vhd
+ fake_create = self._smbfs_driver._vhdutils.create_dynamic_vhd
self._smbfs_driver.get_volume_format = mock.Mock(
return_value=volume_format)
self._test_create_volume(volume_format="qcow")
def test_get_capacity_info(self):
- self._smbfs_driver._remotefsclient.get_capacity_info = mock.Mock(
- return_value=(self._FAKE_TOTAL_SIZE, self._FAKE_TOTAL_AVAILABLE))
+ self._smbfs_driver._smbutils.get_share_capacity_info.return_value = (
+ self._FAKE_TOTAL_SIZE, self._FAKE_TOTAL_AVAILABLE)
self._smbfs_driver._get_total_allocated = mock.Mock(
return_value=self._FAKE_TOTAL_ALLOCATED)
self.assertEqual(expected_ret_val, ret_val)
def _test_get_img_info(self, backing_file=None):
- self._smbfs_driver.vhdutils.get_vhd_parent_path.return_value = (
+ self._smbfs_driver._vhdutils.get_vhd_parent_path.return_value = (
backing_file)
image_info = self._smbfs_driver._qemu_img_info(self._FAKE_VOLUME_PATH)
self._test_get_img_info(self._FAKE_VOLUME_PATH)
def test_create_snapshot(self):
- self._smbfs_driver.vhdutils.create_differencing_vhd = (
+ self._smbfs_driver._vhdutils.create_differencing_vhd = (
mock.Mock())
self._smbfs_driver._local_volume_dir = mock.Mock(
return_value=self._FAKE_MNT_POINT)
fake_create_diff = (
- self._smbfs_driver.vhdutils.create_differencing_vhd)
+ self._smbfs_driver._vhdutils.create_differencing_vhd)
self._smbfs_driver._do_create_snapshot(
self._FAKE_SNAPSHOT,
return_value=self._FAKE_MNT_POINT)
drv.get_volume_format = mock.Mock(
return_value=volume_format)
- drv.vhdutils.get_vhd_parent_path.return_value = (
+ drv._vhdutils.get_vhd_parent_path.return_value = (
fake_parent_path)
with mock.patch.object(image_utils, 'upload_volume') as (
fake_active_image)
upload_path = fake_temp_image_path
- drv.vhdutils.convert_vhd.assert_called_once_with(
+ drv._vhdutils.convert_vhd.assert_called_once_with(
fake_active_image_path,
fake_temp_image_path)
drv._delete.assert_called_once_with(
return_value=self._FAKE_VOLUME_PATH)
drv.configuration = mock.MagicMock()
drv.configuration.volume_dd_blocksize = mock.sentinel.block_size
- drv._extend_vhd_if_needed = mock.Mock()
with mock.patch.object(image_utils,
'fetch_to_volume_format') as fake_fetch:
mock.sentinel.image_id,
self._FAKE_VOLUME_PATH, mock.sentinel.volume_format,
mock.sentinel.block_size)
- drv._extend_vhd_if_needed.assert_called_once_with(
- self._FAKE_VOLUME_PATH, self._FAKE_VOLUME['size'])
+ drv._vhdutils.resize_vhd.assert_called_once_with(
+ self._FAKE_VOLUME_PATH,
+ self._FAKE_VOLUME['size'] * units.Gi)
def test_copy_volume_from_snapshot(self):
drv = self._smbfs_driver
return_value=fake_img_info)
drv.local_path = mock.Mock(
return_value=mock.sentinel.new_volume_path)
- drv._extend_vhd_if_needed = mock.Mock()
drv._copy_volume_from_snapshot(
self._FAKE_SNAPSHOT, self._FAKE_VOLUME,
self._FAKE_VOLUME['size'])
drv._delete.assert_called_once_with(mock.sentinel.new_volume_path)
- drv.vhdutils.convert_vhd.assert_called_once_with(
+ drv._vhdutils.convert_vhd.assert_called_once_with(
self._FAKE_VOLUME_PATH,
mock.sentinel.new_volume_path)
- drv._extend_vhd_if_needed.assert_called_once_with(
- mock.sentinel.new_volume_path, self._FAKE_VOLUME['size'])
+ drv._vhdutils.resize_vhd.assert_called_once_with(
+ mock.sentinel.new_volume_path,
+ self._FAKE_VOLUME['size'] * units.Gi)
def test_rebase_img(self):
- self._smbfs_driver._rebase_img(
+ drv = self._smbfs_driver
+ drv._rebase_img(
self._FAKE_SNAPSHOT_PATH,
self._FAKE_VOLUME_NAME + '.vhdx', 'vhdx')
- self._smbfs_driver.vhdutils.reconnect_parent.assert_called_once_with(
+ drv._vhdutils.reconnect_parent_vhd.assert_called_once_with(
self._FAKE_SNAPSHOT_PATH, self._FAKE_VOLUME_PATH)
-
- def _test_extend_vhd_if_needed(self, virtual_size_gb, requested_size_gb):
- drv = self._smbfs_driver
- virtual_size_bytes = virtual_size_gb << 30
- requested_size_bytes = requested_size_gb << 30
-
- virtual_size_dict = {'VirtualSize': virtual_size_bytes}
- drv.vhdutils.get_vhd_size = mock.Mock(return_value=virtual_size_dict)
-
- if virtual_size_gb > requested_size_gb:
- self.assertRaises(exception.VolumeBackendAPIException,
- drv._extend_vhd_if_needed,
- mock.sentinel.vhd_path,
- requested_size_gb)
- else:
- drv._extend_vhd_if_needed(mock.sentinel.vhd_path,
- requested_size_gb)
-
- if virtual_size_gb < requested_size_gb:
- drv.vhdutils.resize_vhd.assert_called_once_with(
- mock.sentinel.vhd_path, requested_size_bytes)
- else:
- self.assertFalse(drv.vhdutils.resize_vhd.called)
-
- drv.vhdutils.get_vhd_size.assert_called_once_with(
- mock.sentinel.vhd_path)
-
- def test_extend_vhd_if_needed_bigger_size(self):
- self._test_extend_vhd_if_needed(virtual_size_gb=1,
- requested_size_gb=2)
-
- def test_extend_vhd_if_needed_equal_size(self):
- self._test_extend_vhd_if_needed(virtual_size_gb=1,
- requested_size_gb=1)
-
- def test_extend_vhd_if_needed_smaller_size(self):
- self._test_extend_vhd_if_needed(virtual_size_gb=2,
- requested_size_gb=1)
+++ /dev/null
-# Copyright 2014 Cloudbase Solutions Srl
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from cinder import exception
-from cinder import test
-from cinder.volume.drivers.windows import constants
-from cinder.volume.drivers.windows import vhdutils
-
-
-class VHDUtilsTestCase(test.TestCase):
-
- _FAKE_FORMAT = 2
- _FAKE_TYPE = constants.VHD_TYPE_DYNAMIC
- _FAKE_JOB_PATH = 'fake_job_path'
- _FAKE_VHD_PATH = r'C:\fake\vhd.vhd'
- _FAKE_DEST_PATH = r'C:\fake\destination.vhdx'
- _FAKE_FILE_HANDLE = 'fake_file_handle'
- _FAKE_RET_VAL = 0
- _FAKE_VHD_SIZE = 1024
-
- def setUp(self):
- super(VHDUtilsTestCase, self).setUp()
- self._setup_mocks()
- self._vhdutils = vhdutils.VHDUtils()
- self._vhdutils._msft_vendor_id = 'fake_vendor_id'
-
- self.addCleanup(mock.patch.stopall)
-
- def _setup_mocks(self):
- fake_ctypes = mock.Mock()
- # Use this in order to make assertions on the variables parsed by
- # references.
- fake_ctypes.byref = lambda x: x
- fake_ctypes.c_wchar_p = lambda x: x
- fake_ctypes.c_ulong = lambda x: x
-
- mock.patch.multiple(
- 'cinder.volume.drivers.windows.vhdutils',
- ctypes=fake_ctypes, kernel32=mock.DEFAULT,
- wintypes=mock.DEFAULT, virtdisk=mock.DEFAULT,
- Win32_GUID=mock.DEFAULT,
- Win32_RESIZE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
- Win32_CREATE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
- Win32_VIRTUAL_STORAGE_TYPE=mock.DEFAULT,
- Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1=mock.DEFAULT,
- Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2=mock.DEFAULT,
- Win32_MERGE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
- Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS=mock.DEFAULT,
- Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS=mock.DEFAULT,
- create=True).start()
-
- def _test_create_vhd(self, src_path=None, max_internal_size=0,
- parent_path=None, create_failed=False):
- self._vhdutils._get_device_id_by_path = mock.Mock(
- side_effect=(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
- vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHDX))
- self._vhdutils._close = mock.Mock()
-
- fake_params = (
- vhdutils.Win32_CREATE_VIRTUAL_DISK_PARAMETERS.return_value)
- fake_vst = mock.Mock()
- fake_source_vst = mock.Mock()
-
- vhdutils.Win32_VIRTUAL_STORAGE_TYPE.side_effect = [
- fake_vst, None, fake_source_vst]
- vhdutils.virtdisk.CreateVirtualDisk.return_value = int(
- create_failed)
-
- if create_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils._create_vhd,
- self._FAKE_DEST_PATH,
- constants.VHD_TYPE_DYNAMIC,
- src_path=src_path,
- max_internal_size=max_internal_size,
- parent_path=parent_path)
- else:
- self._vhdutils._create_vhd(self._FAKE_DEST_PATH,
- constants.VHD_TYPE_DYNAMIC,
- src_path=src_path,
- max_internal_size=max_internal_size,
- parent_path=parent_path)
-
- self.assertEqual(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
- fake_vst.DeviceId)
- self.assertEqual(parent_path, fake_params.ParentPath)
- self.assertEqual(max_internal_size, fake_params.MaximumSize)
-
- if src_path:
- self.assertEqual(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHDX,
- fake_source_vst.DeviceId)
- self.assertEqual(src_path, fake_params.SourcePath)
-
- vhdutils.virtdisk.CreateVirtualDisk.assert_called_with(
- vhdutils.ctypes.byref(fake_vst),
- vhdutils.ctypes.c_wchar_p(self._FAKE_DEST_PATH),
- vhdutils.VIRTUAL_DISK_ACCESS_NONE, None,
- vhdutils.CREATE_VIRTUAL_DISK_FLAG_NONE, 0,
- vhdutils.ctypes.byref(fake_params), None,
- vhdutils.ctypes.byref(vhdutils.wintypes.HANDLE()))
- self.assertTrue(self._vhdutils._close.called)
-
- def test_create_vhd_exception(self):
- self._test_create_vhd(create_failed=True)
-
- def test_create_dynamic_vhd(self):
- self._test_create_vhd(max_internal_size=1 << 30)
-
- def test_create_differencing_vhd(self):
- self._test_create_vhd(parent_path=self._FAKE_VHD_PATH)
-
- def test_convert_vhd(self):
- self._test_create_vhd(src_path=self._FAKE_VHD_PATH)
-
- def _test_open(self, open_failed=False):
- fake_device_id = vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD
-
- vhdutils.virtdisk.OpenVirtualDisk.return_value = int(open_failed)
- self._vhdutils._get_device_id_by_path = mock.Mock(
- return_value=fake_device_id)
-
- fake_vst = vhdutils.Win32_VIRTUAL_STORAGE_TYPE.return_value
- fake_params = 'fake_params'
- fake_access_mask = vhdutils.VIRTUAL_DISK_ACCESS_NONE
- fake_open_flag = vhdutils.OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS
-
- if open_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils._open,
- self._FAKE_VHD_PATH)
- else:
- self._vhdutils._open(self._FAKE_VHD_PATH,
- open_flag=fake_open_flag,
- open_access_mask=fake_access_mask,
- open_params=fake_params)
-
- vhdutils.virtdisk.OpenVirtualDisk.assert_called_with(
- vhdutils.ctypes.byref(fake_vst),
- vhdutils.ctypes.c_wchar_p(self._FAKE_VHD_PATH),
- fake_access_mask, fake_open_flag, fake_params,
- vhdutils.ctypes.byref(vhdutils.wintypes.HANDLE()))
-
- self.assertEqual(fake_device_id, fake_vst.DeviceId)
-
- def test_open_success(self):
- self._test_open()
-
- def test_open_failed(self):
- self._test_open(open_failed=True)
-
- def _test_get_device_id_by_path(self,
- get_device_failed=False):
- if get_device_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils._get_device_id_by_path,
- self._FAKE_VHD_PATH[:-4])
- else:
- ret_val = self._vhdutils._get_device_id_by_path(
- self._FAKE_VHD_PATH)
-
- self.assertEqual(
- ret_val,
- vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD)
-
- def test_get_device_id_by_path_success(self):
- self._test_get_device_id_by_path()
-
- def test_get_device_id_by_path_failed(self):
- self._test_get_device_id_by_path(get_device_failed=True)
-
- def _test_resize_vhd(self, resize_failed=False):
- fake_params = (
- vhdutils.Win32_RESIZE_VIRTUAL_DISK_PARAMETERS.return_value)
-
- self._vhdutils._open = mock.Mock(
- return_value=self._FAKE_FILE_HANDLE)
- self._vhdutils._close = mock.Mock()
-
- vhdutils.virtdisk.ResizeVirtualDisk.return_value = int(
- resize_failed)
-
- if resize_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils.resize_vhd,
- self._FAKE_VHD_PATH,
- self._FAKE_VHD_SIZE)
- else:
- self._vhdutils.resize_vhd(self._FAKE_VHD_PATH,
- self._FAKE_VHD_SIZE)
-
- vhdutils.virtdisk.ResizeVirtualDisk.assert_called_with(
- self._FAKE_FILE_HANDLE,
- vhdutils.RESIZE_VIRTUAL_DISK_FLAG_NONE,
- vhdutils.ctypes.byref(fake_params),
- None)
- self.assertTrue(self._vhdutils._close.called)
-
- def test_resize_vhd_success(self):
- self._test_resize_vhd()
-
- def test_resize_vhd_failed(self):
- self._test_resize_vhd(resize_failed=True)
-
- def _test_merge_vhd(self, merge_failed=False):
- self._vhdutils._open = mock.Mock(
- return_value=self._FAKE_FILE_HANDLE)
- self._vhdutils._close = mock.Mock()
-
- fake_open_params = vhdutils.Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1()
- fake_params = vhdutils.Win32_MERGE_VIRTUAL_DISK_PARAMETERS()
-
- vhdutils.virtdisk.MergeVirtualDisk.return_value = int(
- merge_failed)
- vhdutils.Win32_RESIZE_VIRTUAL_DISK_PARAMETERS.return_value = (
- fake_params)
-
- if merge_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils.merge_vhd,
- self._FAKE_VHD_PATH)
- else:
- self._vhdutils.merge_vhd(self._FAKE_VHD_PATH)
-
- self._vhdutils._open.assert_called_once_with(
- self._FAKE_VHD_PATH,
- open_params=vhdutils.ctypes.byref(fake_open_params))
- self.assertEqual(vhdutils.OPEN_VIRTUAL_DISK_VERSION_1,
- fake_open_params.Version)
- self.assertEqual(2, fake_open_params.RWDepth)
- vhdutils.virtdisk.MergeVirtualDisk.assert_called_with(
- self._FAKE_FILE_HANDLE,
- vhdutils.MERGE_VIRTUAL_DISK_FLAG_NONE,
- vhdutils.ctypes.byref(fake_params),
- None)
-
- def test_merge_vhd_success(self):
- self._test_merge_vhd()
-
- def test_merge_vhd_failed(self):
- self._test_merge_vhd(merge_failed=True)
-
- def _test_get_vhd_info_member(self, get_vhd_info_failed=False):
- fake_params = vhdutils.Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS()
- fake_info_size = vhdutils.ctypes.sizeof(fake_params)
-
- vhdutils.virtdisk.GetVirtualDiskInformation.return_value = (
- get_vhd_info_failed)
- self._vhdutils._close = mock.Mock()
-
- if get_vhd_info_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils._get_vhd_info_member,
- self._FAKE_VHD_PATH,
- vhdutils.GET_VIRTUAL_DISK_INFO_SIZE)
- else:
- self._vhdutils._get_vhd_info_member(
- self._FAKE_VHD_PATH,
- vhdutils.GET_VIRTUAL_DISK_INFO_SIZE)
-
- vhdutils.virtdisk.GetVirtualDiskInformation.assert_called_with(
- self._FAKE_VHD_PATH,
- vhdutils.ctypes.byref(
- vhdutils.ctypes.c_ulong(fake_info_size)),
- vhdutils.ctypes.byref(fake_params), 0)
-
- def test_get_vhd_info_member_success(self):
- self._test_get_vhd_info_member()
-
- def test_get_vhd_info_member_failed(self):
- self._test_get_vhd_info_member(get_vhd_info_failed=True)
-
- def test_get_vhd_info(self):
- fake_vhd_info = {'VirtualSize': self._FAKE_VHD_SIZE}
- fake_info_member = vhdutils.GET_VIRTUAL_DISK_INFO_SIZE
-
- self._vhdutils._open = mock.Mock(
- return_value=self._FAKE_FILE_HANDLE)
- self._vhdutils._close = mock.Mock()
- self._vhdutils._get_vhd_info_member = mock.Mock(
- return_value=fake_vhd_info)
-
- ret_val = self._vhdutils.get_vhd_info(self._FAKE_VHD_PATH,
- [fake_info_member])
-
- self.assertEqual(fake_vhd_info, ret_val)
- self._vhdutils._open.assert_called_once_with(
- self._FAKE_VHD_PATH,
- open_access_mask=vhdutils.VIRTUAL_DISK_ACCESS_GET_INFO)
- self._vhdutils._get_vhd_info_member.assert_called_with(
- self._FAKE_FILE_HANDLE, fake_info_member)
- self._vhdutils._close.assert_called_once_with(self._FAKE_FILE_HANDLE)
-
- def test_parse_vhd_info(self):
- fake_physical_size = self._FAKE_VHD_SIZE + 1
- fake_info_member = vhdutils.GET_VIRTUAL_DISK_INFO_SIZE
- fake_info = mock.Mock()
- fake_info.VhdInfo.Size._fields_ = [
- ("VirtualSize", vhdutils.wintypes.ULARGE_INTEGER),
- ("PhysicalSize", vhdutils.wintypes.ULARGE_INTEGER)]
- fake_info.VhdInfo.Size.VirtualSize = self._FAKE_VHD_SIZE
- fake_info.VhdInfo.Size.PhysicalSize = fake_physical_size
-
- ret_val = self._vhdutils._parse_vhd_info(fake_info, fake_info_member)
- expected = {'VirtualSize': self._FAKE_VHD_SIZE,
- 'PhysicalSize': fake_physical_size}
-
- self.assertEqual(expected, ret_val)
-
- def _test_reconnect_parent(self, reconnect_failed=False):
- fake_params = vhdutils.Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS()
- fake_open_params = vhdutils.Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2()
-
- self._vhdutils._open = mock.Mock(return_value=self._FAKE_FILE_HANDLE)
- self._vhdutils._close = mock.Mock()
- vhdutils.virtdisk.SetVirtualDiskInformation.return_value = int(
- reconnect_failed)
-
- if reconnect_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self._vhdutils.reconnect_parent,
- self._FAKE_VHD_PATH, self._FAKE_DEST_PATH)
-
- else:
- self._vhdutils.reconnect_parent(self._FAKE_VHD_PATH,
- self._FAKE_DEST_PATH)
-
- self._vhdutils._open.assert_called_once_with(
- self._FAKE_VHD_PATH,
- open_flag=vhdutils.OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS,
- open_access_mask=vhdutils.VIRTUAL_DISK_ACCESS_NONE,
- open_params=vhdutils.ctypes.byref(fake_open_params))
- self.assertEqual(vhdutils.OPEN_VIRTUAL_DISK_VERSION_2,
- fake_open_params.Version)
- self.assertFalse(fake_open_params.GetInfoOnly)
- vhdutils.virtdisk.SetVirtualDiskInformation.assert_called_once_with(
- self._FAKE_FILE_HANDLE, vhdutils.ctypes.byref(fake_params))
- self.assertEqual(self._FAKE_DEST_PATH, fake_params.ParentFilePath)
-
- def test_reconnect_parent_success(self):
- self._test_reconnect_parent()
-
- def test_reconnect_parent_failed(self):
- self._test_reconnect_parent(reconnect_failed=True)
-
- @mock.patch('sys.exc_info')
- @mock.patch.object(vhdutils, 'LOG')
- def test_run_and_check_output_fails_exc_info_set(self, mock_log,
- mock_exc_info):
- # we can't use assertRaises because we're mocking sys.exc_info and
- # that messes up how assertRaises handles the exception
- try:
- self._vhdutils._run_and_check_output(lambda *args, **kwargs: 1)
- self.fail('Expected _run_and_check_output to fail.')
- except exception.VolumeBackendAPIException:
- pass
- mock_log.error.assert_called_once_with(mock.ANY, exc_info=True)
-
- @mock.patch('sys.exc_info', return_value=None)
- @mock.patch.object(vhdutils, 'LOG')
- def test_run_and_check_output_fails_exc_info_not_set(self, mock_log,
- mock_exc_info):
- # we can't use assertRaises because we're mocking sys.exc_info and
- # that messes up how assertRaises handles the exception
- try:
- self._vhdutils._run_and_check_output(lambda *args, **kwargs: 1)
- self.fail('Expected _run_and_check_output to fail.')
- except exception.VolumeBackendAPIException:
- pass
- mock_log.error.assert_called_once_with(mock.ANY, exc_info=False)
# Copyright 2012 Pedro Navarro Perez
+# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Unit tests for Windows Server 2012 OpenStack Cinder volume driver
"""
-
+import mock
import os
-import shutil
-import tempfile
-from mox3 import mox
-from oslo_config import cfg
from oslo_utils import fileutils
+from oslo_utils import units
from cinder.image import image_utils
from cinder import test
from cinder.tests.unit.windows import db_fakes
from cinder.volume import configuration as conf
-from cinder.volume.drivers.windows import constants
-from cinder.volume.drivers.windows import vhdutils
from cinder.volume.drivers.windows import windows
-from cinder.volume.drivers.windows import windows_utils
-
-CONF = cfg.CONF
class TestWindowsDriver(test.TestCase):
-
- def __init__(self, method):
- super(TestWindowsDriver, self).__init__(method)
-
- def setUp(self):
- self.lun_path_tempdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.lun_path_tempdir)
-
+ @mock.patch.object(windows, 'utilsfactory')
+ def setUp(self, mock_utilsfactory):
super(TestWindowsDriver, self).setUp()
- self.flags(
- windows_iscsi_lun_path=self.lun_path_tempdir,
- )
- self._setup_stubs()
configuration = conf.Configuration(None)
configuration.append_config_values(windows.windows_opts)
- self._driver = windows.WindowsDriver(configuration=configuration)
- self._driver.do_setup({})
-
- def _setup_stubs(self):
-
- def fake_wutils__init__(self):
- pass
-
- windows_utils.WindowsUtils.__init__ = fake_wutils__init__
-
- def fake_local_path(self, volume):
- return os.path.join(CONF.windows_iscsi_lun_path,
- str(volume['name']) + ".vhd")
-
- def test_check_for_setup_errors(self):
- drv = self._driver
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'check_for_setup_error')
- windows_utils.WindowsUtils.check_for_setup_error()
-
- self.mox.ReplayAll()
-
- drv.check_for_setup_error()
-
- def test_create_volume(self):
- drv = self._driver
- vol = db_fakes.get_fake_volume_info()
-
- self.stubs.Set(drv, 'local_path', self.fake_local_path)
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'create_volume')
-
- windows_utils.WindowsUtils.create_volume(self.fake_local_path(vol),
- vol['name'], vol['size'])
-
- self.mox.ReplayAll()
-
- drv.create_volume(vol)
-
- def test_delete_volume(self):
- """delete_volume simple test case."""
- drv = self._driver
+ self.flags(windows_iscsi_lun_path=mock.sentinel.iscsi_lun_path)
+ self.flags(image_conversion_dir=mock.sentinel.image_conversion_dir)
- vol = db_fakes.get_fake_volume_info()
-
- self.mox.StubOutWithMock(drv, 'local_path')
- drv.local_path(vol).AndReturn(self.fake_local_path(vol))
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'delete_volume')
- windows_utils.WindowsUtils.delete_volume(vol['name'],
- self.fake_local_path(vol))
- self.mox.ReplayAll()
-
- drv.delete_volume(vol)
-
- def test_create_snapshot(self):
- drv = self._driver
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'create_snapshot')
- volume = db_fakes.get_fake_volume_info()
- snapshot = db_fakes.get_fake_snapshot_info()
-
- self.stubs.Set(drv, 'local_path', self.fake_local_path(snapshot))
-
- windows_utils.WindowsUtils.create_snapshot(volume['name'],
- snapshot['name'])
-
- self.mox.ReplayAll()
-
- drv.create_snapshot(snapshot)
-
- def test_create_volume_from_snapshot(self):
- drv = self._driver
-
- snapshot = db_fakes.get_fake_snapshot_info()
- volume = db_fakes.get_fake_volume_info()
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'create_volume_from_snapshot')
- windows_utils.WindowsUtils.\
- create_volume_from_snapshot(volume, snapshot['name'])
-
- self.mox.ReplayAll()
+ self._driver = windows.WindowsDriver(configuration=configuration)
- drv.create_volume_from_snapshot(volume, snapshot)
+ @mock.patch.object(fileutils, 'ensure_tree')
+ def test_do_setup(self, mock_ensure_tree):
+ self._driver.do_setup(mock.sentinel.context)
- def test_delete_snapshot(self):
- drv = self._driver
+ mock_ensure_tree.assert_has_calls(
+ [mock.call(mock.sentinel.iscsi_lun_path),
+ mock.call(mock.sentinel.image_conversion_dir)])
- snapshot = db_fakes.get_fake_snapshot_info()
+ def test_check_for_setup_error(self):
+ self._driver.check_for_setup_error()
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'delete_snapshot')
- windows_utils.WindowsUtils.delete_snapshot(snapshot['name'])
+ self._driver._tgt_utils.get_portal_locations.assert_called_once_with(
+ available_only=True, fail_if_none_found=True)
- self.mox.ReplayAll()
+ @mock.patch.object(windows.WindowsDriver, '_get_target_name')
+ def test_get_host_information(self, mock_get_target_name):
+ tgt_utils = self._driver._tgt_utils
- drv.delete_snapshot(snapshot)
-
- def _test_create_export(self, chap_enabled=False):
- drv = self._driver
- volume = db_fakes.get_fake_volume_info()
- initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
+ fake_auth_meth = 'CHAP'
fake_chap_username = 'fake_chap_username'
fake_chap_password = 'fake_chap_password'
-
- self.flags(use_chap_auth=chap_enabled)
- self.flags(chap_username=fake_chap_username)
- self.flags(chap_password=fake_chap_password)
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'add_disk_to_target')
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'create_iscsi_target')
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'set_chap_credentials')
- self.mox.StubOutWithMock(self._driver,
- 'remove_export')
-
- self._driver.remove_export(mox.IgnoreArg(), mox.IgnoreArg())
- windows_utils.WindowsUtils.create_iscsi_target(initiator_name)
-
- if chap_enabled:
- windows_utils.WindowsUtils.set_chap_credentials(
- mox.IgnoreArg(),
- fake_chap_username,
- fake_chap_password)
-
- windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
- initiator_name)
-
- self.mox.ReplayAll()
-
- export_info = drv.create_export(None, volume, {})
-
- self.assertEqual(initiator_name, export_info['provider_location'])
- if chap_enabled:
- expected_provider_auth = ' '.join(('CHAP',
- fake_chap_username,
- fake_chap_password))
- self.assertEqual(expected_provider_auth,
- export_info['provider_auth'])
-
- def test_create_export_chap_disabled(self):
- self._test_create_export()
-
- def test_create_export_chap_enabled(self):
- self._test_create_export(chap_enabled=True)
-
- def test_initialize_connection(self):
- drv = self._driver
-
- volume = db_fakes.get_fake_volume_info()
- initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
-
- connector = db_fakes.get_fake_connector_info()
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'associate_initiator_with_iscsi_target')
- windows_utils.WindowsUtils.associate_initiator_with_iscsi_target(
- volume['provider_location'], initiator_name, )
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'get_host_information')
- windows_utils.WindowsUtils.get_host_information(
- volume, volume['provider_location'])
-
- self.mox.ReplayAll()
-
- drv.initialize_connection(volume, connector)
+ fake_host_info = {'fake_prop': 'fake_value'}
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_volume['provider_auth'] = "%s %s %s" % (fake_auth_meth,
+ fake_chap_username,
+ fake_chap_password)
+
+ mock_get_target_name.return_value = mock.sentinel.target_name
+ tgt_utils.get_portal_locations.return_value = [
+ mock.sentinel.portal_location]
+ tgt_utils.get_target_information.return_value = fake_host_info
+
+ expected_host_info = dict(fake_host_info,
+ auth_method=fake_auth_meth,
+ auth_username=fake_chap_username,
+ auth_password=fake_chap_password,
+ target_discovered=False,
+ target_portal=mock.sentinel.portal_location,
+ target_lun=0,
+ volume_id=fake_volume['id'])
+
+ host_info = self._driver._get_host_information(fake_volume)
+
+ self.assertEqual(expected_host_info, host_info)
+
+ mock_get_target_name.assert_called_once_with(fake_volume)
+ tgt_utils.get_portal_locations.assert_called_once_with()
+ tgt_utils.get_target_information.assert_called_once_with(
+ mock.sentinel.target_name)
+
+ @mock.patch.object(windows.WindowsDriver, '_get_host_information')
+ def test_initialize_connection(self, mock_get_host_info):
+ tgt_utils = self._driver._tgt_utils
+
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_initiator = db_fakes.get_fake_connector_info()
+ fake_host_info = {'fake_host_prop': 'fake_value'}
+
+ mock_get_host_info.return_value = fake_host_info
+
+ expected_conn_info = {'driver_volume_type': 'iscsi',
+ 'data': fake_host_info}
+ conn_info = self._driver.initialize_connection(fake_volume,
+ fake_initiator)
+
+ self.assertEqual(expected_conn_info, conn_info)
+ mock_associate = tgt_utils.associate_initiator_with_iscsi_target
+ mock_associate.assert_called_once_with(
+ fake_initiator['initiator'],
+ fake_volume['provider_location'])
def test_terminate_connection(self):
- drv = self._driver
-
- volume = db_fakes.get_fake_volume_info()
- initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
- connector = db_fakes.get_fake_connector_info()
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'delete_iscsi_target')
- windows_utils.WindowsUtils.delete_iscsi_target(
- initiator_name, volume['provider_location'])
-
- self.mox.ReplayAll()
-
- drv.terminate_connection(volume, connector)
-
- def test_remove_export(self):
- drv = self._driver
-
- volume = db_fakes.get_fake_volume_info()
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_initiator = db_fakes.get_fake_connector_info()
- target_name = volume['provider_location']
+ self._driver.terminate_connection(fake_volume, fake_initiator)
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'remove_iscsi_target')
- windows_utils.WindowsUtils.remove_iscsi_target(target_name)
+ self._driver._tgt_utils.deassociate_initiator.assert_called_once_with(
+ fake_initiator['initiator'], fake_volume['provider_location'])
- self.mox.ReplayAll()
+ @mock.patch.object(windows.WindowsDriver, 'local_path')
+ def test_create_volume(self, mock_local_path):
+ fake_volume = db_fakes.get_fake_volume_info()
- drv.remove_export(None, volume)
+ self._driver.create_volume(fake_volume)
- def test_copy_image_to_volume(self):
- """resize_image common case usage."""
- drv = self._driver
+ mock_local_path.assert_called_once_with(fake_volume)
+ self._driver._tgt_utils.create_wt_disk.assert_called_once_with(
+ mock_local_path.return_value,
+ fake_volume['name'],
+ size_mb=fake_volume['size'] * 1024)
- volume = db_fakes.get_fake_volume_info()
+ def test_local_path(self):
+ fake_volume = db_fakes.get_fake_volume_info()
- fake_get_supported_type = lambda x: constants.VHD_TYPE_FIXED
- self.stubs.Set(drv, 'local_path', self.fake_local_path)
- self.stubs.Set(windows_utils.WindowsUtils, 'get_supported_vhd_type',
- fake_get_supported_type)
+ fake_lun_path = 'fake_lun_path'
+ self.flags(windows_iscsi_lun_path=fake_lun_path)
- self.mox.StubOutWithMock(os, 'unlink')
- self.mox.StubOutWithMock(image_utils, 'create_temporary_file')
- self.mox.StubOutWithMock(image_utils, 'fetch_to_vhd')
- self.mox.StubOutWithMock(vhdutils.VHDUtils, 'convert_vhd')
- self.mox.StubOutWithMock(vhdutils.VHDUtils, 'resize_vhd')
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'change_disk_status')
+ disk_format = 'vhd'
+ mock_get_fmt = self._driver._tgt_utils.get_supported_disk_format
+ mock_get_fmt.return_value = disk_format
- fake_temp_path = r'C:\fake\temp\file'
- if (CONF.image_conversion_dir and not
- os.path.exists(CONF.image_conversion_dir)):
- os.makedirs(CONF.image_conversion_dir)
- image_utils.create_temporary_file(suffix='.vhd').AndReturn(
- fake_temp_path)
+ disk_path = self._driver.local_path(fake_volume)
- fake_volume_path = self.fake_local_path(volume)
+ expected_fname = "%s.%s" % (fake_volume['name'], disk_format)
+ expected_disk_path = os.path.join(fake_lun_path,
+ expected_fname)
+ self.assertEqual(expected_disk_path, disk_path)
+ mock_get_fmt.assert_called_once_with()
- image_utils.fetch_to_vhd(None, None, None,
- fake_temp_path,
- mox.IgnoreArg())
- windows_utils.WindowsUtils.change_disk_status(volume['name'],
- mox.IsA(bool))
- vhdutils.VHDUtils.convert_vhd(fake_temp_path,
- fake_volume_path,
- constants.VHD_TYPE_FIXED)
- vhdutils.VHDUtils.resize_vhd(fake_volume_path,
- volume['size'] << 30)
- windows_utils.WindowsUtils.change_disk_status(volume['name'],
- mox.IsA(bool))
- os.unlink(mox.IsA(str))
+ @mock.patch.object(windows.WindowsDriver, 'local_path')
+ @mock.patch.object(fileutils, 'delete_if_exists')
+ def test_delete_volume(self, mock_delete_if_exists, mock_local_path):
+ fake_volume = db_fakes.get_fake_volume_info()
- self.mox.ReplayAll()
+ self._driver.delete_volume(fake_volume)
- drv.copy_image_to_volume(None, volume, None, None)
+ mock_local_path.assert_called_once_with(fake_volume)
+ self._driver._tgt_utils.remove_wt_disk.assert_called_once_with(
+ fake_volume['name'])
+ mock_delete_if_exists.assert_called_once_with(
+ mock_local_path.return_value)
- def _test_copy_volume_to_image(self, supported_format):
- drv = self._driver
-
- vol = db_fakes.get_fake_volume_info()
-
- image_meta = db_fakes.get_fake_image_meta()
-
- fake_get_supported_format = lambda x: supported_format
-
- self.stubs.Set(os.path, 'exists', lambda x: False)
- self.stubs.Set(drv, 'local_path', self.fake_local_path)
- self.stubs.Set(windows_utils.WindowsUtils, 'get_supported_format',
- fake_get_supported_format)
-
- self.mox.StubOutWithMock(fileutils, 'ensure_tree')
- self.mox.StubOutWithMock(fileutils, 'delete_if_exists')
- self.mox.StubOutWithMock(image_utils, 'upload_volume')
- self.mox.StubOutWithMock(windows_utils.WindowsUtils, 'copy_vhd_disk')
- self.mox.StubOutWithMock(vhdutils.VHDUtils, 'convert_vhd')
-
- fileutils.ensure_tree(CONF.image_conversion_dir)
- temp_vhd_path = os.path.join(CONF.image_conversion_dir,
- str(image_meta['id']) + "." +
- supported_format)
- upload_image = temp_vhd_path
-
- windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(vol),
- temp_vhd_path)
- if supported_format == 'vhdx':
- upload_image = upload_image[:-1]
- vhdutils.VHDUtils.convert_vhd(temp_vhd_path, upload_image,
- constants.VHD_TYPE_DYNAMIC)
-
- image_utils.upload_volume(None, None, image_meta, upload_image, 'vhd')
-
- fileutils.delete_if_exists(temp_vhd_path)
- fileutils.delete_if_exists(upload_image)
-
- self.mox.ReplayAll()
+ def test_create_snapshot(self):
+ fake_snapshot = db_fakes.get_fake_snapshot_info()
- drv.copy_volume_to_image(None, vol, None, image_meta)
+ self._driver.create_snapshot(fake_snapshot)
- def test_copy_volume_to_image_using_vhd(self):
- self._test_copy_volume_to_image('vhd')
+ self._driver._tgt_utils.create_snapshot.assert_called_once_with(
+ fake_snapshot['volume_name'], fake_snapshot['name'])
- def test_copy_volume_to_image_using_vhdx(self):
- self._test_copy_volume_to_image('vhdx')
+ @mock.patch.object(windows.WindowsDriver, 'local_path')
+ def test_create_volume_from_snapshot(self, mock_local_path):
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_snapshot = db_fakes.get_fake_snapshot_info()
- def test_create_cloned_volume(self):
- drv = self._driver
+ self._driver.create_volume_from_snapshot(fake_volume, fake_snapshot)
- volume = db_fakes.get_fake_volume_info()
- volume_cloned = db_fakes.get_fake_volume_info_cloned()
- new_vhd_path = self.fake_local_path(volume)
- src_vhd_path = self.fake_local_path(volume_cloned)
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'copy_vhd_disk')
- self.mox.StubOutWithMock(windows_utils.WindowsUtils,
- 'import_wt_disk')
- self.mox.StubOutWithMock(vhdutils.VHDUtils,
- 'resize_vhd')
-
- self.stubs.Set(drv.utils,
- 'is_resize_needed',
- lambda vhd_path, new_size, old_size: True)
- self.stubs.Set(drv, 'local_path', self.fake_local_path)
-
- windows_utils.WindowsUtils.copy_vhd_disk(src_vhd_path,
- new_vhd_path)
- drv.utils.is_resize_needed(new_vhd_path,
- volume['size'],
- volume_cloned['size'])
- vhdutils.VHDUtils.resize_vhd(new_vhd_path, volume['size'] << 30)
- windows_utils.WindowsUtils.import_wt_disk(new_vhd_path,
- volume['name'])
+ self._driver._tgt_utils.export_snapshot.assert_called_once_with(
+ fake_snapshot['name'],
+ mock_local_path.return_value)
+ self._driver._tgt_utils.import_wt_disk.assert_called_once_with(
+ mock_local_path.return_value,
+ fake_volume['name'])
- self.mox.ReplayAll()
+ def test_delete_snapshot(self):
+ fake_snapshot = db_fakes.get_fake_snapshot_info()
+
+ self._driver.delete_snapshot(fake_snapshot)
+
+ self._driver._tgt_utils.delete_snapshot.assert_called_once_with(
+ fake_snapshot['name'])
+
+ def test_get_target_name(self):
+ fake_volume = db_fakes.get_fake_volume_info()
+ expected_target_name = "%s%s" % (
+ self._driver.configuration.iscsi_target_prefix,
+ fake_volume['name'])
+
+ target_name = self._driver._get_target_name(fake_volume)
+ self.assertEqual(expected_target_name, target_name)
+
+ @mock.patch.object(windows.WindowsDriver, '_get_target_name')
+ @mock.patch.object(windows.utils, 'generate_username')
+ @mock.patch.object(windows.utils, 'generate_password')
+ def test_create_export(self, mock_generate_password,
+ mock_generate_username,
+ mock_get_target_name):
+ tgt_utils = self._driver._tgt_utils
+ fake_volume = db_fakes.get_fake_volume_info()
+ self._driver.configuration.chap_username = None
+ self._driver.configuration.chap_password = None
+ self._driver.configuration.use_chap_auth = True
+ fake_chap_username = 'fake_chap_username'
+ fake_chap_password = 'fake_chap_password'
- drv.create_cloned_volume(volume, volume_cloned)
+ mock_get_target_name.return_value = mock.sentinel.target_name
+ mock_generate_username.return_value = fake_chap_username
+ mock_generate_password.return_value = fake_chap_password
+ tgt_utils.iscsi_target_exists.return_value = False
+
+ vol_updates = self._driver.create_export(mock.sentinel.context,
+ fake_volume,
+ mock.sentinel.connector)
+
+ mock_get_target_name.assert_called_once_with(fake_volume)
+ tgt_utils.iscsi_target_exists.assert_called_once_with(
+ mock.sentinel.target_name)
+ tgt_utils.set_chap_credentials.assert_called_once_with(
+ mock.sentinel.target_name,
+ fake_chap_username,
+ fake_chap_password)
+ tgt_utils.add_disk_to_target.assert_called_once_with(
+ fake_volume['name'], mock.sentinel.target_name)
+
+ expected_provider_auth = ' '.join(('CHAP',
+ fake_chap_username,
+ fake_chap_password))
+ expected_vol_updates = dict(
+ provider_location=mock.sentinel.target_name,
+ provider_auth=expected_provider_auth)
+ self.assertEqual(expected_vol_updates, vol_updates)
+
+ @mock.patch.object(windows.WindowsDriver, '_get_target_name')
+ def test_remove_export(self, mock_get_target_name):
+ fake_volume = db_fakes.get_fake_volume_info()
+
+ self._driver.remove_export(mock.sentinel.context, fake_volume)
+
+ mock_get_target_name.assert_called_once_with(fake_volume)
+ self._driver._tgt_utils.delete_iscsi_target.assert_called_once_with(
+ mock_get_target_name.return_value)
+
+ @mock.patch.object(windows.WindowsDriver, 'local_path')
+ @mock.patch.object(image_utils, 'temporary_file')
+ @mock.patch.object(image_utils, 'fetch_to_vhd')
+ @mock.patch('os.unlink')
+ def test_copy_image_to_volume(self, mock_unlink, mock_fetch_to_vhd,
+ mock_tmp_file, mock_local_path):
+ tgt_utils = self._driver._tgt_utils
+ fake_volume = db_fakes.get_fake_volume_info()
+
+ mock_tmp_file.return_value.__enter__.return_value = (
+ mock.sentinel.tmp_vhd_path)
+ mock_local_path.return_value = mock.sentinel.vol_vhd_path
+
+ self._driver.copy_image_to_volume(mock.sentinel.context,
+ fake_volume,
+ mock.sentinel.image_service,
+ mock.sentinel.image_id)
+
+ mock_local_path.assert_called_once_with(fake_volume)
+ mock_tmp_file.assert_called_once_with(suffix='.vhd')
+ image_utils.fetch_to_vhd.assert_called_once_with(
+ mock.sentinel.context, mock.sentinel.image_service,
+ mock.sentinel.image_id, mock.sentinel.tmp_vhd_path,
+ self._driver.configuration.volume_dd_blocksize)
+
+ mock_unlink.assert_called_once_with(mock.sentinel.vol_vhd_path)
+ self._driver._vhdutils.convert_vhd.assert_called_once_with(
+ mock.sentinel.tmp_vhd_path,
+ mock.sentinel.vol_vhd_path,
+ tgt_utils.get_supported_vhd_type.return_value)
+ self._driver._vhdutils.resize_vhd.assert_called_once_with(
+ mock.sentinel.vol_vhd_path,
+ fake_volume['size'] * units.Gi,
+ is_file_max_size=False)
+
+ tgt_utils.change_wt_disk_status.assert_has_calls(
+ [mock.call(fake_volume['name'], enabled=False),
+ mock.call(fake_volume['name'], enabled=True)])
+
+ @mock.patch.object(windows.uuidutils, 'generate_uuid')
+ def test_temporary_snapshot(self, mock_generate_uuid):
+ tgt_utils = self._driver._tgt_utils
+ mock_generate_uuid.return_value = mock.sentinel.snap_uuid
+ expected_snap_name = '%s-tmp-snapshot-%s' % (
+ mock.sentinel.volume_name, mock.sentinel.snap_uuid)
+
+ with self._driver._temporary_snapshot(
+ mock.sentinel.volume_name) as snap_name:
+ self.assertEqual(expected_snap_name, snap_name)
+ tgt_utils.create_snapshot.assert_called_once_with(
+ mock.sentinel.volume_name, expected_snap_name)
+
+ tgt_utils.delete_snapshot.assert_called_once_with(
+ expected_snap_name)
+
+ @mock.patch.object(windows.WindowsDriver, '_temporary_snapshot')
+ @mock.patch.object(image_utils, 'upload_volume')
+ @mock.patch.object(fileutils, 'delete_if_exists')
+ def test_copy_volume_to_image(self, mock_delete_if_exists,
+ mock_upload_volume,
+ mock_tmp_snap):
+ tgt_utils = self._driver._tgt_utils
+
+ disk_format = 'vhd'
+ fake_image_meta = db_fakes.get_fake_image_meta()
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_img_conv_dir = 'fake_img_conv_dir'
+ self._driver.configuration.image_conversion_dir = fake_img_conv_dir
+
+ tgt_utils.get_supported_disk_format.return_value = disk_format
+ mock_tmp_snap.return_value.__enter__.return_value = (
+ mock.sentinel.tmp_snap_name)
+
+ expected_tmp_vhd_path = os.path.join(
+ fake_img_conv_dir,
+ fake_image_meta['id'] + '.' + disk_format)
+
+ self._driver.copy_volume_to_image(
+ mock.sentinel.context, fake_volume,
+ mock.sentinel.image_service,
+ fake_image_meta)
+
+ mock_tmp_snap.assert_called_once_with(fake_volume['name'])
+ tgt_utils.export_snapshot.assert_called_once_with(
+ mock.sentinel.tmp_snap_name,
+ expected_tmp_vhd_path)
+ mock_upload_volume.assert_called_once_with(
+ mock.sentinel.context, mock.sentinel.image_service,
+ fake_image_meta, expected_tmp_vhd_path, 'vhd')
+ mock_delete_if_exists.assert_called_once_with(
+ expected_tmp_vhd_path)
+
+ @mock.patch.object(windows.WindowsDriver, '_temporary_snapshot')
+ @mock.patch.object(windows.WindowsDriver, 'local_path')
+ def test_create_cloned_volume(self, mock_local_path,
+ mock_tmp_snap):
+ tgt_utils = self._driver._tgt_utils
+
+ fake_volume = db_fakes.get_fake_volume_info()
+ fake_src_volume = db_fakes.get_fake_volume_info_cloned()
+
+ mock_tmp_snap.return_value.__enter__.return_value = (
+ mock.sentinel.tmp_snap_name)
+ mock_local_path.return_value = mock.sentinel.vol_vhd_path
+
+ self._driver.create_cloned_volume(fake_volume, fake_src_volume)
+
+ mock_tmp_snap.assert_called_once_with(fake_src_volume['name'])
+ tgt_utils.export_snapshot.assert_called_once_with(
+ mock.sentinel.tmp_snap_name,
+ mock.sentinel.vol_vhd_path)
+ self._driver._vhdutils.resize_vhd.assert_called_once_with(
+ mock.sentinel.vol_vhd_path, fake_volume['size'] * units.Gi,
+ is_file_max_size=False)
+ tgt_utils.import_wt_disk.assert_called_once_with(
+ mock.sentinel.vol_vhd_path, fake_volume['name'])
+
+ @mock.patch('os.path.splitdrive')
+ def test_get_capacity_info(self, mock_splitdrive):
+ mock_splitdrive.return_value = (mock.sentinel.drive,
+ mock.sentinel.path_tail)
+ fake_size_gb = 2
+ fake_free_space_gb = 1
+ self._driver._hostutils.get_volume_info.return_value = (
+ fake_size_gb * units.Gi,
+ fake_free_space_gb * units.Gi)
+
+ total_gb, free_gb = self._driver._get_capacity_info()
+
+ self.assertEqual(fake_size_gb, total_gb)
+ self.assertEqual(fake_free_space_gb, free_gb)
+
+ self._driver._hostutils.get_volume_info.assert_called_once_with(
+ mock.sentinel.drive)
+ mock_splitdrive.assert_called_once_with(
+ mock.sentinel.iscsi_lun_path)
+
+ @mock.patch.object(windows.WindowsDriver, '_get_capacity_info')
+ def test_update_volume_stats(self, mock_get_capacity_info):
+ mock_get_capacity_info.return_value = (
+ mock.sentinel.size_gb,
+ mock.sentinel.free_space_gb)
+
+ self.flags(volume_backend_name=mock.sentinel.backend_name)
+ self.flags(reserved_percentage=mock.sentinel.reserved_percentage)
+
+ expected_volume_stats = dict(
+ volume_backend_name=mock.sentinel.backend_name,
+ vendor_name='Microsoft',
+ driver_version=self._driver.VERSION,
+ storage_protocol='iSCSI',
+ total_capacity_gb=mock.sentinel.size_gb,
+ free_capacity_gb=mock.sentinel.free_space_gb,
+ reserved_percentage=mock.sentinel.reserved_percentage,
+ QoS_support=False)
+
+ self._driver._update_volume_stats()
+ self.assertEqual(expected_volume_stats,
+ self._driver._stats)
def test_extend_volume(self):
- drv = self._driver
-
- volume = db_fakes.get_fake_volume_info()
-
- TEST_VOLUME_ADDITIONAL_SIZE_MB = 1024
- TEST_VOLUME_ADDITIONAL_SIZE_GB = 1
-
- self.mox.StubOutWithMock(windows_utils.WindowsUtils, 'extend')
-
- windows_utils.WindowsUtils.extend(volume['name'],
- TEST_VOLUME_ADDITIONAL_SIZE_MB)
-
- new_size = volume['size'] + TEST_VOLUME_ADDITIONAL_SIZE_GB
+ fake_volume = db_fakes.get_fake_volume_info()
+ new_size_gb = 2
+ expected_additional_sz_mb = 1024
- self.mox.ReplayAll()
+ self._driver.extend_volume(fake_volume, new_size_gb)
- drv.extend_volume(volume, new_size)
+ self._driver._tgt_utils.extend_wt_disk.assert_called_once_with(
+ fake_volume['name'], expected_additional_sz_mb)
# License for the specific language governing permissions and limitations
# under the License.
-import ctypes
-import os
-
import mock
from cinder import exception
class WindowsRemoteFsTestCase(test.TestCase):
- _FAKE_SHARE = '//1.2.3.4/share1'
- _FAKE_MNT_BASE = 'C:\OpenStack\mnt'
- _FAKE_HASH = 'db0bf952c1734092b83e8990bd321131'
- _FAKE_MNT_POINT = os.path.join(_FAKE_MNT_BASE, _FAKE_HASH)
- _FAKE_SHARE_OPTS = '-o username=Administrator,password=12345'
- _FAKE_OPTIONS_DICT = {'username': 'Administrator',
- 'password': '12345'}
-
def setUp(self):
super(WindowsRemoteFsTestCase, self).setUp()
- remotefs.ctypes.windll = mock.MagicMock()
- remotefs.WindowsRemoteFsClient.__init__ = mock.Mock(return_value=None)
-
- self._remotefs = remotefs.WindowsRemoteFsClient(
- 'cifs', root_helper=None,
- smbfs_mount_point_base=self._FAKE_MNT_BASE)
- self._remotefs._mount_base = self._FAKE_MNT_BASE
- self._remotefs.smb_conn = mock.MagicMock()
- self._remotefs.conn_cimv2 = mock.MagicMock()
-
- def _test_mount_share(self, mount_point_exists=True, is_symlink=True,
- mount_base_exists=True):
- fake_exists = mock.Mock(return_value=mount_point_exists)
- fake_isdir = mock.Mock(return_value=mount_base_exists)
- fake_makedirs = mock.Mock()
- with mock.patch.multiple('os.path', exists=fake_exists,
- isdir=fake_isdir):
- with mock.patch('os.makedirs', fake_makedirs):
- self._remotefs.is_symlink = mock.Mock(
- return_value=is_symlink)
- self._remotefs.create_sym_link = mock.MagicMock()
- self._remotefs._mount = mock.MagicMock()
- fake_norm_path = os.path.abspath(self._FAKE_SHARE)
-
- if mount_point_exists:
- if not is_symlink:
- self.assertRaises(exception.SmbfsException,
- self._remotefs.mount,
- self._FAKE_MNT_POINT,
- self._FAKE_OPTIONS_DICT)
- else:
- self._remotefs.mount(self._FAKE_SHARE,
- self._FAKE_OPTIONS_DICT)
- if not mount_base_exists:
- fake_makedirs.assert_called_once_with(
- self._FAKE_MNT_BASE)
- self._remotefs._mount.assert_called_once_with(
- fake_norm_path, self._FAKE_OPTIONS_DICT)
- self._remotefs.create_sym_link.assert_called_once_with(
- self._FAKE_MNT_POINT, fake_norm_path)
-
- def test_mount_linked_share(self):
- # The mountpoint contains a symlink targeting the share path
- self._test_mount_share(True)
-
- def test_mount_unlinked_share(self):
- self._test_mount_share(False)
-
- def test_mount_point_exception(self):
- # The mountpoint already exists but it is not a symlink
- self._test_mount_share(True, False)
-
- def test_mount_base_missing(self):
- # The mount point base dir does not exist
- self._test_mount_share(mount_base_exists=False)
-
- def _test_check_symlink(self, is_symlink=True, python_version=(2, 7),
- is_dir=True):
- fake_isdir = mock.Mock(return_value=is_dir)
- fake_islink = mock.Mock(return_value=is_symlink)
- with mock.patch('sys.version_info', python_version):
- with mock.patch.multiple('os.path', isdir=fake_isdir,
- islink=fake_islink):
- if is_symlink:
- ret_value = 0x400
- else:
- ret_value = 0x80
- fake_get_attributes = mock.Mock(return_value=ret_value)
- ctypes.windll.kernel32.GetFileAttributesW = fake_get_attributes
-
- ret_value = self._remotefs.is_symlink(self._FAKE_MNT_POINT)
- if python_version >= (3, 2):
- fake_islink.assert_called_once_with(self._FAKE_MNT_POINT)
- else:
- fake_get_attributes.assert_called_once_with(
- self._FAKE_MNT_POINT)
- self.assertEqual(ret_value, is_symlink)
-
- def test_is_symlink(self):
- self._test_check_symlink()
-
- def test_is_not_symlink(self):
- self._test_check_symlink(False)
-
- def test_check_symlink_python_gt_3_2(self):
- self._test_check_symlink(python_version=(3, 3))
-
- def test_create_sym_link_exception(self):
- ctypes.windll.kernel32.CreateSymbolicLinkW.return_value = 0
- self.assertRaises(exception.VolumeBackendAPIException,
- self._remotefs.create_sym_link,
- self._FAKE_MNT_POINT, self._FAKE_SHARE)
-
- def _test_check_smb_mapping(self, existing_mappings=False,
- share_available=False):
- with mock.patch('os.path.exists', lambda x: share_available):
- fake_mapping = mock.MagicMock()
- if existing_mappings:
- fake_mappings = [fake_mapping]
- else:
- fake_mappings = []
-
- self._remotefs.smb_conn.query.return_value = fake_mappings
- ret_val = self._remotefs.check_smb_mapping(self._FAKE_SHARE)
-
- if existing_mappings:
- if share_available:
- self.assertTrue(ret_val)
- else:
- fake_mapping.Remove.assert_called_once_with(True, True)
- else:
- self.assertFalse(ret_val)
-
- def test_check_mapping(self):
- self._test_check_smb_mapping()
-
- def test_remake_unavailable_mapping(self):
- self._test_check_smb_mapping(True, False)
-
- def test_available_mapping(self):
- self._test_check_smb_mapping(True, True)
-
- def test_mount_smb(self):
- fake_create = self._remotefs.smb_conn.Msft_SmbMapping.Create
- self._remotefs._mount(self._FAKE_SHARE, {})
- fake_create.assert_called_once_with(UserName=None,
- Password=None,
- RemotePath=self._FAKE_SHARE)
+ with mock.patch.object(remotefs.WindowsRemoteFsClient,
+ '__init__', lambda x: None):
+ self._remotefs = remotefs.WindowsRemoteFsClient()
+
+ self._remotefs._mount_base = mock.sentinel.mnt_base
+ self._remotefs._smbutils = mock.Mock()
+ self._remotefs._pathutils = mock.Mock()
+
+ @mock.patch('os.path.isdir')
+ @mock.patch('os.makedirs')
+ @mock.patch('os.path.exists')
+ @mock.patch('os.path.abspath')
+ @mock.patch.object(remotefs.WindowsRemoteFsClient, 'get_mount_point')
+ def _test_mount_share(self, mock_get_mnt_point, mock_abspath,
+ mock_path_exists, mock_makedirs, mock_isdir,
+ mnt_point_exists=False, is_mnt_point_slink=True):
+ mount_options = dict(username=mock.sentinel.username,
+ password=mock.sentinel.password)
+ mock_isdir.return_value = False
+ mock_get_mnt_point.return_value = mock.sentinel.mnt_point
+ mock_abspath.return_value = mock.sentinel.norm_export_path
+ mock_path_exists.return_value = mnt_point_exists
+
+ self._remotefs._pathutils.is_symlink.return_value = is_mnt_point_slink
+ self._remotefs._smbutils.check_smb_mapping.return_value = False
+
+ if mnt_point_exists and not is_mnt_point_slink:
+ self.assertRaises(exception.SmbfsException,
+ self._remotefs.mount,
+ mock.sentinel.export_path,
+ mount_options)
+ else:
+ self._remotefs.mount(mock.sentinel.export_path, mount_options)
+
+ mock_makedirs.assert_called_once_with(mock.sentinel.mnt_base)
+ mock_get_mnt_point.assert_called_once_with(mock.sentinel.export_path)
+ self._remotefs._smbutils.check_smb_mapping.assert_called_once_with(
+ mock.sentinel.norm_export_path, remove_unavailable_mapping=True)
+ self._remotefs._smbutils.mount_smb_share.assert_called_once_with(
+ mock.sentinel.norm_export_path, **mount_options)
+
+ if not mnt_point_exists:
+ self._remotefs._pathutils.create_sym_link.assert_called_once_with(
+ mock.sentinel.mnt_point, mock.sentinel.norm_export_path)
+
+ def test_mount_share(self):
+ self._test_mount_share()
+
+ def test_mount_share_existing_mnt_point_not_symlink(self):
+ self._test_mount_share(mnt_point_exists=True,
+ is_mnt_point_slink=False)
+++ /dev/null
-# Copyright 2015 Cloudbase Solutions Srl
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from cinder import exception
-from cinder import test
-from cinder.volume.drivers.windows import windows_utils
-
-
-class WindowsUtilsTestCase(test.TestCase):
-
- def setUp(self):
- super(WindowsUtilsTestCase, self).setUp()
-
- windows_utils.WindowsUtils.__init__ = lambda x: None
- self.wutils = windows_utils.WindowsUtils()
- self.wutils._conn_wmi = mock.Mock()
- self.wutils._conn_cimv2 = mock.MagicMock()
-
- @mock.patch.object(windows_utils.WindowsUtils, 'get_windows_version')
- def test_check_min_windows_version(self, mock_get_win_version):
- required_win_version = [6, 4]
- actual_win_version = '6.3.0'
- mock_get_win_version.return_value = actual_win_version
-
- self.assertFalse(self.wutils.check_min_windows_version(
- *required_win_version))
-
- def _test_copy_vhd_disk(self, source_exists=True, copy_failed=False):
- fake_data_file_object = mock.MagicMock()
- fake_data_file_object.Copy.return_value = [int(copy_failed)]
-
- fake_vhd_list = [fake_data_file_object] if source_exists else []
- mock_query = mock.Mock(return_value=fake_vhd_list)
- self.wutils._conn_cimv2.query = mock_query
-
- if not source_exists or copy_failed:
- self.assertRaises(exception.VolumeBackendAPIException,
- self.wutils.copy_vhd_disk,
- mock.sentinel.src,
- mock.sentinel.dest)
- else:
- self.wutils.copy_vhd_disk(mock.sentinel.src, mock.sentinel.dest)
-
- expected_query = (
- "Select * from CIM_DataFile where Name = '%s'" %
- mock.sentinel.src)
- mock_query.assert_called_once_with(expected_query)
- fake_data_file_object.Copy.assert_called_with(
- mock.sentinel.dest)
-
- def test_copy_vhd_disk(self):
- self._test_copy_vhd_disk()
-
- def test_copy_vhd_disk_invalid_source(self):
- self._test_copy_vhd_disk(source_exists=False)
-
- def test_copy_vhd_disk_copy_failed(self):
- self._test_copy_vhd_disk(copy_failed=True)
-
- @mock.patch.object(windows_utils, 'wmi', create=True)
- def test_import_wt_disk_exception(self, mock_wmi):
- mock_wmi.x_wmi = Exception
- mock_import_disk = self.wutils._conn_wmi.WT_Disk.ImportWTDisk
- mock_import_disk.side_effect = mock_wmi.x_wmi
-
- self.assertRaises(exception.VolumeBackendAPIException,
- self.wutils.import_wt_disk,
- mock.sentinel.vhd_path,
- mock.sentinel.vol_name)
- mock_import_disk.assert_called_once_with(
- DevicePath=mock.sentinel.vhd_path,
- Description=mock.sentinel.vol_name)
-
- def test_check_if_resize_is_needed_bigger_requested_size(self):
- ret_val = self.wutils.is_resize_needed(
- mock.sentinel.vhd_path, 1, 0)
- self.assertTrue(ret_val)
-
- def test_check_if_resize_is_needed_equal_requested_size(self):
- ret_val = self.wutils.is_resize_needed(
- mock.sentinel.vhd_path, 1, 1)
- self.assertFalse(ret_val)
-
- def test_check_if_resize_is_needed_smaller_requested_size(self):
- self.assertRaises(
- exception.VolumeBackendAPIException,
- self.wutils.is_resize_needed,
- mock.sentinel.vhd_path, 1, 2)
-
- @mock.patch.object(windows_utils.WindowsUtils, '_wmi_obj_set_attr')
- @mock.patch.object(windows_utils, 'wmi', create=True)
- def test_set_chap_credentials(self, mock_wmi, mock_set_attr):
- mock_wt_host = mock.Mock()
- mock_wt_host_class = self.wutils._conn_wmi.WT_Host
- mock_wt_host_class.return_value = [mock_wt_host]
-
- self.wutils.set_chap_credentials(mock.sentinel.target_name,
- mock.sentinel.chap_username,
- mock.sentinel.chap_password)
-
- mock_wt_host_class.assert_called_once_with(
- HostName=mock.sentinel.target_name)
-
- mock_set_attr.assert_has_calls([
- mock.call(mock_wt_host, 'EnableCHAP', True),
- mock.call(mock_wt_host, 'CHAPUserName',
- mock.sentinel.chap_username),
- mock.call(mock_wt_host, 'CHAPSecret',
- mock.sentinel.chap_password)])
-
- mock_wt_host.put.assert_called_once_with()
-
- @mock.patch.object(windows_utils.WindowsUtils, '_wmi_obj_set_attr')
- @mock.patch.object(windows_utils, 'wmi', create=True)
- def test_set_chap_credentials_exc(self, mock_wmi, mock_set_attr):
- mock_wmi.x_wmi = Exception
- mock_set_attr.side_effect = mock_wmi.x_wmi
- self.assertRaises(exception.VolumeBackendAPIException,
- self.wutils.set_chap_credentials,
- mock.sentinel.target_name,
- mock.sentinel.chap_username,
- mock.sentinel.chap_password)
-
- def test_set_wmi_obj_attr(self):
- wmi_obj = mock.Mock()
- wmi_property_method = wmi_obj.wmi_property
- wmi_property = wmi_obj.wmi_property.return_value
-
- self.wutils._wmi_obj_set_attr(wmi_obj,
- mock.sentinel.key,
- mock.sentinel.value)
-
- wmi_property_method.assert_called_once_with(mock.sentinel.key)
- wmi_property.set.assert_called_once_with(mock.sentinel.value)
# License for the specific language governing permissions and limitations
# under the License.
-import ctypes
import os
-import sys
-
-if sys.platform == 'win32':
- import wmi
from os_brick.remotefs import remotefs
-from oslo_log import log as logging
-import six
+from os_win import utilsfactory
from cinder import exception
-from cinder.i18n import _, _LE, _LI
-
-LOG = logging.getLogger(__name__)
+from cinder.i18n import _
class WindowsRemoteFsClient(remotefs.RemoteFsClient):
- _FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
-
def __init__(self, *args, **kwargs):
super(WindowsRemoteFsClient, self).__init__(*args, **kwargs)
- self.smb_conn = wmi.WMI(moniker='root/Microsoft/Windows/SMB')
- self.conn_cimv2 = wmi.WMI(moniker='root/cimv2')
+ self._smbutils = utilsfactory.get_smbutils()
+ self._pathutils = utilsfactory.get_pathutils()
def mount(self, export_path, mnt_options=None):
if not os.path.isdir(self._mount_base):
os.makedirs(self._mount_base)
- export_hash = self._get_hash_str(export_path)
+ mnt_point = self.get_mount_point(export_path)
norm_path = os.path.abspath(export_path)
mnt_options = mnt_options or {}
- if not self.check_smb_mapping(norm_path):
- self._mount(norm_path, mnt_options)
+ username = (mnt_options.get('username') or
+ mnt_options.get('user'))
+ password = (mnt_options.get('password') or
+ mnt_options.get('pass'))
+
+ if not self._smbutils.check_smb_mapping(
+ norm_path,
+ remove_unavailable_mapping=True):
+ self._smbutils.mount_smb_share(norm_path,
+ username=username,
+ password=password)
- link_path = os.path.join(self._mount_base, export_hash)
- if os.path.exists(link_path):
- if not self.is_symlink(link_path):
+ if os.path.exists(mnt_point):
+ if not self._pathutils.is_symlink(mnt_point):
raise exception.SmbfsException(_("Link path already exists "
"and its not a symlink"))
else:
- self.create_sym_link(link_path, norm_path)
-
- def is_symlink(self, path):
- if sys.version_info >= (3, 2):
- return os.path.islink(path)
-
- file_attr = ctypes.windll.kernel32.GetFileAttributesW(
- six.text_type(path))
-
- return bool(os.path.isdir(path) and (
- file_attr & self._FILE_ATTRIBUTE_REPARSE_POINT))
-
- def create_sym_link(self, link, target, target_is_dir=True):
- """If target_is_dir is True, a junction will be created.
-
- NOTE: Juctions only work on same filesystem.
- """
- symlink = ctypes.windll.kernel32.CreateSymbolicLinkW
- symlink.argtypes = (
- ctypes.c_wchar_p,
- ctypes.c_wchar_p,
- ctypes.c_ulong,
- )
- symlink.restype = ctypes.c_ubyte
- retcode = symlink(link, target, target_is_dir)
- if retcode == 0:
- err_msg = (_("Could not create symbolic link. "
- "Link: %(link)s Target %(target)s")
- % {'link': link, 'target': target})
- raise exception.VolumeBackendAPIException(err_msg)
-
- def check_smb_mapping(self, smbfs_share):
- mappings = self.smb_conn.query("SELECT * FROM "
- "MSFT_SmbMapping "
- "WHERE RemotePath='%s'" %
- smbfs_share)
-
- if len(mappings) > 0:
- if os.path.exists(smbfs_share):
- LOG.debug('Share already mounted: %s', smbfs_share)
- return True
- else:
- LOG.debug('Share exists but is unavailable: %s ', smbfs_share)
- for mapping in mappings:
- # Due to a bug in the WMI module, getting the output of
- # methods returning None will raise an AttributeError
- try:
- mapping.Remove(True, True)
- except AttributeError:
- pass
- return False
-
- def _mount(self, smbfs_share, options):
- smb_opts = {'RemotePath': smbfs_share}
- smb_opts['UserName'] = (options.get('username') or
- options.get('user'))
- smb_opts['Password'] = (options.get('password') or
- options.get('pass'))
-
- try:
- LOG.info(_LI('Mounting share: %s'), smbfs_share)
- self.smb_conn.Msft_SmbMapping.Create(**smb_opts)
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'Unable to mount SMBFS share: %(smbfs_share)s '
- 'WMI exception: %(wmi_exc)s'
- 'Options: %(options)s') % {'smbfs_share': smbfs_share,
- 'options': smb_opts,
- 'wmi_exc': six.text_type(exc)})
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def get_capacity_info(self, smbfs_share):
- norm_path = os.path.abspath(smbfs_share)
- kernel32 = ctypes.windll.kernel32
-
- free_bytes = ctypes.c_ulonglong(0)
- total_bytes = ctypes.c_ulonglong(0)
- retcode = kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(norm_path),
- None,
- ctypes.pointer(total_bytes),
- ctypes.pointer(free_bytes))
- if retcode == 0:
- LOG.error(_LE("Could not get share %s capacity info."),
- smbfs_share)
- return 0, 0
- return total_bytes.value, free_bytes.value
+ self._pathutils.create_sym_link(mnt_point, norm_path)
import os
import sys
+from os_win import utilsfactory
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
from cinder.volume.drivers import remotefs as remotefs_drv
from cinder.volume.drivers import smbfs
from cinder.volume.drivers.windows import remotefs
-from cinder.volume.drivers.windows import vhdutils
-from cinder.volume.drivers.windows import windows_utils
VERSION = '1.1.0'
self._remotefsclient = remotefs.WindowsRemoteFsClient(
'cifs', root_helper=None, smbfs_mount_point_base=self.base,
smbfs_mount_options=opts)
- self.vhdutils = vhdutils.VHDUtils()
- self._windows_utils = windows_utils.WindowsUtils()
+
+ self._vhdutils = utilsfactory.get_vhdutils()
+ self._pathutils = utilsfactory.get_pathutils()
+ self._smbutils = utilsfactory.get_smbutils()
def do_setup(self, context):
self._check_os_platform()
err_msg = _("Unsupported volume format: %s ") % volume_format
raise exception.InvalidVolume(err_msg)
- self.vhdutils.create_dynamic_vhd(volume_path, volume_size_bytes)
+ self._vhdutils.create_dynamic_vhd(volume_path, volume_size_bytes)
def _ensure_share_mounted(self, smbfs_share):
mnt_options = {}
:param smbfs_share: example //172.18.194.100/var/smbfs
"""
- total_size, total_available = self._remotefsclient.get_capacity_info(
+ total_size, total_available = self._smbutils.get_share_capacity_info(
smbfs_share)
total_allocated = self._get_total_allocated(smbfs_share)
return_value = [total_size, total_available, total_allocated]
return [float(x) for x in return_value]
def _img_commit(self, snapshot_path):
- self.vhdutils.merge_vhd(snapshot_path)
- self._delete(snapshot_path)
+ self._vhdutils.merge_vhd(snapshot_path)
def _rebase_img(self, image, backing_file, volume_format):
# Relative path names are not supported in this case.
image_dir = os.path.dirname(image)
backing_file_path = os.path.join(image_dir, backing_file)
- self.vhdutils.reconnect_parent(image, backing_file_path)
+ self._vhdutils.reconnect_parent_vhd(image, backing_file_path)
def _qemu_img_info(self, path, volume_name=None):
# This code expects to deal only with relative filenames.
# As this method is needed by the upper class and qemu-img does
# not fully support vhdx images, for the moment we'll use Win32 API
# for retrieving image information.
- parent_path = self.vhdutils.get_vhd_parent_path(path)
+ parent_path = self._vhdutils.get_vhd_parent_path(path)
file_format = os.path.splitext(path)[1][1:].lower()
if parent_path:
backing_file_full_path = os.path.join(
self._local_volume_dir(snapshot['volume']),
backing_file)
- self.vhdutils.create_differencing_vhd(new_snap_path,
- backing_file_full_path)
+ self._vhdutils.create_differencing_vhd(new_snap_path,
+ backing_file_full_path)
def _do_extend_volume(self, volume_path, size_gb, volume_name=None):
- self.vhdutils.resize_vhd(volume_path, size_gb * units.Gi)
+ self._vhdutils.resize_vhd(volume_path, size_gb * units.Gi)
@remotefs_drv.locked_volume_id_operation
def copy_volume_to_image(self, context, volume, image_service, image_meta):
active_file = self.get_active_image_from_info(volume)
active_file_path = os.path.join(self._local_volume_dir(volume),
active_file)
- backing_file = self.vhdutils.get_vhd_parent_path(active_file_path)
+ backing_file = self._vhdutils.get_vhd_parent_path(active_file_path)
root_file_fmt = self.get_volume_format(volume)
temp_path = None
temp_path = os.path.join(self._local_volume_dir(volume),
temp_file_name)
- self.vhdutils.convert_vhd(active_file_path, temp_path)
+ self._vhdutils.convert_vhd(active_file_path, temp_path)
upload_path = temp_path
else:
upload_path = active_file_path
volume_path, volume_format,
self.configuration.volume_dd_blocksize)
- self._extend_vhd_if_needed(self.local_path(volume), volume['size'])
+ self._vhdutils.resize_vhd(self.local_path(volume),
+ volume['size'] * units.Gi)
def _copy_volume_from_snapshot(self, snapshot, volume, volume_size):
"""Copy data from snapshot to destination volume."""
volume_path = self.local_path(volume)
self._delete(volume_path)
- self.vhdutils.convert_vhd(snapshot_path,
- volume_path)
- self._extend_vhd_if_needed(volume_path, volume_size)
-
- def _extend_vhd_if_needed(self, vhd_path, new_size_gb):
- old_size_bytes = self.vhdutils.get_vhd_size(vhd_path)['VirtualSize']
- new_size_bytes = new_size_gb * units.Gi
-
- # This also ensures we're not attempting to shrink the image.
- is_resize_needed = self._windows_utils.is_resize_needed(
- vhd_path, new_size_bytes, old_size_bytes)
- if is_resize_needed:
- self.vhdutils.resize_vhd(vhd_path, new_size_bytes)
+ self._vhdutils.convert_vhd(snapshot_path,
+ volume_path)
+ self._vhdutils.resize_vhd(volume_path, volume_size * units.Gi)
+++ /dev/null
-# Copyright 2014 Cloudbase Solutions Srl
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Utility class for VHD related operations.
-
-Official VHD format specs can be retrieved at:
-http://technet.microsoft.com/en-us/library/bb676673.aspx
-See "Download the Specifications Without Registering"
-
-Official VHDX format specs can be retrieved at:
-http://www.microsoft.com/en-us/download/details.aspx?id=34750
-
-VHD related Win32 API reference:
-http://msdn.microsoft.com/en-us/library/windows/desktop/dd323700.aspx
-"""
-import ctypes
-import os
-import sys
-
-if os.name == 'nt':
- from ctypes import wintypes
- kernel32 = ctypes.windll.kernel32
- virtdisk = ctypes.windll.virtdisk
-
-from oslo_log import log as logging
-
-from cinder import exception
-from cinder.i18n import _
-from cinder.volume.drivers.windows import constants
-
-LOG = logging.getLogger(__name__)
-
-if os.name == 'nt':
- class Win32_GUID(ctypes.Structure):
- _fields_ = [("Data1", wintypes.DWORD),
- ("Data2", wintypes.WORD),
- ("Data3", wintypes.WORD),
- ("Data4", wintypes.BYTE * 8)]
-
- class Win32_VIRTUAL_STORAGE_TYPE(ctypes.Structure):
- _fields_ = [
- ('DeviceId', wintypes.ULONG),
- ('VendorId', Win32_GUID)
- ]
-
- class Win32_RESIZE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('NewSize', ctypes.c_ulonglong)
- ]
-
- class Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('RWDepth', ctypes.c_ulong),
- ]
-
- class Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('GetInfoOnly', wintypes.BOOL),
- ('ReadOnly', wintypes.BOOL),
- ('ResiliencyGuid', Win32_GUID)
- ]
-
- class Win32_MERGE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('MergeDepth', ctypes.c_ulong)
- ]
-
- class Win32_CREATE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('UniqueId', Win32_GUID),
- ('MaximumSize', ctypes.c_ulonglong),
- ('BlockSizeInBytes', wintypes.ULONG),
- ('SectorSizeInBytes', wintypes.ULONG),
- ('PhysicalSectorSizeInBytes', wintypes.ULONG),
- ('ParentPath', wintypes.LPCWSTR),
- ('SourcePath', wintypes.LPCWSTR),
- ('OpenFlags', wintypes.DWORD),
- ('ParentVirtualStorageType', Win32_VIRTUAL_STORAGE_TYPE),
- ('SourceVirtualStorageType', Win32_VIRTUAL_STORAGE_TYPE),
- ('ResiliencyGuid', Win32_GUID)
- ]
-
- class Win32_SIZE(ctypes.Structure):
- _fields_ = [("VirtualSize", wintypes.ULARGE_INTEGER),
- ("PhysicalSize", wintypes.ULARGE_INTEGER),
- ("BlockSize", wintypes.ULONG),
- ("SectorSize", wintypes.ULONG)]
-
- class Win32_PARENT_LOCATION(ctypes.Structure):
- _fields_ = [('ParentResolved', wintypes.BOOL),
- ('ParentLocationBuffer', wintypes.WCHAR * 512)]
-
- class Win32_PHYSICAL_DISK(ctypes.Structure):
- _fields_ = [("LogicalSectorSize", wintypes.ULONG),
- ("PhysicalSectorSize", wintypes.ULONG),
- ("IsRemote", wintypes.BOOL)]
-
- class Win32_VHD_INFO(ctypes.Union):
- _fields_ = [("Size", Win32_SIZE),
- ("Identifier", Win32_GUID),
- ("ParentLocation", Win32_PARENT_LOCATION),
- ("ParentIdentifier", Win32_GUID),
- ("ParentTimestamp", wintypes.ULONG),
- ("VirtualStorageType", Win32_VIRTUAL_STORAGE_TYPE),
- ("ProviderSubtype", wintypes.ULONG),
- ("Is4kAligned", wintypes.BOOL),
- ("PhysicalDisk", Win32_PHYSICAL_DISK),
- ("VhdPhysicalSectorSize", wintypes.ULONG),
- ("SmallestSafeVirtualSize",
- wintypes.ULARGE_INTEGER),
- ("FragmentationPercentage", wintypes.ULONG)]
-
- class Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS(ctypes.Structure):
- _fields_ = [("VERSION", wintypes.UINT),
- ("VhdInfo", Win32_VHD_INFO)]
-
- class Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS(ctypes.Structure):
- _fields_ = [
- ('Version', wintypes.DWORD),
- ('ParentFilePath', wintypes.LPCWSTR)
- ]
-
-
-VIRTUAL_STORAGE_TYPE_DEVICE_ISO = 1
-VIRTUAL_STORAGE_TYPE_DEVICE_VHD = 2
-VIRTUAL_STORAGE_TYPE_DEVICE_VHDX = 3
-VIRTUAL_DISK_ACCESS_NONE = 0
-VIRTUAL_DISK_ACCESS_ALL = 0x003f0000
-VIRTUAL_DISK_ACCESS_CREATE = 0x00100000
-VIRTUAL_DISK_ACCESS_GET_INFO = 0x80000
-OPEN_VIRTUAL_DISK_FLAG_NONE = 0
-OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS = 1
-OPEN_VIRTUAL_DISK_VERSION_1 = 1
-OPEN_VIRTUAL_DISK_VERSION_2 = 2
-RESIZE_VIRTUAL_DISK_FLAG_NONE = 0
-RESIZE_VIRTUAL_DISK_VERSION_1 = 1
-CREATE_VIRTUAL_DISK_VERSION_2 = 2
-CREATE_VHD_PARAMS_DEFAULT_BLOCK_SIZE = 0
-CREATE_VIRTUAL_DISK_FLAG_NONE = 0
-CREATE_VIRTUAL_DISK_FLAG_FULL_PHYSICAL_ALLOCATION = 1
-MERGE_VIRTUAL_DISK_VERSION_1 = 1
-MERGE_VIRTUAL_DISK_FLAG_NONE = 0x00000000
-GET_VIRTUAL_DISK_INFO_SIZE = 1
-GET_VIRTUAL_DISK_INFO_PARENT_LOCATION = 3
-GET_VIRTUAL_DISK_INFO_VIRTUAL_STORAGE_TYPE = 6
-GET_VIRTUAL_DISK_INFO_PROVIDER_SUBTYPE = 7
-SET_VIRTUAL_DISK_INFO_PARENT_PATH = 1
-
-FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
-FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
-FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
-
-ERROR_VHD_INVALID_TYPE = 0xC03A001B
-
-
-class VHDUtils(object):
-
- def __init__(self):
- self._ext_device_id_map = {
- 'vhd': VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
- 'vhdx': VIRTUAL_STORAGE_TYPE_DEVICE_VHDX}
- self.create_virtual_disk_flags = {
- constants.VHD_TYPE_FIXED: (
- CREATE_VIRTUAL_DISK_FLAG_FULL_PHYSICAL_ALLOCATION),
- constants.VHD_TYPE_DYNAMIC: CREATE_VIRTUAL_DISK_FLAG_NONE
- }
- self._vhd_info_members = {
- GET_VIRTUAL_DISK_INFO_SIZE: 'Size',
- GET_VIRTUAL_DISK_INFO_PARENT_LOCATION: 'ParentLocation',
- GET_VIRTUAL_DISK_INFO_VIRTUAL_STORAGE_TYPE:
- 'VirtualStorageType',
- GET_VIRTUAL_DISK_INFO_PROVIDER_SUBTYPE: 'ProviderSubtype',
- }
-
- if os.name == 'nt':
- self._msft_vendor_id = (
- self.get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MSFT())
-
- def _run_and_check_output(self, func, *args, **kwargs):
- """Convenience helper method for running Win32 API methods."""
- ignored_error_codes = kwargs.pop('ignored_error_codes', [])
-
- ret_val = func(*args, **kwargs)
-
- # The VHD Win32 API functions return non-zero error codes
- # in case of failure.
- if ret_val and ret_val not in ignored_error_codes:
- error_message = self._get_error_message(ret_val)
- func_name = getattr(func, '__name__', '')
- err = (_("Executing Win32 API function %(func_name)s failed. "
- "Error code: %(error_code)s. "
- "Error message: %(error_message)s") %
- {'func_name': func_name,
- 'error_code': ret_val,
- 'error_message': error_message})
- LOG.error(err, exc_info=(sys.exc_info() is not None))
- raise exception.VolumeBackendAPIException(err)
-
- @staticmethod
- def _get_error_message(error_code):
- message_buffer = ctypes.c_char_p()
-
- kernel32.FormatMessageA(
- FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER |
- FORMAT_MESSAGE_IGNORE_INSERTS,
- None, error_code, 0, ctypes.byref(message_buffer), 0, None)
-
- error_message = message_buffer.value
- kernel32.LocalFree(message_buffer)
- return error_message
-
- @staticmethod
- def get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MSFT():
- guid = Win32_GUID()
- guid.Data1 = 0xec984aec
- guid.Data2 = 0xa0f9
- guid.Data3 = 0x47e9
- ByteArray8 = wintypes.BYTE * 8
- guid.Data4 = ByteArray8(0x90, 0x1f, 0x71, 0x41, 0x5a, 0x66, 0x34, 0x5b)
- return guid
-
- def _open(self, vhd_path, open_flag=OPEN_VIRTUAL_DISK_FLAG_NONE,
- open_access_mask=VIRTUAL_DISK_ACCESS_ALL,
- open_params=0):
- device_id = self._get_device_id_by_path(vhd_path)
-
- vst = Win32_VIRTUAL_STORAGE_TYPE()
- vst.DeviceId = device_id
- vst.VendorId = self._msft_vendor_id
-
- handle = wintypes.HANDLE()
-
- self._run_and_check_output(virtdisk.OpenVirtualDisk,
- ctypes.byref(vst),
- ctypes.c_wchar_p(vhd_path),
- open_access_mask,
- open_flag,
- open_params,
- ctypes.byref(handle))
- return handle
-
- def _close(self, handle):
- kernel32.CloseHandle(handle)
-
- def _get_device_id_by_path(self, vhd_path):
- ext = os.path.splitext(vhd_path)[1][1:].lower()
- device_id = self._ext_device_id_map.get(ext)
- if not device_id:
- raise exception.VolumeBackendAPIException(
- _("Unsupported virtual disk extension: %s") % ext)
- return device_id
-
- def resize_vhd(self, vhd_path, new_max_size):
- handle = self._open(vhd_path)
-
- params = Win32_RESIZE_VIRTUAL_DISK_PARAMETERS()
- params.Version = RESIZE_VIRTUAL_DISK_VERSION_1
- params.NewSize = new_max_size
-
- try:
- self._run_and_check_output(virtdisk.ResizeVirtualDisk,
- handle,
- RESIZE_VIRTUAL_DISK_FLAG_NONE,
- ctypes.byref(params),
- None)
- finally:
- self._close(handle)
-
- def merge_vhd(self, vhd_path):
- open_params = Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1()
- open_params.Version = OPEN_VIRTUAL_DISK_VERSION_1
- open_params.RWDepth = 2
-
- handle = self._open(vhd_path,
- open_params=ctypes.byref(open_params))
-
- params = Win32_MERGE_VIRTUAL_DISK_PARAMETERS()
- params.Version = MERGE_VIRTUAL_DISK_VERSION_1
- params.MergeDepth = 1
-
- try:
- self._run_and_check_output(virtdisk.MergeVirtualDisk,
- handle,
- MERGE_VIRTUAL_DISK_FLAG_NONE,
- ctypes.byref(params),
- None)
- finally:
- self._close(handle)
-
- def _create_vhd(self, new_vhd_path, new_vhd_type, src_path=None,
- max_internal_size=0, parent_path=None):
- new_device_id = self._get_device_id_by_path(new_vhd_path)
-
- vst = Win32_VIRTUAL_STORAGE_TYPE()
- vst.DeviceId = new_device_id
- vst.VendorId = self._msft_vendor_id
-
- params = Win32_CREATE_VIRTUAL_DISK_PARAMETERS()
- params.Version = CREATE_VIRTUAL_DISK_VERSION_2
- params.UniqueId = Win32_GUID()
- params.BlockSizeInBytes = CREATE_VHD_PARAMS_DEFAULT_BLOCK_SIZE
- params.SectorSizeInBytes = 0x200
- params.PhysicalSectorSizeInBytes = 0x200
- params.OpenFlags = OPEN_VIRTUAL_DISK_FLAG_NONE
- params.ResiliencyGuid = Win32_GUID()
- params.MaximumSize = max_internal_size
- params.ParentPath = parent_path
- params.ParentVirtualStorageType = Win32_VIRTUAL_STORAGE_TYPE()
-
- if src_path:
- src_device_id = self._get_device_id_by_path(src_path)
- params.SourcePath = src_path
- params.SourceVirtualStorageType = Win32_VIRTUAL_STORAGE_TYPE()
- params.SourceVirtualStorageType.DeviceId = src_device_id
- params.SourceVirtualStorageType.VendorId = self._msft_vendor_id
-
- handle = wintypes.HANDLE()
- create_virtual_disk_flag = self.create_virtual_disk_flags.get(
- new_vhd_type)
-
- try:
- self._run_and_check_output(virtdisk.CreateVirtualDisk,
- ctypes.byref(vst),
- ctypes.c_wchar_p(new_vhd_path),
- VIRTUAL_DISK_ACCESS_NONE,
- None,
- create_virtual_disk_flag,
- 0,
- ctypes.byref(params),
- None,
- ctypes.byref(handle))
- finally:
- self._close(handle)
-
- def get_vhd_info(self, vhd_path, info_members=None):
- vhd_info = {}
- info_members = info_members or self._vhd_info_members
-
- handle = self._open(vhd_path,
- open_access_mask=VIRTUAL_DISK_ACCESS_GET_INFO)
-
- try:
- for member in info_members:
- info = self._get_vhd_info_member(handle, member)
- vhd_info.update(info)
- finally:
- self._close(handle)
-
- return vhd_info
-
- def _get_vhd_info_member(self, vhd_file, info_member):
- virt_disk_info = Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS()
- virt_disk_info.VERSION = ctypes.c_uint(info_member)
-
- infoSize = ctypes.sizeof(virt_disk_info)
-
- virtdisk.GetVirtualDiskInformation.restype = wintypes.DWORD
-
- # Note(lpetrut): If the vhd has no parent image, this will
- # return an error. No need to raise an exception in this case.
- ignored_error_codes = []
- if info_member == GET_VIRTUAL_DISK_INFO_PARENT_LOCATION:
- ignored_error_codes.append(ERROR_VHD_INVALID_TYPE)
-
- self._run_and_check_output(virtdisk.GetVirtualDiskInformation,
- vhd_file,
- ctypes.byref(ctypes.c_ulong(infoSize)),
- ctypes.byref(virt_disk_info),
- 0,
- ignored_error_codes=ignored_error_codes)
-
- return self._parse_vhd_info(virt_disk_info, info_member)
-
- def _parse_vhd_info(self, virt_disk_info, info_member):
- vhd_info = {}
- vhd_info_member = self._vhd_info_members[info_member]
- info = getattr(virt_disk_info.VhdInfo, vhd_info_member)
-
- if hasattr(info, '_fields_'):
- for field in info._fields_:
- vhd_info[field[0]] = getattr(info, field[0])
- else:
- vhd_info[vhd_info_member] = info
-
- return vhd_info
-
- def get_vhd_size(self, vhd_path):
- """Return vhd size.
-
- Returns a dict containing the virtual size, physical size,
- block size and sector size of the vhd.
- """
- size = self.get_vhd_info(vhd_path,
- [GET_VIRTUAL_DISK_INFO_SIZE])
- return size
-
- def get_vhd_parent_path(self, vhd_path):
- vhd_info = self.get_vhd_info(vhd_path,
- [GET_VIRTUAL_DISK_INFO_PARENT_LOCATION])
- parent_path = vhd_info['ParentLocationBuffer']
-
- if len(parent_path) > 0:
- return parent_path
- return None
-
- def create_dynamic_vhd(self, path, max_internal_size):
- self._create_vhd(path,
- constants.VHD_TYPE_DYNAMIC,
- max_internal_size=max_internal_size)
-
- def convert_vhd(self, src, dest,
- vhd_type=constants.VHD_TYPE_DYNAMIC):
- self._create_vhd(dest, vhd_type, src_path=src)
-
- def create_differencing_vhd(self, path, parent_path):
- self._create_vhd(path,
- constants.VHD_TYPE_DIFFERENCING,
- parent_path=parent_path)
-
- def reconnect_parent(self, child_path, parent_path):
- open_params = Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2()
- open_params.Version = OPEN_VIRTUAL_DISK_VERSION_2
- open_params.GetInfoOnly = False
-
- handle = self._open(
- child_path,
- open_flag=OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS,
- open_access_mask=VIRTUAL_DISK_ACCESS_NONE,
- open_params=ctypes.byref(open_params))
-
- params = Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS()
- params.Version = SET_VIRTUAL_DISK_INFO_PARENT_PATH
- params.ParentFilePath = parent_path
-
- try:
- self._run_and_check_output(virtdisk.SetVirtualDiskInformation,
- handle,
- ctypes.byref(params))
- finally:
- self._close(handle)
"""
+import contextlib
import os
+from os_win import utilsfactory
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
+from oslo_utils import units
+from oslo_utils import uuidutils
from cinder.image import image_utils
from cinder.volume import driver
-from cinder.volume.drivers.windows import constants
-from cinder.volume.drivers.windows import vhdutils
-from cinder.volume.drivers.windows import windows_utils
from cinder.volume import utils
LOG = logging.getLogger(__name__)
if self.configuration:
self.configuration.append_config_values(windows_opts)
+ self._vhdutils = utilsfactory.get_vhdutils()
+ self._tgt_utils = utilsfactory.get_iscsi_target_utils()
+ self._hostutils = utilsfactory.get_hostutils()
+
def do_setup(self, context):
"""Setup the Windows Volume driver.
Called one time by the manager after the driver is loaded.
Validate the flags we care about
"""
- self.utils = windows_utils.WindowsUtils()
- self.vhdutils = vhdutils.VHDUtils()
+ fileutils.ensure_tree(self.configuration.windows_iscsi_lun_path)
+ fileutils.ensure_tree(CONF.image_conversion_dir)
def check_for_setup_error(self):
"""Check that the driver is working and can communicate."""
- self.utils.check_for_setup_error()
+ self._tgt_utils.get_portal_locations(available_only=True,
+ fail_if_none_found=True)
+
+ def _get_host_information(self, volume):
+ """Getting the portal and port information."""
+ # TODO(lpetrut): properly handle multiple existing portals, also
+ # use the iSCSI traffic addresses config options.
+ target_name = self._get_target_name(volume)
+
+ available_portal_location = self._tgt_utils.get_portal_locations()[0]
+ properties = self._tgt_utils.get_target_information(target_name)
+
+ # Note(lpetrut): the WT_Host CHAPSecret field cannot be accessed
+ # for security reasons.
+ auth = volume['provider_auth']
+ if auth:
+ (auth_method, auth_username, auth_secret) = auth.split()
+ properties['auth_method'] = auth_method
+ properties['auth_username'] = auth_username
+ properties['auth_password'] = auth_secret
+
+ properties['target_discovered'] = False
+ properties['target_portal'] = available_portal_location
+ properties['target_lun'] = 0
+ properties['volume_id'] = volume['id']
+
+ return properties
def initialize_connection(self, volume, connector):
"""Driver entry point to attach a volume to an instance."""
initiator_name = connector['initiator']
target_name = volume['provider_location']
- self.utils.associate_initiator_with_iscsi_target(initiator_name,
- target_name)
+ self._tgt_utils.associate_initiator_with_iscsi_target(initiator_name,
+ target_name)
- properties = self.utils.get_host_information(volume, target_name)
+ properties = self._get_host_information(volume)
return {
'driver_volume_type': 'iscsi',
"""
initiator_name = connector['initiator']
target_name = volume['provider_location']
- self.utils.delete_iscsi_target(initiator_name, target_name)
+ self._tgt_utils.deassociate_initiator(initiator_name, target_name)
def create_volume(self, volume):
"""Driver entry point for creating a new volume."""
vhd_path = self.local_path(volume)
vol_name = volume['name']
- vol_size = volume['size']
+ vol_size_mb = volume['size'] * 1024
- self.utils.create_volume(vhd_path, vol_name, vol_size)
+ self._tgt_utils.create_wt_disk(vhd_path, vol_name,
+ size_mb=vol_size_mb)
- def local_path(self, volume, format=None):
- return self.utils.local_path(volume, format)
+ def local_path(self, volume, disk_format=None):
+ base_vhd_folder = self.configuration.windows_iscsi_lun_path
+ if not disk_format:
+ disk_format = self._tgt_utils.get_supported_disk_format()
+
+ disk_fname = "%s.%s" % (volume['name'], disk_format)
+ return os.path.join(base_vhd_folder, disk_fname)
def delete_volume(self, volume):
"""Driver entry point for destroying existing volumes."""
vol_name = volume['name']
vhd_path = self.local_path(volume)
- self.utils.delete_volume(vol_name, vhd_path)
+ self._tgt_utils.remove_wt_disk(vol_name)
+ fileutils.delete_if_exists(vhd_path)
def create_snapshot(self, snapshot):
"""Driver entry point for creating a snapshot."""
vol_name = snapshot['volume_name']
snapshot_name = snapshot['name']
- self.utils.create_snapshot(vol_name, snapshot_name)
+ self._tgt_utils.create_snapshot(vol_name, snapshot_name)
def create_volume_from_snapshot(self, volume, snapshot):
"""Driver entry point for exporting snapshots as volumes."""
snapshot_name = snapshot['name']
- self.utils.create_volume_from_snapshot(volume, snapshot_name)
+ vol_name = volume['name']
+ vhd_path = self.local_path(volume)
+
+ self._tgt_utils.export_snapshot(snapshot_name, vhd_path)
+ self._tgt_utils.import_wt_disk(vhd_path, vol_name)
def delete_snapshot(self, snapshot):
"""Driver entry point for deleting a snapshot."""
snapshot_name = snapshot['name']
- self.utils.delete_snapshot(snapshot_name)
+ self._tgt_utils.delete_snapshot(snapshot_name)
def ensure_export(self, context, volume):
# iSCSI targets exported by WinTarget persist after host reboot.
pass
+ def _get_target_name(self, volume):
+ return "%s%s" % (self.configuration.iscsi_target_prefix,
+ volume['name'])
+
def create_export(self, context, volume, connector):
"""Driver entry point to get the export info for a new volume."""
- # Since the iSCSI targets are not reused, being deleted when the
- # volume is detached, we should clean up existing targets before
- # creating a new one.
- self.remove_export(context, volume)
-
- target_name = "%s%s" % (self.configuration.iscsi_target_prefix,
- volume['name'])
- updates = {'provider_location': target_name}
- self.utils.create_iscsi_target(target_name)
-
- if self.configuration.use_chap_auth:
- chap_username = (self.configuration.chap_username or
- utils.generate_username())
- chap_password = (self.configuration.chap_password or
- utils.generate_password())
-
- self.utils.set_chap_credentials(target_name,
- chap_username,
- chap_password)
-
- updates['provider_auth'] = ' '.join(('CHAP',
- chap_username,
- chap_password))
- # Get the disk to add
- vol_name = volume['name']
- self.utils.add_disk_to_target(vol_name, target_name)
+ target_name = self._get_target_name(volume)
+ updates = {}
+
+ if not self._tgt_utils.iscsi_target_exists(target_name):
+ self._tgt_utils.create_iscsi_target(target_name)
+ updates['provider_location'] = target_name
+
+ if self.configuration.use_chap_auth:
+ chap_username = (self.configuration.chap_username or
+ utils.generate_username())
+ chap_password = (self.configuration.chap_password or
+ utils.generate_password())
+
+ self._tgt_utils.set_chap_credentials(target_name,
+ chap_username,
+ chap_password)
+
+ updates['provider_auth'] = ' '.join(('CHAP',
+ chap_username,
+ chap_password))
+
+ # This operation is idempotent
+ self._tgt_utils.add_disk_to_target(volume['name'], target_name)
return updates
def remove_export(self, context, volume):
"""Driver entry point to remove an export for a volume."""
- target_name = "%s%s" % (self.configuration.iscsi_target_prefix,
- volume['name'])
-
- self.utils.remove_iscsi_target(target_name)
+ target_name = self._get_target_name(volume)
+ self._tgt_utils.delete_iscsi_target(target_name)
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and create a volume using it."""
# Convert to VHD and file back to VHD
- vhd_type = self.utils.get_supported_vhd_type()
+ vhd_type = self._tgt_utils.get_supported_vhd_type()
with image_utils.temporary_file(suffix='.vhd') as tmp:
volume_path = self.local_path(volume)
image_utils.fetch_to_vhd(context, image_service, image_id, tmp,
self.configuration.volume_dd_blocksize)
# The vhd must be disabled and deleted before being replaced with
# the desired image.
- self.utils.change_disk_status(volume['name'], False)
+ self._tgt_utils.change_wt_disk_status(volume['name'],
+ enabled=False)
os.unlink(volume_path)
- self.vhdutils.convert_vhd(tmp, volume_path,
- vhd_type)
- self.vhdutils.resize_vhd(volume_path,
- volume['size'] << 30)
- self.utils.change_disk_status(volume['name'], True)
+ self._vhdutils.convert_vhd(tmp, volume_path,
+ vhd_type)
+ self._vhdutils.resize_vhd(volume_path,
+ volume['size'] << 30,
+ is_file_max_size=False)
+ self._tgt_utils.change_wt_disk_status(volume['name'],
+ enabled=True)
+
+ @contextlib.contextmanager
+ def _temporary_snapshot(self, volume_name):
+ try:
+ snap_uuid = uuidutils.generate_uuid()
+ snapshot_name = '%s-tmp-snapshot-%s' % (volume_name, snap_uuid)
+ self._tgt_utils.create_snapshot(volume_name, snapshot_name)
+ yield snapshot_name
+ finally:
+ self._tgt_utils.delete_snapshot(snapshot_name)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Copy the volume to the specified image."""
- disk_format = self.utils.get_supported_format()
- if not os.path.exists(self.configuration.image_conversion_dir):
- fileutils.ensure_tree(self.configuration.image_conversion_dir)
-
+ disk_format = self._tgt_utils.get_supported_disk_format()
temp_vhd_path = os.path.join(self.configuration.image_conversion_dir,
str(image_meta['id']) + '.' + disk_format)
- upload_image = temp_vhd_path
try:
- self.utils.copy_vhd_disk(self.local_path(volume), temp_vhd_path)
- # qemu-img does not yet fully support vhdx format, so we'll first
- # convert the image to vhd before attempting upload
- if disk_format == 'vhdx':
- upload_image = upload_image[:-1]
- self.vhdutils.convert_vhd(temp_vhd_path, upload_image,
- constants.VHD_TYPE_DYNAMIC)
-
- image_utils.upload_volume(context, image_service, image_meta,
- upload_image, 'vhd')
+ with self._temporary_snapshot(volume['name']) as tmp_snap_name:
+ # qemu-img cannot access VSS snapshots, for which reason it
+ # must be exported first.
+ self._tgt_utils.export_snapshot(tmp_snap_name, temp_vhd_path)
+ image_utils.upload_volume(context, image_service, image_meta,
+ temp_vhd_path, 'vhd')
finally:
fileutils.delete_if_exists(temp_vhd_path)
- fileutils.delete_if_exists(upload_image)
def create_cloned_volume(self, volume, src_vref):
"""Creates a clone of the specified volume."""
+ src_vol_name = src_vref['name']
vol_name = volume['name']
vol_size = volume['size']
- src_vol_size = src_vref['size']
new_vhd_path = self.local_path(volume)
- src_vhd_path = self.local_path(src_vref)
-
- self.utils.copy_vhd_disk(src_vhd_path,
- new_vhd_path)
-
- if self.utils.is_resize_needed(new_vhd_path, vol_size, src_vol_size):
- self.vhdutils.resize_vhd(new_vhd_path, vol_size << 30)
- self.utils.import_wt_disk(new_vhd_path, vol_name)
+ with self._temporary_snapshot(src_vol_name) as tmp_snap_name:
+ self._tgt_utils.export_snapshot(tmp_snap_name, new_vhd_path)
+ self._vhdutils.resize_vhd(new_vhd_path, vol_size << 30,
+ is_file_max_size=False)
- def get_volume_stats(self, refresh=False):
- """Get volume stats.
+ self._tgt_utils.import_wt_disk(new_vhd_path, vol_name)
- If 'refresh' is True, run update the stats first.
- """
- if refresh:
- self._update_volume_stats()
+ def _get_capacity_info(self):
+ drive = os.path.splitdrive(
+ self.configuration.windows_iscsi_lun_path)[0]
+ (size, free_space) = self._hostutils.get_volume_info(drive)
- return self._stats
+ total_gb = size / units.Gi
+ free_gb = free_space / units.Gi
+ return (total_gb, free_gb)
def _update_volume_stats(self):
"""Retrieve stats info for Windows device."""
-
LOG.debug("Updating volume stats")
+ total_gb, free_gb = self._get_capacity_info()
+
data = {}
- backend_name = self.__class__.__name__
- if self.configuration:
- backend_name = self.configuration.safe_get('volume_backend_name')
+ backend_name = self.configuration.safe_get('volume_backend_name')
data["volume_backend_name"] = backend_name or self.__class__.__name__
data["vendor_name"] = 'Microsoft'
data["driver_version"] = self.VERSION
data["storage_protocol"] = 'iSCSI'
- data['total_capacity_gb'] = 'infinite'
- data['free_capacity_gb'] = 'infinite'
- data['reserved_percentage'] = 100
+ data['total_capacity_gb'] = total_gb
+ data['free_capacity_gb'] = free_gb
+ data['reserved_percentage'] = self.configuration.reserved_percentage
data['QoS_support'] = False
+
self._stats = data
def extend_volume(self, volume, new_size):
old_size = volume['size']
LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
{'old_size': old_size, 'new_size': new_size})
- additional_size = (new_size - old_size) * 1024
- self.utils.extend(volume['name'], additional_size)
+ additional_size_mb = (new_size - old_size) * 1024
+
+ self._tgt_utils.extend_wt_disk(volume['name'], additional_size_mb)
+++ /dev/null
-# Copyright 2013 Pedro Navarro Perez
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Utility class for Windows Storage Server 2012 volume related operations.
-"""
-
-import ctypes
-import os
-import sys
-
-from oslo_config import cfg
-from oslo_log import log as logging
-import six
-
-from cinder import exception
-from cinder.i18n import _, _LI
-from cinder.volume.drivers.windows import constants
-
-# Check needed for unit testing on Unix
-if sys.platform == 'win32':
- import wmi
-
- from ctypes import wintypes
-
-CONF = cfg.CONF
-
-LOG = logging.getLogger(__name__)
-
-
-class WindowsUtils(object):
- """Executes volume driver commands on Windows Storage server."""
-
- def __init__(self, *args, **kwargs):
- # Set the flags
- self._conn_wmi = wmi.WMI(moniker='//./root/wmi')
- self._conn_cimv2 = wmi.WMI(moniker='//./root/cimv2')
-
- def check_for_setup_error(self):
- """Check that the driver is working and can communicate.
-
- Invokes the portal and checks that is listening ISCSI traffic.
- """
- try:
- wt_portal = self._conn_wmi.WT_Portal()[0]
- listen = wt_portal.Listen
- except wmi.x_wmi as exc:
- err_msg = (_('check_for_setup_error: the state of the WT Portal '
- 'could not be verified. WMI exception: %s')
- % six.text_type(exc))
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- if not listen:
- err_msg = (_('check_for_setup_error: there is no ISCSI traffic '
- 'listening.'))
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def get_host_information(self, volume, target_name):
- """Getting the portal and port information."""
- try:
- wt_portal = self._conn_wmi.WT_Portal()[0]
- except wmi.x_wmi as exc:
- err_msg = (_('get_host_information: the state of the WT Portal '
- 'could not be verified. WMI exception: %s')
- % six.text_type(exc))
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
- (address, port) = (wt_portal.Address, wt_portal.Port)
- # Getting the host information
- try:
- hosts = self._conn_wmi.WT_Host(Hostname=target_name)
- host = hosts[0]
- except wmi.x_wmi as exc:
- err_msg = (_('get_host_information: the ISCSI target information '
- 'could not be retrieved. WMI exception: %s')
- % six.text_type(exc))
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- properties = {}
- properties['target_discovered'] = False
- properties['target_portal'] = '%s:%s' % (address, port)
- properties['target_iqn'] = host.TargetIQN
- properties['target_lun'] = 0
- properties['volume_id'] = volume['id']
-
- auth = volume['provider_auth']
- if auth:
- (auth_method, auth_username, auth_secret) = auth.split()
-
- properties['auth_method'] = auth_method
- properties['auth_username'] = auth_username
- properties['auth_password'] = auth_secret
-
- return properties
-
- def associate_initiator_with_iscsi_target(self, initiator_name,
- target_name):
- """Sets information used by the iSCSI target entry."""
- try:
- cl = self._conn_wmi.__getattr__("WT_IDMethod")
- wt_idmethod = cl.new()
- wt_idmethod.HostName = target_name
- # Identification method is IQN
- wt_idmethod.Method = 4
- wt_idmethod.Value = initiator_name
- wt_idmethod.put()
- except wmi.x_wmi as exc:
- err_msg = (_('associate_initiator_with_iscsi_target: an '
- 'association between initiator: %(init)s and '
- 'target name: %(target)s could not be established. '
- 'WMI exception: %(wmi_exc)s') %
- {'init': initiator_name, 'target': target_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def delete_iscsi_target(self, initiator_name, target_name):
- """Removes iSCSI targets to hosts."""
-
- try:
- wt_idmethod = self._conn_wmi.WT_IDMethod(HostName=target_name,
- Method=4,
- Value=initiator_name)[0]
- wt_idmethod.Delete_()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'delete_iscsi_target: error when deleting the iscsi target '
- 'associated with target name: %(target)s . WMI '
- 'exception: %(wmi_exc)s') % {'target': target_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def create_volume(self, vhd_path, vol_name, vol_size=None):
- """Creates a volume."""
- try:
- cl = self._conn_wmi.__getattr__("WT_Disk")
- if vol_size:
- size_mb = vol_size * 1024
- else:
- size_mb = None
- cl.NewWTDisk(DevicePath=vhd_path,
- Description=vol_name,
- SizeInMB=size_mb)
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'create_volume: error when creating the volume name: '
- '%(vol_name)s . WMI exception: '
- '%(wmi_exc)s') % {'vol_name': vol_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def import_wt_disk(self, vhd_path, vol_name):
- """Import a vhd/x image to be used by Windows iSCSI targets"""
- try:
- self._conn_wmi.WT_Disk.ImportWTDisk(DevicePath=vhd_path,
- Description=vol_name)
- except wmi.x_wmi as exc:
- err_msg = (_("Failed to import disk: %(vhd_path)s. "
- "WMI exception: %(exc)s") %
- {'vhd_path': vhd_path,
- 'exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def change_disk_status(self, vol_name, enabled):
- try:
- cl = self._conn_wmi.WT_Disk(Description=vol_name)[0]
- cl.Enabled = enabled
- cl.put()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'Error changing disk status: '
- '%(vol_name)s . WMI exception: '
- '%(wmi_exc)s') % {'vol_name': vol_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def delete_volume(self, vol_name, vhd_path):
- """Driver entry point for destroying existing volumes."""
- try:
- disk = self._conn_wmi.WT_Disk(Description=vol_name)
- if not disk:
- LOG.debug('Skipping deleting disk %s as it does not '
- 'exist.', vol_name)
- return
- wt_disk = disk[0]
- wt_disk.Delete_()
- vhdfiles = self._conn_cimv2.query(
- "Select * from CIM_DataFile where Name = '" +
- vhd_path + "'")
- if len(vhdfiles) > 0:
- vhdfiles[0].Delete()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'delete_volume: error when deleting the volume name: '
- '%(vol_name)s . WMI exception: '
- '%(wmi_exc)s') % {'vol_name': vol_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def create_snapshot(self, vol_name, snapshot_name):
- """Driver entry point for creating a snapshot."""
- try:
- wt_disk = self._conn_wmi.WT_Disk(Description=vol_name)[0]
- # API Calls gets Generic Failure
- cl = self._conn_wmi.__getattr__("WT_Snapshot")
- disk_id = wt_disk.WTD
- out = cl.Create(WTD=disk_id)
- # Setting description since it used as a KEY
- wt_snapshot_created = self._conn_wmi.WT_Snapshot(Id=out[0])[0]
- wt_snapshot_created.Description = snapshot_name
- wt_snapshot_created.put()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'create_snapshot: error when creating the snapshot name: '
- '%(vol_name)s . WMI exception: '
- '%(wmi_exc)s') % {'vol_name': snapshot_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def create_volume_from_snapshot(self, volume, snap_name):
- """Driver entry point for exporting snapshots as volumes."""
- try:
- vol_name = volume['name']
- vol_path = self.local_path(volume)
-
- wt_snapshot = self._conn_wmi.WT_Snapshot(Description=snap_name)[0]
- disk_id = wt_snapshot.Export()[0]
- # This export is read-only, so it needs to be copied
- # to another disk.
- wt_disk = self._conn_wmi.WT_Disk(WTD=disk_id)[0]
- wt_disk.Description = '%s-temp' % vol_name
- wt_disk.put()
- src_path = wt_disk.DevicePath
-
- self.copy(src_path, vol_path)
- self.create_volume(vol_path, vol_name)
- wt_disk.Delete_()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'create_volume_from_snapshot: error when creating the volume '
- 'name: %(vol_name)s from snapshot name: %(snap_name)s. WMI '
- 'exception: %(wmi_exc)s') % {'vol_name': vol_name,
- 'snap_name': snap_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def delete_snapshot(self, snap_name):
- """Driver entry point for deleting a snapshot."""
- try:
- wt_snapshot = self._conn_wmi.WT_Snapshot(Description=snap_name)[0]
- wt_snapshot.Delete_()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'delete_snapshot: error when deleting the snapshot name: '
- '%(snap_name)s . WMI exception: '
- '%(wmi_exc)s') % {'snap_name': snap_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def create_iscsi_target(self, target_name):
- """Creates ISCSI target."""
- try:
- self._conn_wmi.WT_Host.NewHost(HostName=target_name)
- except wmi.x_wmi as exc:
- excep_info = exc.com_error.excepinfo[2]
- if excep_info.find(u'The file exists') != -1:
- err_msg = (_(
- 'create_iscsi_target: error when creating iscsi target: '
- '%(tar_name)s . WMI exception: '
- '%(wmi_exc)s') % {'tar_name': target_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
- else:
- LOG.info(_LI('The iSCSI target %(target_name)s already '
- 'exists.'), {'target_name': target_name})
-
- def remove_iscsi_target(self, target_name):
- """Removes ISCSI target."""
- try:
- host = self._conn_wmi.WT_Host(HostName=target_name)
- if not host:
- LOG.debug('Skipping removing target %s as it does not '
- 'exist.', target_name)
- return
- wt_host = host[0]
- wt_host.RemoveAllWTDisks()
- wt_host.Delete_()
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'remove_iscsi_target: error when deleting iscsi target: '
- '%(tar_name)s . WMI exception: '
- '%(wmi_exc)s') % {'tar_name': target_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def set_chap_credentials(self, target_name, chap_username, chap_password):
- try:
- wt_host = self._conn_wmi.WT_Host(HostName=target_name)[0]
- self._wmi_obj_set_attr(wt_host, 'EnableCHAP', True)
- self._wmi_obj_set_attr(wt_host, 'CHAPUserName', chap_username)
- self._wmi_obj_set_attr(wt_host, 'CHAPSecret', chap_password)
- wt_host.put()
- except wmi.x_wmi as exc:
- err_msg = (_('Failed to set CHAP credentials on '
- 'target %(target_name)s. WMI exception: %(wmi_exc)s')
- % {'target_name': target_name,
- 'wmi_exc': exc})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- @staticmethod
- def _wmi_obj_set_attr(wmi_obj, key, value):
- # Due to a bug in the python WMI module, some wmi object attributes
- # cannot be modified. This method is used as a workaround.
- wmi_property = getattr(wmi_obj, 'wmi_property')
- wmi_property(key).set(value)
-
- def add_disk_to_target(self, vol_name, target_name):
- """Adds the disk to the target."""
- try:
- q = self._conn_wmi.WT_Disk(Description=vol_name)
- wt_disk = q[0]
- wt_host = self._conn_wmi.WT_Host(HostName=target_name)[0]
- wt_host.AddWTDisk(wt_disk.WTD)
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'add_disk_to_target: error adding disk associated to volume : '
- '%(vol_name)s to the target name: %(tar_name)s . WMI '
- 'exception: %(wmi_exc)s') % {'tar_name': target_name,
- 'vol_name': vol_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def copy_vhd_disk(self, source_path, destination_path):
- """Copy the vhd disk from source path to destination path."""
- # Note: As WQL is a small subset of SQL which does not allow multiple
- # queries or comments, WQL queries are not exposed to WQL injection.
- vhdfiles = self._conn_cimv2.query(
- "Select * from CIM_DataFile where Name = '%s'" % source_path)
- if len(vhdfiles) > 0:
- ret_val = vhdfiles[0].Copy(destination_path)[0]
- if ret_val:
- err_msg = (
- _('Could not copy virtual disk %(src_path)s '
- 'to %(dest_path)s. Error code: %(error_code)s') %
- {'src_path': source_path,
- 'dest_path': destination_path,
- 'error_code': ret_val})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- else:
- err_msg = (
- _('Could not copy virtual disk %(src_path)s '
- 'to %(dest_path)s. Could not find source path.') %
- {'src_path': source_path,
- 'dest_path': destination_path})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def is_resize_needed(self, vhd_path, new_size, old_size):
- if new_size > old_size:
- return True
- elif old_size > new_size:
- err_msg = (_("Cannot resize image %(vhd_path)s "
- "to a smaller size. "
- "Image size: %(old_size)s, "
- "Requested size: %(new_size)s") %
- {'vhd_path': vhd_path,
- 'old_size': old_size,
- 'new_size': new_size})
- raise exception.VolumeBackendAPIException(data=err_msg)
- return False
-
- def extend(self, vol_name, additional_size):
- """Extend an existing volume."""
- try:
- q = self._conn_wmi.WT_Disk(Description=vol_name)
- wt_disk = q[0]
- wt_disk.Extend(additional_size)
- except wmi.x_wmi as exc:
- err_msg = (_(
- 'extend: error when extending the volume: %(vol_name)s .WMI '
- 'exception: %(wmi_exc)s') % {'vol_name': vol_name,
- 'wmi_exc': six.text_type(exc)})
- LOG.error(err_msg)
- raise exception.VolumeBackendAPIException(data=err_msg)
-
- def local_path(self, volume, format=None):
- base_vhd_folder = CONF.windows_iscsi_lun_path
- if not os.path.exists(base_vhd_folder):
- LOG.debug('Creating folder: %s', base_vhd_folder)
- os.makedirs(base_vhd_folder)
- if not format:
- format = self.get_supported_format()
- return os.path.join(base_vhd_folder, str(volume['name']) + "." +
- format)
-
- def check_min_windows_version(self, major, minor, build=0):
- version_str = self.get_windows_version()
- return list(map(int, version_str.split('.'))) >= [major, minor, build]
-
- def get_windows_version(self):
- return self._conn_cimv2.Win32_OperatingSystem()[0].Version
-
- def get_supported_format(self):
- if self.check_min_windows_version(6, 3):
- return 'vhdx'
- else:
- return 'vhd'
-
- def get_supported_vhd_type(self):
- if self.check_min_windows_version(6, 3):
- return constants.VHD_TYPE_DYNAMIC
- else:
- return constants.VHD_TYPE_FIXED
-
- def copy(self, src, dest):
- # With large files this is 2x-3x faster than shutil.copy(src, dest),
- # especially with UNC targets.
- kernel32 = ctypes.windll.kernel32
- kernel32.CopyFileW.restype = wintypes.BOOL
-
- retcode = kernel32.CopyFileW(ctypes.c_wchar_p(src),
- ctypes.c_wchar_p(dest),
- wintypes.BOOL(True))
- if not retcode:
- raise IOError(_('The file copy from %(src)s to %(dest)s failed.')
- % {'src': src, 'dest': dest})