From: rajinir Date: Wed, 4 Mar 2015 16:47:25 +0000 (-0600) Subject: Convert all eqlx tests from mox to mock X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=2b9cc5a053e86d7c80522c0cbe5dd22c28cac6cf;p=openstack-build%2Fcinder-build.git Convert all eqlx tests from mox to mock Converted unit tests for eqlx driver to use mock framework. Change-Id: I4cb38ea72d38f7d08b2ae14abebed55532205e77 Closes-bug: 1415193 --- diff --git a/cinder/tests/test_eqlx.py b/cinder/tests/test_eqlx.py index b5fb350cf..cd341ce38 100644 --- a/cinder/tests/test_eqlx.py +++ b/cinder/tests/test_eqlx.py @@ -17,7 +17,6 @@ import time from eventlet import greenthread import mock -import mox from oslo_concurrency import processutils import paramiko @@ -37,8 +36,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase): def setUp(self): super(DellEQLSanISCSIDriverTestCase, self).setUp() - self.configuration = mox.MockObject(conf.Configuration) - self.configuration.append_config_values(mox.IgnoreArg()) + self.configuration = mock.Mock(conf.Configuration) self.configuration.san_is_local = False self.configuration.san_ip = "10.0.0.1" self.configuration.san_login = "foo" @@ -59,6 +57,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase): self.configuration.chap_username = 'admin' self.configuration.chap_password = 'password' + self.cmd = 'this is dummy command' self._context = context.get_admin_context() self.driver = eqlx.DellEQLSanISCSIDriver( configuration=self.configuration) @@ -75,6 +74,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase): " 7dab76162"] self.fake_iqn = 'iqn.2003-10.com.equallogic:group01:25366:fakev' + self.fake_iqn_return = ['iSCSI target name is %s.' % self.fake_iqn] self.driver._group_ip = '10.0.1.6' self.properties = { 'target_discoverd': True, @@ -93,245 +93,236 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase): return self.properties def test_create_volume(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name, 'size': 1} - self.driver._eql_execute('volume', 'create', volume['name'], - "%sG" % (volume['size']), 'pool', - self.configuration.eqlx_pool, - 'thin-provision').\ - AndReturn(['iSCSI target name is %s.' % self.fake_iqn]) - self.driver._eql_execute('volume', 'select', volume['name'], - 'multihost-access', 'enable') - self.mox.ReplayAll() - model_update = self.driver.create_volume(volume) - self.assertEqual(model_update, self._model_update) + mock_attrs = {'args': ['volume', 'create', volume['name'], + "%sG" % (volume['size']), 'pool', + self.configuration.eqlx_pool, + 'thin-provision']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.return_value = self.fake_iqn_return + model_update = self.driver.create_volume(volume) + self.assertEqual(self._model_update, model_update) def test_delete_volume(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name, 'size': 1} - self.driver._eql_execute('volume', 'select', volume['name'], 'show') - self.driver._eql_execute('volume', 'select', volume['name'], 'offline') - self.driver._eql_execute('volume', 'delete', volume['name']) - self.mox.ReplayAll() - self.driver.delete_volume(volume) + show_attrs = {'args': ['volume', 'select', volume['name'], 'show']} + off_attrs = {'args': ['volume', 'select', volume['name'], 'offline']} + delete_attrs = {'args': ['volume', 'delete', volume['name']]} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**show_attrs) + mock_eql_execute.configure_mock(**off_attrs) + mock_eql_execute.configure_mock(**delete_attrs) + self.driver.delete_volume(volume) def test_delete_absent_volume(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name, 'size': 1, 'id': self.volid} - self.driver._eql_execute('volume', 'select', volume['name'], 'show').\ - AndRaise(processutils.ProcessExecutionError( - stdout='% Error ..... does not exist.\n')) - self.mox.ReplayAll() - self.driver.delete_volume(volume) + mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.side_effect = processutils.ProcessExecutionError( + stdout='% Error ..... does not exist.\n') + self.driver.delete_volume(volume) def test_ensure_export(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name, 'size': 1} - self.driver._eql_execute('volume', 'select', volume['name'], 'show') - self.mox.ReplayAll() - self.driver.ensure_export({}, volume) + mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + self.driver.ensure_export({}, volume) def test_create_snapshot(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'} snap_name = 'fake_snap_name' - self.driver._eql_execute('volume', 'select', snapshot['volume_name'], - 'snapshot', 'create-now').\ - AndReturn(['Snapshot name is %s' % snap_name]) - self.driver._eql_execute('volume', 'select', snapshot['volume_name'], - 'snapshot', 'rename', snap_name, - snapshot['name']) - self.mox.ReplayAll() - self.driver.create_snapshot(snapshot) + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.return_value = ['Snapshot name is %s' % snap_name] + self.driver.create_snapshot(snapshot) def test_create_volume_from_snapshot(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'} volume = {'name': self.volume_name} - self.driver._eql_execute('volume', 'select', snapshot['volume_name'], - 'snapshot', 'select', snapshot['name'], - 'clone', volume['name']).\ - AndReturn(['iSCSI target name is %s.' % self.fake_iqn]) - self.driver._eql_execute('volume', 'select', volume['name'], - 'multihost-access', 'enable') - self.mox.ReplayAll() - model_update = self.driver.create_volume_from_snapshot(volume, - snapshot) - self.assertEqual(model_update, self._model_update) + mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'], + 'snapshot', 'select', snapshot['name'], + 'clone', volume['name']]} + + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.return_value = self.fake_iqn_return + model_update = self.driver.create_volume_from_snapshot(volume, + snapshot) + self.assertEqual(self._model_update, model_update) def test_create_cloned_volume(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) src_vref = {'name': 'fake_uuid'} volume = {'name': self.volume_name} - self.driver._eql_execute('volume', 'select', src_vref['name'], 'clone', - volume['name']).\ - AndReturn(['iSCSI target name is %s.' % self.fake_iqn]) - self.driver._eql_execute('volume', 'select', volume['name'], - 'multihost-access', 'enable') - self.mox.ReplayAll() - model_update = self.driver.create_cloned_volume(volume, src_vref) - self.assertEqual(model_update, self._model_update) + mock_attrs = {'args': ['volume', 'select', volume['name'], + 'multihost-access', 'enable']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.return_value = self.fake_iqn_return + model_update = self.driver.create_cloned_volume(volume, src_vref) + self.assertEqual(self._model_update, model_update) def test_delete_snapshot(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'} - self.driver._eql_execute('volume', 'select', snapshot['volume_name'], - 'snapshot', 'delete', snapshot['name']) - self.mox.ReplayAll() - self.driver.delete_snapshot(snapshot) + mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'], + 'snapshot', 'delete', snapshot['name']]} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + self.driver.delete_snapshot(snapshot) def test_extend_volume(self): new_size = '200' - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name, 'size': 100} - self.driver._eql_execute('volume', 'select', volume['name'], - 'size', "%sG" % new_size) - self.mox.ReplayAll() - self.driver.extend_volume(volume, new_size) + mock_attrs = {'args': ['volume', 'select', volume['name'], + 'size', "%sG" % new_size]} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + self.driver.extend_volume(volume, new_size) def test_initialize_connection(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) volume = {'name': self.volume_name} - self.stubs.Set(self.driver, "_get_iscsi_properties", - self._fake_get_iscsi_properties) - self.driver._eql_execute('volume', 'select', volume['name'], 'access', - 'create', 'initiator', - self.connector['initiator'], - 'authmethod', 'chap', - 'username', - self.configuration.chap_username) - self.mox.ReplayAll() - iscsi_properties = self.driver.initialize_connection(volume, - self.connector) - self.assertEqual(iscsi_properties['data'], - self._fake_get_iscsi_properties(volume)) + mock_attrs = {'args': ['volume', 'select', volume['name'], 'access', + 'create', 'initiator', + self.connector['initiator'], + 'authmethod', 'chap', + 'username', + self.configuration.chap_username]} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + with mock.patch.object(self.driver, + '_get_iscsi_properties') as mock_iscsi: + mock_eql_execute.configure_mock(**mock_attrs) + mock_iscsi.return_value = self.properties + iscsi_properties = self.driver.initialize_connection( + volume, self.connector) + self.assertEqual(self._fake_get_iscsi_properties(volume), + iscsi_properties['data']) def test_terminate_connection(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) + def my_side_effect(*args, **kwargs): + if args[4] == 'show': + return self.access_record_output + else: + return '' volume = {'name': self.volume_name} - self.driver._eql_execute('volume', 'select', volume['name'], 'access', - 'show').AndReturn(self.access_record_output) - self.driver._eql_execute('volume', 'select', volume['name'], 'access', - 'delete', '1') - self.mox.ReplayAll() - self.driver.terminate_connection(volume, self.connector) + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.side_effect = my_side_effect + self.driver.terminate_connection(volume, self.connector) def test_do_setup(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) fake_group_ip = '10.1.2.3' - for feature in ('confirmation', 'paging', 'events', 'formatoutput'): - self.driver._eql_execute('cli-settings', feature, 'off') - self.driver._eql_execute('grpparams', 'show').\ - AndReturn(['Group-Ipaddress: %s' % fake_group_ip]) - self.mox.ReplayAll() - self.driver.do_setup(self._context) - self.assertEqual(fake_group_ip, self.driver._group_ip) + + def my_side_effect(*args, **kwargs): + if args[0] == 'grpparams': + return ['Group-Ipaddress: %s' % fake_group_ip] + else: + return '' + + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.side_effect = my_side_effect + self.driver.do_setup(self._context) + self.assertEqual(self.driver._group_ip, fake_group_ip) def test_update_volume_stats(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) - self.driver._eql_execute('pool', 'select', - self.configuration.eqlx_pool, 'show').\ - AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB']) - self.mox.ReplayAll() - self.driver._update_volume_stats() - self.assertEqual(self.driver._stats['total_capacity_gb'], 111.0) - self.assertEqual(self.driver._stats['free_capacity_gb'], 11.0) + mock_attrs = {'args': ['pool', 'select', + self.configuration.eqlx_pool, 'show']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.return_value = ['TotalCapacity: 111GB', + 'FreeSpace: 11GB'] + self.driver._update_volume_stats() + self.assertEqual(111.0, self.driver._stats['total_capacity_gb']) + self.assertEqual(11.0, self.driver._stats['free_capacity_gb']) def test_get_volume_stats(self): - self.driver._eql_execute = self.mox.\ - CreateMock(self.driver._eql_execute) - self.driver._eql_execute('pool', 'select', - self.configuration.eqlx_pool, 'show').\ - AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB']) - self.mox.ReplayAll() - stats = self.driver.get_volume_stats(refresh=True) - self.assertEqual(stats['total_capacity_gb'], float('111.0')) - self.assertEqual(stats['free_capacity_gb'], float('11.0')) - self.assertEqual(stats['vendor_name'], 'Dell') + mock_attrs = {'args': ['pool', 'select', + self.configuration.eqlx_pool, 'show']} + with mock.patch.object(self.driver, + '_eql_execute') as mock_eql_execute: + mock_eql_execute.configure_mock(**mock_attrs) + mock_eql_execute.return_value = ['TotalCapacity: 111GB', + 'FreeSpace: 11GB'] + stats = self.driver.get_volume_stats(refresh=True) + self.assertEqual(float('111.0'), stats['total_capacity_gb']) + self.assertEqual(float('11.0'), stats['free_capacity_gb']) + self.assertEqual('Dell', stats['vendor_name']) def test_get_space_in_gb(self): - self.assertEqual(self.driver._get_space_in_gb('123.0GB'), 123.0) - self.assertEqual(self.driver._get_space_in_gb('123.0TB'), 123.0 * 1024) - self.assertEqual(self.driver._get_space_in_gb('1024.0MB'), 1.0) + self.assertEqual(123.0, self.driver._get_space_in_gb('123.0GB')) + self.assertEqual(123.0 * 1024, self.driver._get_space_in_gb('123.0TB')) + self.assertEqual(1.0, self.driver._get_space_in_gb('1024.0MB')) def test_get_output(self): def _fake_recv(ignore_arg): return '%s> ' % self.configuration.eqlx_group_name - chan = self.mox.CreateMock(paramiko.Channel) - self.stubs.Set(chan, "recv", _fake_recv) - self.assertEqual(self.driver._get_output(chan), [_fake_recv(None)]) + chan = mock.Mock(paramiko.Channel) + mock_recv = self.mock_object(chan, 'recv') + mock_recv.return_value = '%s> ' % self.configuration.eqlx_group_name + self.assertEqual([_fake_recv(None)], self.driver._get_output(chan)) def test_get_prefixed_value(self): lines = ['Line1 passed', 'Line1 failed'] prefix = ['Line1', 'Line2'] expected_output = [' passed', None] - self.assertEqual(self.driver._get_prefixed_value(lines, prefix[0]), - expected_output[0]) - self.assertEqual(self.driver._get_prefixed_value(lines, prefix[1]), - expected_output[1]) + self.assertEqual(expected_output[0], + self.driver._get_prefixed_value(lines, prefix[0])) + self.assertEqual(expected_output[1], + self.driver._get_prefixed_value(lines, prefix[1])) def test_ssh_execute(self): - ssh = self.mox.CreateMock(paramiko.SSHClient) - chan = self.mox.CreateMock(paramiko.Channel) - transport = self.mox.CreateMock(paramiko.Transport) - self.mox.StubOutWithMock(self.driver, '_get_output') - self.mox.StubOutWithMock(chan, 'invoke_shell') + ssh = mock.Mock(paramiko.SSHClient) + chan = mock.Mock(paramiko.Channel) + transport = mock.Mock(paramiko.Transport) + mock_get_output = self.mock_object(self.driver, '_get_output') + self.mock_object(chan, 'invoke_shell') expected_output = ['NoError: test run'] - ssh.get_transport().AndReturn(transport) - transport.open_session().AndReturn(chan) + mock_get_output.return_value = expected_output + ssh.get_transport.return_value = transport + transport.open_session.return_value = chan chan.invoke_shell() - self.driver._get_output(chan).AndReturn(expected_output) - cmd = 'this is dummy command' chan.send('stty columns 255' + '\r') - self.driver._get_output(chan).AndReturn(expected_output) - chan.send(cmd + '\r') - self.driver._get_output(chan).AndReturn(expected_output) + chan.send(self.cmd + '\r') chan.close() - self.mox.ReplayAll() - self.assertEqual(self.driver._ssh_execute(ssh, cmd), expected_output) + self.assertEqual(expected_output, + self.driver._ssh_execute(ssh, self.cmd)) def test_ssh_execute_error(self): - ssh = self.mox.CreateMock(paramiko.SSHClient) - chan = self.mox.CreateMock(paramiko.Channel) - transport = self.mox.CreateMock(paramiko.Transport) - self.mox.StubOutWithMock(self.driver, '_get_output') - self.mox.StubOutWithMock(ssh, 'get_transport') - self.mox.StubOutWithMock(chan, 'invoke_shell') + ssh = mock.Mock(paramiko.SSHClient) + chan = mock.Mock(paramiko.Channel) + transport = mock.Mock(paramiko.Transport) + mock_get_output = self.mock_object(self.driver, '_get_output') + self.mock_object(ssh, 'get_transport') + self.mock_object(chan, 'invoke_shell') expected_output = ['Error: test run', '% Error'] - ssh.get_transport().AndReturn(transport) - transport.open_session().AndReturn(chan) + mock_get_output.return_value = expected_output + ssh.get_transport().return_value = transport + transport.open_session.return_value = chan chan.invoke_shell() - self.driver._get_output(chan).AndReturn(expected_output) - cmd = 'this is dummy command' chan.send('stty columns 255' + '\r') - self.driver._get_output(chan).AndReturn(expected_output) - chan.send(cmd + '\r') - self.driver._get_output(chan).AndReturn(expected_output) + chan.send(self.cmd + '\r') chan.close() - self.mox.ReplayAll() self.assertRaises(processutils.ProcessExecutionError, - self.driver._ssh_execute, ssh, cmd) + self.driver._ssh_execute, ssh, self.cmd) @mock.patch.object(greenthread, 'sleep') def test_ensure_retries(self, _gt_sleep): num_attempts = 3 self.driver.configuration.eqlx_cli_max_retries = num_attempts - self.mock_object(self.driver, '_ssh_execute', mock.Mock(side_effect=exception. VolumeBackendAPIException("some error")))