body = [x for x in root.iterchildren()]
request = body[0]
tag = request.tag
- api = etree.QName(tag).localname or tag
- if 'volume-get-iter' == api:
+ localname = etree.QName(tag).localname or tag
+ if 'volume-get-iter' == localname:
body = """<results status="passed"><attributes-list>
<volume-attributes>
<volume-id-attributes>
</volume-attributes>
</attributes-list>
<num-records>4</num-records></results>"""
- elif 'aggr-options-list-info' == api:
+ elif 'aggr-options-list-info' == localname:
body = """<results status="passed">
<options>
<aggr-option-info>
</aggr-option-info>
</options>
</results>"""
- elif 'sis-get-iter' == api:
+ elif 'sis-get-iter' == localname:
body = """<results status="passed">
<attributes-list>
<sis-status-info>
</sis-status-info>
</attributes-list>
</results>"""
- elif 'storage-disk-get-iter' == api:
+ elif 'storage-disk-get-iter' == localname:
body = """<results status="passed">
<attributes-list>
<storage-disk-info>
def test_cl_vols_ssc_all(self):
"""Test cluster ssc for all vols."""
- mox = self.mox
na_server = api.NaServer('127.0.0.1')
vserver = 'openstack'
test_vols = set([copy.deepcopy(self.vol1),
sis = {'vola': {'dedup': False, 'compression': False},
'volb': {'dedup': True, 'compression': False}}
- mox.StubOutWithMock(ssc_utils, 'query_cluster_vols_for_ssc')
- mox.StubOutWithMock(ssc_utils, 'get_sis_vol_dict')
- mox.StubOutWithMock(ssc_utils, 'query_aggr_options')
- mox.StubOutWithMock(ssc_utils, 'query_aggr_storage_disk')
+ self.mox.StubOutWithMock(ssc_utils, 'query_cluster_vols_for_ssc')
+ self.mox.StubOutWithMock(ssc_utils, 'get_sis_vol_dict')
+ self.mox.StubOutWithMock(ssc_utils, 'query_aggr_options')
+ self.mox.StubOutWithMock(ssc_utils, 'query_aggr_storage_disk')
ssc_utils.query_cluster_vols_for_ssc(
na_server, vserver, None).AndReturn(test_vols)
ssc_utils.get_sis_vol_dict(na_server, vserver, None).AndReturn(sis)
na_server, IgnoreArg()).AndReturn(raid4)
ssc_utils.query_aggr_storage_disk(
na_server, IgnoreArg()).AndReturn('SAS')
- mox.ReplayAll()
+ self.mox.ReplayAll()
res_vols = ssc_utils.get_cluster_vols_with_ssc(
na_server, vserver, volume=None)
- mox.VerifyAll()
+ self.mox.VerifyAll()
for vol in res_vols:
if vol.id['name'] == 'volc':
self.assertEqual(vol.sis['compression'], False)
def test_cl_vols_ssc_single(self):
"""Test cluster ssc for single vol."""
- mox = self.mox
na_server = api.NaServer('127.0.0.1')
vserver = 'openstack'
test_vols = set([copy.deepcopy(self.vol1)])
sis = {'vola': {'dedup': False, 'compression': False}}
- mox.StubOutWithMock(ssc_utils, 'query_cluster_vols_for_ssc')
- mox.StubOutWithMock(ssc_utils, 'get_sis_vol_dict')
- mox.StubOutWithMock(ssc_utils, 'query_aggr_options')
- mox.StubOutWithMock(ssc_utils, 'query_aggr_storage_disk')
+ self.mox.StubOutWithMock(ssc_utils, 'query_cluster_vols_for_ssc')
+ self.mox.StubOutWithMock(ssc_utils, 'get_sis_vol_dict')
+ self.mox.StubOutWithMock(ssc_utils, 'query_aggr_options')
+ self.mox.StubOutWithMock(ssc_utils, 'query_aggr_storage_disk')
ssc_utils.query_cluster_vols_for_ssc(
na_server, vserver, 'vola').AndReturn(test_vols)
ssc_utils.get_sis_vol_dict(
ssc_utils.query_aggr_options(
na_server, 'aggr1').AndReturn(raiddp)
ssc_utils.query_aggr_storage_disk(na_server, 'aggr1').AndReturn('SSD')
- mox.ReplayAll()
+ self.mox.ReplayAll()
res_vols = ssc_utils.get_cluster_vols_with_ssc(
na_server, vserver, volume='vola')
- mox.VerifyAll()
+ self.mox.VerifyAll()
self.assertEqual(len(res_vols), 1)
def test_get_cluster_ssc(self):
"""Test get cluster ssc map."""
- mox = self.mox
na_server = api.NaServer('127.0.0.1')
vserver = 'openstack'
test_vols = set(
[self.vol1, self.vol2, self.vol3, self.vol4, self.vol5])
- mox.StubOutWithMock(ssc_utils, 'get_cluster_vols_with_ssc')
+ self.mox.StubOutWithMock(ssc_utils, 'get_cluster_vols_with_ssc')
ssc_utils.get_cluster_vols_with_ssc(
na_server, vserver).AndReturn(test_vols)
- mox.ReplayAll()
+ self.mox.ReplayAll()
res_map = ssc_utils.get_cluster_ssc(na_server, vserver)
- mox.VerifyAll()
+ self.mox.VerifyAll()
self.assertEqual(len(res_map['mirrored']), 1)
self.assertEqual(len(res_map['dedup']), 3)
self.assertEqual(len(res_map['compression']), 1)