-# Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
import mock
from oslo_utils import units
+import six
+from cinder import exception
from cinder import test
from cinder.tests.unit import fake_utils
from cinder.volume import configuration as conf
from cinder.volume.drivers.zfssa import restclient as client
+from cinder.volume.drivers.zfssa import webdavclient
from cinder.volume.drivers.zfssa import zfssaiscsi as iscsi
from cinder.volume.drivers.zfssa import zfssanfs
from cinder.volume.drivers.zfssa import zfssarest as rest
nfs_compression = 'off'
-class FakeZFSSA(object):
- """Fake ZFS SA."""
- def __init__(self):
- self.user = None
- self.host = None
-
- def login(self, user):
- self.user = user
-
- def set_host(self, host, timeout=None):
- self.host = host
-
- def create_project(self, pool, project, compression, logbias):
- out = {}
- if not self.host or not self.user:
- return out
-
- out = {"status": "online",
- "name": "pool",
- "usage": {"available": 10,
- "total": 10,
- "dedupratio": 100,
- "used": 1},
- "peer": "00000000-0000-0000-0000-000000000000",
- "owner": "host",
- "asn": "11111111-2222-3333-4444-555555555555"}
- return out
-
- def create_initiator(self, init, initgrp, chapuser, chapsecret):
- out = {}
- if not self.host or not self.user:
- return out
- out = {"href": "fake_href",
- "alias": "fake_alias",
- "initiator": "fake_iqn.1993-08.org.fake:01:000000000000",
- "chapuser": "",
- "chapsecret": ""
- }
-
- return out
-
- def add_to_initiatorgroup(self, init, initgrp):
- r = rest.ZFSSAApi()
- type(r).rclient = mock.PropertyMock(return_value=FakeAddIni2InitGrp())
- r.add_to_initiatorgroup(init, initgrp)
-
- def create_target(self, tgtalias, inter, tchapuser, tchapsecret):
- out = {}
- if not self.host or not self.user:
- return out
- out = {"href": "fake_href",
- "alias": "fake_tgtgrp",
- "iqn": "iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd",
- "auth": "none",
- "targetchapuser": "",
- "targetchapsecret": "",
- "interfaces": ["eth0"]
- }
-
- return out
-
- def add_to_targetgroup(self, iqn, tgtgrp):
- out = {}
- if not self.host or not self.user:
- return {}
- out = {"href": "fake_href",
- "name": "fake_tgtgrp",
- "targets": ["iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd"]
- }
- return out
-
- def get_lun(self, pool, project, lun):
- ret = {
- 'guid': '600144F0F8FBD5BD000053CE53AB0001',
- 'number': 0,
- 'initiatorgroup': 'fake_initgrp',
- 'size': 1 * units.Gi
- }
- return ret
-
- def get_target(self, target):
- return 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
-
- def create_lun(self, pool, project, lun, volsize, targetgroup, specs):
- out = {}
- if not self.host and not self.user:
- return out
-
- out = {
- "status": "online",
- "lunguid": "600144F0F8FBD5BD000053CE53AB0001",
- "initiatorgroup": ["fake_initgrp"],
- "volsize": volsize,
- "pool": pool,
- "name": lun,
- "project": project,
- "targetgroup": targetgroup,
- "lun": {"assignednumber": 0},
- }
- if specs:
- out.update(specs)
-
- return out
-
- def delete_lun(self, pool, project, lun):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"pool": pool,
- "project": project,
- "name": lun}
-
- return out
-
- def create_snapshot(self, pool, project, vol, snap):
- out = {}
- if not self.host and not self.user:
- return {}
- out = {"name": snap,
- "numclones": 0,
- "share": vol,
- "project": project,
- "pool": pool}
-
- return out
-
- def delete_snapshot(self, pool, project, vol, snap):
- out = {}
- if not self.host and not self.user:
- return {}
- out = {"name": snap,
- "share": vol,
- "project": project,
- "pool": pool}
-
- return out
-
- def clone_snapshot(self, pool, project, pvol, snap, vol):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"origin": {"project": project,
- "snapshot": snap,
- "share": pvol,
- "pool": pool},
- "logbias": "latency",
- "assignednumber": 1,
- "status": "online",
- "lunguid": "600144F0F8FBD5BD000053CE67A50002",
- "volsize": 1,
- "pool": pool,
- "name": vol,
- "project": project}
-
- return out
-
- def set_lun_initiatorgroup(self, pool, project, vol, initgrp):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"lunguid": "600144F0F8FBD5BD000053CE67A50002",
- "pool": pool,
- "name": vol,
- "project": project,
- "initiatorgroup": ["fake_initgrp"]}
-
- return out
-
- def has_clones(self, pool, project, vol, snapshot):
- return False
-
- def set_lun_props(self, pool, project, vol, **kargs):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"pool": pool,
- "name": vol,
- "project": project,
- "volsize": kargs['volsize']}
-
- return out
-
- def get_initiator_initiatorgroup(self, initiator):
- ret = ['test-init-grp1']
- return ret
-
-
-class FakeNFSZFSSA(FakeZFSSA):
- """Fake ZFS SA for the NFS Driver."""
- def set_webdav(self, https_path, auth_str):
- self.webdavclient = https_path
-
- def create_share(self, pool, project, share, args):
- out = {}
- if not self.host and not self.user:
- return out
-
- out = {"logbias": nfs_logbias,
- "compression": nfs_compression,
- "status": "online",
- "pool": pool,
- "name": share,
- "project": project,
- "mountpoint": '/export/nfs_share'}
-
- return out
-
- def get_share(self, pool, project, share):
- out = {}
- if not self.host and not self.user:
- return out
-
- out = {"logbias": nfs_logbias,
- "compression": nfs_compression,
- "encryption": "off",
- "status": "online",
- "pool": pool,
- "name": share,
- "project": project,
- "mountpoint": '/export/nfs_share'}
-
- return out
-
- def create_snapshot_of_volume_file(self, src_file="", dst_file=""):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"status": 201}
-
- return out
-
- def delete_snapshot_of_volume_file(self, src_file=""):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"status": 204}
-
- return out
-
- def create_volume_from_snapshot_file(self, src_file="", dst_file="",
- method='COPY'):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"status": 202}
-
- return out
-
- def modify_service(self, service, args):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"service": {"<status>": "online"}}
- return out
-
- def enable_service(self, service):
- out = {}
- if not self.host and not self.user:
- return out
- out = {"service": {"<status>": "online"}}
- return out
+class FakeResponse(object):
+ def __init__(self, statuscode, data='data'):
+ self.status = statuscode
+ self.data = data
+
+
+class FakeSSL(object):
+ def _create_unverified_context(self):
+ return 'fakecontext'
class TestZFSSAISCSIDriver(test.TestCase):
def setUp(self, _factory_zfssa):
super(TestZFSSAISCSIDriver, self).setUp()
self._create_fake_config()
- _factory_zfssa.return_value = FakeZFSSA()
+ _factory_zfssa.return_value = mock.MagicMock(spec=rest.ZFSSAApi)
iscsi.ZFSSAISCSIDriver._execute = fake_utils.fake_execute
self.drv = iscsi.ZFSSAISCSIDriver(configuration=self.configuration)
self.drv.do_setup({})
self.configuration.safe_get = self.fake_safe_get
def test_create_delete_volume(self):
+ self.drv.zfssa.get_lun.return_value = {'guid':
+ '00000000000000000000000000000',
+ 'number': 0,
+ 'initiatorgroup': 'default',
+ 'size': 1,
+ 'nodestroy': False}
+ lcfg = self.configuration
self.drv.create_volume(self.test_vol)
+ self.drv.zfssa.create_lun.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_vol['name'],
+ six.text_type(self.test_vol['size']) + 'g',
+ lcfg.zfssa_target_group,
+ mock.ANY)
self.drv.delete_volume(self.test_vol)
+ self.drv.zfssa.get_lun.assert_called_once_with(lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_vol['name'])
+ self.drv.zfssa.delete_lun.assert_called_once_with(
+ pool=lcfg.zfssa_pool,
+ project=lcfg.zfssa_project,
+ lun=self.test_vol['name'])
def test_create_delete_snapshot(self):
- self.drv.create_volume(self.test_vol)
+ self.drv.zfssa.has_clones.return_value = False
+ lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
+ self.drv.zfssa.create_snapshot.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_snap['volume_name'],
+ self.test_snap['name'])
self.drv.delete_snapshot(self.test_snap)
- self.drv.delete_volume(self.test_vol)
-
- def test_create_volume_from_snapshot(self):
- self.drv.create_volume(self.test_vol)
+ self.drv.zfssa.delete_snapshot.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_snap['volume_name'],
+ self.test_snap['name'])
+
+ @mock.patch.object(iscsi.ZFSSAISCSIDriver, '_verify_clone_size')
+ def test_create_volume_from_snapshot(self, _verify_clone_size):
+ self.drv._verify_clone_size.return_value = True
+ lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
+ self.drv.zfssa.create_snapshot.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_snap['volume_name'],
+ self.test_snap['name'])
self.drv.create_volume_from_snapshot(self.test_vol_snap,
self.test_snap)
- self.drv.delete_volume(self.test_vol)
-
- def test_remove_export(self):
- self.drv.create_volume(self.test_vol)
- self.drv.terminate_connection(self.test_vol, '')
- self.drv.delete_volume(self.test_vol)
-
- def test_volume_attach_detach(self):
- self.drv.create_volume(self.test_vol)
+ self.drv._verify_clone_size.assert_called_once_with(
+ self.test_snap,
+ self.test_vol_snap['size'] * units.Gi)
+ self.drv.zfssa.clone_snapshot.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_snap['volume_name'],
+ self.test_snap['name'],
+ self.test_vol_snap['name'])
+
+ @mock.patch.object(iscsi.ZFSSAISCSIDriver, '_get_provider_info')
+ def test_volume_attach_detach(self, _get_provider_info):
+ lcfg = self.configuration
+ test_target_iqn = 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
+ stub_val = {'provider_location':
+ '%s %s 0' % (lcfg.zfssa_target_portal, test_target_iqn)}
+ self.drv._get_provider_info.return_value = stub_val
connector = dict(initiator='iqn.1-0.org.deb:01:d7')
props = self.drv.initialize_connection(self.test_vol, connector)
+ self.drv._get_provider_info.assert_called_once_with(self.test_vol)
self.assertEqual('iscsi', props['driver_volume_type'])
self.assertEqual(self.test_vol['id'], props['data']['volume_id'])
+ self.assertEqual(lcfg.zfssa_target_portal,
+ props['data']['target_portal'])
+ self.assertEqual(test_target_iqn, props['data']['target_iqn'])
+ self.assertEqual('0', props['data']['target_lun'])
+ self.assertFalse(props['data']['target_discovered'])
self.drv.terminate_connection(self.test_vol, '')
- self.drv.delete_volume(self.test_vol)
+ self.drv.zfssa.set_lun_initiatorgroup.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_vol['name'],
+ '')
def test_get_volume_stats(self):
- self.drv.get_volume_stats(refresh=False)
+ self.drv.zfssa.get_pool_stats.return_value = 2 * units.Gi, 3 * units.Gi
+ lcfg = self.configuration
+ stats = self.drv.get_volume_stats(refresh=True)
+ self.drv.zfssa.get_pool_stats.assert_called_once_with(lcfg.zfssa_pool)
+ self.assertEqual('Oracle', stats['vendor_name'])
+ self.assertEqual(self.configuration.volume_backend_name,
+ stats['volume_backend_name'])
+ self.assertEqual(self.drv.VERSION, stats['driver_version'])
+ self.assertEqual(self.drv.protocol, stats['storage_protocol'])
+ self.assertEqual(0, stats['reserved_percentage'])
+ self.assertFalse(stats['QoS_support'])
+ self.assertEqual(3, stats['total_capacity_gb'])
+ self.assertEqual(2, stats['free_capacity_gb'])
def test_extend_volume(self):
- self.drv.create_volume(self.test_vol)
+ lcfg = self.configuration
self.drv.extend_volume(self.test_vol, 3)
- self.drv.delete_volume(self.test_vol)
+ self.drv.zfssa.set_lun_props.assert_called_once_with(
+ lcfg.zfssa_pool,
+ lcfg.zfssa_project,
+ self.test_vol['name'],
+ volsize= 3 * units.Gi)
@mock.patch('cinder.volume.volume_types.get_volume_type_extra_specs')
def test_get_voltype_specs(self, get_volume_type_extra_specs):
return val
-class FakeAddIni2InitGrp(object):
-
- def logout(self):
- result = client.RestResult()
- result.status = client.Status.ACCEPTED
- return result
-
- def get(self, path, **kwargs):
- result = client.RestResult()
- result.status = client.Status.OK
- result.data = json.JSONEncoder().encode({'group':
- {'initiators':
- ['iqn.1-0.org.deb:01:d7']}})
- return result
-
- def put(self, path, body="", **kwargs):
- result = client.RestResult()
- result.status = client.Status.ACCEPTED
- return result
-
- def post(self, path, body="", **kwargs):
- result = client.RestResult()
- result.status = client.Status.CREATED
- return result
-
- def islogin(self):
- return True
-
-
class TestZFSSANFSDriver(test.TestCase):
test_vol = {
def setUp(self, _factory_zfssa):
super(TestZFSSANFSDriver, self).setUp()
self._create_fake_config()
- _factory_zfssa.return_value = FakeNFSZFSSA()
+ _factory_zfssa.return_value = mock.MagicMock(spec=rest.ZFSSANfsApi)
self.drv = zfssanfs.ZFSSANFSDriver(configuration=self.configuration)
self.drv._execute = fake_utils.fake_execute
self.drv.do_setup({})
self.configuration.nfs_used_ratio = 1
def test_create_delete_snapshot(self):
+ lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
+ self.drv.zfssa.create_snapshot.assert_called_once_with(
+ lcfg.zfssa_nfs_pool,
+ lcfg.zfssa_nfs_project,
+ lcfg.zfssa_nfs_share,
+ mock.ANY)
+ self.drv.zfssa.create_snapshot_of_volume_file.assert_called_once_with(
+ src_file=mock.ANY,
+ dst_file=self.test_snap['name'])
self.drv.delete_snapshot(self.test_snap)
+ self.drv.zfssa.delete_snapshot_of_volume_file.assert_called_with(
+ src_file=self.test_snap['name'])
def test_create_volume_from_snapshot(self):
self.drv.create_snapshot(self.test_snap)
with mock.patch.object(self.drv, '_ensure_shares_mounted'):
- prov_loc = self.drv.create_volume_from_snapshot(self.test_vol_snap,
- self.test_snap,
- method='COPY')
- self.assertEqual('2.2.2.2:/export/nfs_share',
- prov_loc['provider_location'])
+ self.drv.create_volume_from_snapshot(self.test_vol_snap,
+ self.test_snap,
+ method='COPY')
+
+ self.drv.zfssa.create_volume_from_snapshot_file.\
+ assert_called_once_with(src_file=self.test_snap['name'],
+ dst_file=self.test_vol_snap['name'],
+ method='COPY')
def test_get_volume_stats(self):
self.drv._mounted_shares = ['nfs_share']
mock_get_share_capacity_info:
mock_get_share_capacity_info.return_value = (1073741824,
9663676416)
- self.drv.get_volume_stats(refresh=True)
+ stats = self.drv.get_volume_stats(refresh=True)
+ self.assertEqual(1, stats['free_capacity_gb'])
+ self.assertEqual(10, stats['total_capacity_gb'])
def tearDown(self):
super(TestZFSSANFSDriver, self).tearDown()
+
+
+class TestZFSSAApi(test.TestCase):
+
+ @mock.patch.object(rest, 'factory_restclient')
+ def setUp(self, _restclient):
+ super(TestZFSSAApi, self).setUp()
+ self.host = 'fakehost'
+ self.user = 'fakeuser'
+ self.url = None
+ self.pool = 'fakepool'
+ self.project = 'fakeproject'
+ self.vol = 'fakevol'
+ self.snap = 'fakesnapshot'
+ self.clone = 'fakeclone'
+ self.targetalias = 'fakealias'
+ _restclient.return_value = mock.MagicMock(spec=client.RestClientURL)
+ self.zfssa = rest.ZFSSAApi()
+ self.zfssa.set_host('fakehost')
+ self.pool_url = '/api/storage/v1/pools/'
+
+ def _create_response(self, status, data='data'):
+ response = FakeResponse(status, data)
+ return response
+
+ def test_create_project(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK)
+ self.zfssa.create_project(self.pool, self.project)
+ expected_svc = self.pool_url + self.pool + '/projects/' + self.project
+ self.zfssa.rclient.get.assert_called_with(expected_svc)
+
+ def test_create_initiator(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK)
+ initiator = 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
+ alias = 'init-group'
+ self.zfssa.create_initiator(initiator, alias)
+ self.zfssa.rclient.get.assert_called_with(
+ '/api/san/v1/iscsi/initiators/alias=' + alias)
+
+ def test_create_target(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.NOT_FOUND)
+ ret_val = json.dumps(
+ {'target': {'iqn':
+ 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'}})
+ self.zfssa.rclient.post.return_value = self._create_response(
+ client.Status.CREATED, ret_val)
+ alias = 'tgt-group'
+ self.zfssa.create_target(alias)
+ self.zfssa.rclient.post.assert_called_with('/api/san/v1/iscsi/targets',
+ {'alias': alias})
+
+ def test_get_target(self):
+ ret_val = json.dumps(
+ {'target': {'href': 'fake_href',
+ 'alias': 'tgt-group',
+ 'iqn':
+ 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd',
+ 'targetchapuser': '',
+ 'targetchapsecret': '',
+ 'interfaces': ['nge0']}})
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK, ret_val)
+ ret = self.zfssa.get_target('tgt-group')
+ self.zfssa.rclient.get.assert_called_once_with(
+ '/api/san/v1/iscsi/targets/alias=tgt-group')
+ self.assertEqual('iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd',
+ ret)
+
+ def test_verify_pool(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK)
+ self.zfssa.verify_pool(self.pool)
+ self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool)
+
+ def test_verify_project(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.NOT_FOUND)
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.zfssa.verify_project,
+ self.pool,
+ self.project)
+
+ def test_verify_initiator(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK)
+ self.zfssa.verify_initiator('iqn.1-0.org.deb:01:d7')
+ self.zfssa.rclient.get.assert_called_with(
+ '/api/san/v1/iscsi/initiators/iqn.1-0.org.deb:01:d7')
+
+ def test_verify_target(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.BAD_REQUEST)
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.zfssa.verify_target,
+ self.targetalias)
+
+ def test_create_delete_lun(self):
+ arg = json.dumps({'name': self.vol,
+ 'initiatorgroup': 'com.sun.ms.vss.hg.maskAll'})
+ self.zfssa.rclient.post.return_value = self._create_response(
+ client.Status.CREATED, data=arg)
+ self.zfssa.create_lun(self.pool, self.project, self.vol, 1, 'tgt-grp',
+ None)
+ expected_arg = {'name': self.vol,
+ 'volsize': 1,
+ 'targetgroup': 'tgt-grp',
+ 'initiatorgroup': 'com.sun.ms.vss.hg.maskAll'}
+ self.zfssa.rclient.post.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project + '/luns',
+ expected_arg)
+
+ self.zfssa.rclient.delete.return_value = self._create_response(
+ client.Status.NO_CONTENT)
+ self.zfssa.delete_lun(self.pool, self.project, self.vol)
+ self.zfssa.rclient.delete.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project +
+ '/luns/' + self.vol)
+
+ def test_create_delete_snapshot(self):
+ self.zfssa.rclient.post.return_value = self._create_response(
+ client.Status.CREATED)
+ self.zfssa.create_snapshot(self.pool,
+ self.project,
+ self.vol,
+ self.snap)
+ expected_arg = {'name': self.snap}
+ self.zfssa.rclient.post.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project +
+ '/luns/' + self.vol + '/snapshots', expected_arg)
+
+ self.zfssa.rclient.delete.return_value = self._create_response(
+ client.Status.NO_CONTENT)
+ self.zfssa.delete_snapshot(self.pool,
+ self.project,
+ self.vol,
+ self.snap)
+ self.zfssa.rclient.delete.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project +
+ '/luns/' + self.vol + '/snapshots/' + self.snap)
+
+ def test_clone_snapshot(self):
+ self.zfssa.rclient.put.return_value = self._create_response(
+ client.Status.CREATED)
+ self.zfssa.clone_snapshot(self.pool,
+ self.project,
+ self.vol,
+ self.snap,
+ self.clone)
+ expected_svc = '/api/storage/v1/pools/' + self.pool + '/projects/' + \
+ self.project + '/luns/' + self.vol + '/snapshots/' + self.snap + \
+ '/clone'
+ expected_arg = {'project': self.project,
+ 'share': self.clone,
+ 'nodestroy': True}
+ self.zfssa.rclient.put.assert_called_with(expected_svc, expected_arg)
+
+
+class TestZFSSANfsApi(test.TestCase):
+
+ @mock.patch.object(rest, 'factory_restclient')
+ def setUp(self, _restclient):
+ super(TestZFSSANfsApi, self).setUp()
+ self.host = 'fakehost'
+ self.user = 'fakeuser'
+ self.url = None
+ self.pool = 'fakepool'
+ self.project = 'fakeproject'
+ self.share = 'fakeshare'
+ self.snap = 'fakesnapshot'
+ self.targetalias = 'fakealias'
+ _restclient.return_value = mock.MagicMock(spec=client.RestClientURL)
+ self.webdavclient = mock.MagicMock(spec=webdavclient.ZFSSAWebDAVClient)
+ self.zfssa = rest.ZFSSANfsApi()
+ self.zfssa.set_host('fakehost')
+ self.pool_url = '/api/storage/v1/pools/'
+
+ def _create_response(self, status, data='data'):
+ response = FakeResponse(status, data)
+ return response
+
+ def test_verify_share(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK)
+ self.zfssa.verify_share(self.pool, self.project, self.share)
+ self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool +
+ '/projects/' + self.project +
+ '/filesystems/' + self.share)
+
+ def test_create_delete_snapshot(self):
+ self.zfssa.rclient.post.return_value = self._create_response(
+ client.Status.CREATED)
+ self.zfssa.create_snapshot(self.pool,
+ self.project,
+ self.share,
+ self.snap)
+ expected_arg = {'name': self.snap}
+ self.zfssa.rclient.post.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project +
+ '/filesystems/' + self.share + '/snapshots', expected_arg)
+
+ self.zfssa.rclient.delete.return_value = self._create_response(
+ client.Status.NO_CONTENT)
+ self.zfssa.delete_snapshot(self.pool,
+ self.project,
+ self.share,
+ self.snap)
+ self.zfssa.rclient.delete.assert_called_with(
+ self.pool_url + self.pool + '/projects/' + self.project +
+ '/filesystems/' + self.share + '/snapshots/' + self.snap)
+
+ def create_delete_snapshot_of_volume_file(self):
+ src_file = "fake_src_file"
+ dst_file = "fake_dst_file"
+ self.zfssa.create_snapshot_of_volume_file(src_file=src_file,
+ dst_file=dst_file)
+ self.zfssa.webdavclient.request.assert_called_once_with(
+ src_file=src_file,
+ dst_file=dst_file,
+ method='COPY')
+ self.zfssa.delete_snapshot_of_volume_file(src_file=src_file)
+ self.zfssa.webdavclient.request.assert_called_once_with(
+ src_file=src_file, method='DELETE')
+
+ def test_get_share(self):
+ ret_val = json.dumps({'filesystem': 'test_fs'})
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.OK, ret_val)
+ ret = self.zfssa.get_share(self.pool, self.project, self.share)
+ self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool +
+ '/projects/' + self.project +
+ '/filesystems/' + self.share)
+ self.assertEqual('test_fs', ret)
+
+ def test_create_share(self):
+ self.zfssa.rclient.get.return_value = self._create_response(
+ client.Status.NOT_FOUND)
+ self.zfssa.rclient.post.return_value = self._create_response(
+ client.Status.BAD_REQUEST)
+ self.assertRaises(exception.VolumeBackendAPIException,
+ self.zfssa.create_share,
+ self.pool,
+ self.project,
+ self.share,
+ {})
+
+ @mock.patch.object(rest.ZFSSANfsApi, '_change_service_state')
+ @mock.patch.object(rest.ZFSSANfsApi, 'verify_service')
+ def test_enable_disable_modify_service(self,
+ verify_service,
+ _change_service_state):
+ self.zfssa.enable_service('http')
+ self.zfssa._change_service_state.assert_called_with(
+ 'http', state='enable')
+ self.zfssa.verify_service.assert_called_with('http')
+
+ self.zfssa.disable_service('http')
+ self.zfssa._change_service_state.assert_called_with(
+ 'http', state='disable')
+ self.zfssa.verify_service.assert_called_with('http', status='offline')
+
+ ret_val = json.dumps({'service': {
+ "href": "/api/service/v1/services/http",
+ "<status>": "online",
+ "require_login": False,
+ "protocols": "http/https",
+ "listen_port": 81,
+ "https_port": 443}})
+ self.zfssa.rclient.put.return_value = self._create_response(
+ client.Status.ACCEPTED, ret_val)
+ args = {'listen_port': 81}
+ self.zfssa.modify_service('http', args)
+ self.zfssa.rclient.put.called_with('/api/service/v1/services/http',
+ args)
+
+
+class TestRestClientURL(test.TestCase):
+ def setUp(self):
+ super(TestRestClientURL, self).setUp()
+ self.timeout = 60
+ self.url = '1.1.1.1'
+ self.client = client.RestClientURL(self.url, timeout=self.timeout)
+
+ @mock.patch.object(client.RestClientURL, 'request')
+ def test_post(self, _request):
+ path = '/api/storage/v1/pools'
+ body = {'name': 'fakepool'}
+ self.client.post(path, body=body)
+ self.client.request.assert_called_with(path, 'POST', body)
+
+ @mock.patch.object(client.RestClientURL, 'request')
+ def test_get(self, _request):
+ path = '/api/storage/v1/pools'
+ self.client.get(path)
+ self.client.request.assert_called_with(path, 'GET')
+
+ @mock.patch.object(client.RestClientURL, 'request')
+ def test_put(self, _request):
+ path = '/api/storage/v1/pools'
+ body = {'name': 'fakepool'}
+ self.client.put(path, body=body)
+ self.client.request.assert_called_with(path, 'PUT', body)
+
+ @mock.patch.object(client.RestClientURL, 'request')
+ def test_delete(self, _request):
+ path = '/api/storage/v1/pools'
+ self.client.delete(path)
+ self.client.request.assert_called_with(path, 'DELETE')
+
+ @mock.patch.object(client.RestClientURL, 'request')
+ def test_head(self, _request):
+ path = '/api/storage/v1/pools'
+ self.client.head(path)
+ self.client.request.assert_called_with(path, 'HEAD')
+
+ @mock.patch.object(client, 'RestResult')
+ @mock.patch.object(client.urllib.request, 'Request')
+ @mock.patch.object(client.urllib.request, 'urlopen')
+ def test_request(self, _urlopen, _Request, _RestResult):
+ path = '/api/storage/v1/pools'
+ _urlopen.return_value = mock.Mock()
+ self.client.request(path, mock.ANY)
+ _Request.assert_called_with(self.url + path, None, self.client.headers)
+ _urlopen.assert_called_with(mock.ANY, timeout=self.timeout)
+ _RestResult.assert_called_with(response=mock.ANY)
+
+ @mock.patch.object(client, 'RestResult')
+ @mock.patch.object(client.urllib.request, 'Request')
+ @mock.patch.object(client.urllib.request, 'urlopen')
+ @mock.patch.object(client, 'ssl', new_callable=FakeSSL)
+ def test_ssl_with_context(self, _ssl, _urlopen, _Request, _RestResult):
+ """Test PEP476 certificate opt_out fix. """
+ path = '/api/storage/v1/pools'
+ _urlopen.return_value = mock.Mock()
+ self.client.request(path, mock.ANY)
+ _urlopen.assert_called_once_with(mock.ANY,
+ timeout=self.timeout,
+ context='fakecontext')
+
+ @mock.patch.object(client, 'RestResult')
+ @mock.patch.object(client.urllib.request, 'Request')
+ @mock.patch.object(client.urllib.request, 'urlopen')
+ @mock.patch.object(client, 'ssl', new_callable=object)
+ def test_ssl_no_context(self, _ssl, _urlopen, _Request, _RestResult):
+ """Verify the PEP476 fix backward compatibility. """
+ path = '/api/storage/v1/pools'
+ _urlopen.return_value = mock.Mock()
+ self.client.request(path, mock.ANY)
+ _urlopen.assert_called_once_with(mock.ANY, timeout=self.timeout)