]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Convert all eqlx tests from mox to mock
authorrajinir <rajini_ram@dell.com>
Wed, 4 Mar 2015 16:47:25 +0000 (10:47 -0600)
committerrajinir <rajini_ram@dell.com>
Thu, 5 Mar 2015 23:24:59 +0000 (17:24 -0600)
Converted unit tests for eqlx driver
to use mock framework.

Change-Id: I4cb38ea72d38f7d08b2ae14abebed55532205e77
Closes-bug: 1415193

cinder/tests/test_eqlx.py

index b5fb350cfdf8df4a7f8c0becd7b9892e90105782..cd341ce389a1e466da858cbab2808af39343e1b3 100644 (file)
@@ -17,7 +17,6 @@ import time
 
 from eventlet import greenthread
 import mock
-import mox
 from oslo_concurrency import processutils
 import paramiko
 
@@ -37,8 +36,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
 
     def setUp(self):
         super(DellEQLSanISCSIDriverTestCase, self).setUp()
-        self.configuration = mox.MockObject(conf.Configuration)
-        self.configuration.append_config_values(mox.IgnoreArg())
+        self.configuration = mock.Mock(conf.Configuration)
         self.configuration.san_is_local = False
         self.configuration.san_ip = "10.0.0.1"
         self.configuration.san_login = "foo"
@@ -59,6 +57,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
         self.configuration.chap_username = 'admin'
         self.configuration.chap_password = 'password'
 
+        self.cmd = 'this is dummy command'
         self._context = context.get_admin_context()
         self.driver = eqlx.DellEQLSanISCSIDriver(
             configuration=self.configuration)
@@ -75,6 +74,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
             "       7dab76162"]
 
         self.fake_iqn = 'iqn.2003-10.com.equallogic:group01:25366:fakev'
+        self.fake_iqn_return = ['iSCSI target name is %s.' % self.fake_iqn]
         self.driver._group_ip = '10.0.1.6'
         self.properties = {
             'target_discoverd': True,
@@ -93,245 +93,236 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
         return self.properties
 
     def test_create_volume(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name, 'size': 1}
-        self.driver._eql_execute('volume', 'create', volume['name'],
-                                 "%sG" % (volume['size']), 'pool',
-                                 self.configuration.eqlx_pool,
-                                 'thin-provision').\
-            AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
-        self.driver._eql_execute('volume', 'select', volume['name'],
-                                 'multihost-access', 'enable')
-        self.mox.ReplayAll()
-        model_update = self.driver.create_volume(volume)
-        self.assertEqual(model_update, self._model_update)
+        mock_attrs = {'args': ['volume', 'create', volume['name'],
+                               "%sG" % (volume['size']), 'pool',
+                               self.configuration.eqlx_pool,
+                               'thin-provision']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.return_value = self.fake_iqn_return
+            model_update = self.driver.create_volume(volume)
+            self.assertEqual(self._model_update, model_update)
 
     def test_delete_volume(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name, 'size': 1}
-        self.driver._eql_execute('volume', 'select', volume['name'], 'show')
-        self.driver._eql_execute('volume', 'select', volume['name'], 'offline')
-        self.driver._eql_execute('volume', 'delete', volume['name'])
-        self.mox.ReplayAll()
-        self.driver.delete_volume(volume)
+        show_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+        off_attrs = {'args': ['volume', 'select', volume['name'], 'offline']}
+        delete_attrs = {'args': ['volume', 'delete', volume['name']]}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**show_attrs)
+            mock_eql_execute.configure_mock(**off_attrs)
+            mock_eql_execute.configure_mock(**delete_attrs)
+            self.driver.delete_volume(volume)
 
     def test_delete_absent_volume(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name, 'size': 1, 'id': self.volid}
-        self.driver._eql_execute('volume', 'select', volume['name'], 'show').\
-            AndRaise(processutils.ProcessExecutionError(
-                stdout='% Error ..... does not exist.\n'))
-        self.mox.ReplayAll()
-        self.driver.delete_volume(volume)
+        mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.side_effect = processutils.ProcessExecutionError(
+                stdout='% Error ..... does not exist.\n')
+            self.driver.delete_volume(volume)
 
     def test_ensure_export(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name, 'size': 1}
-        self.driver._eql_execute('volume', 'select', volume['name'], 'show')
-        self.mox.ReplayAll()
-        self.driver.ensure_export({}, volume)
+        mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            self.driver.ensure_export({}, volume)
 
     def test_create_snapshot(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
         snap_name = 'fake_snap_name'
-        self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
-                                 'snapshot', 'create-now').\
-            AndReturn(['Snapshot name is %s' % snap_name])
-        self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
-                                 'snapshot', 'rename', snap_name,
-                                 snapshot['name'])
-        self.mox.ReplayAll()
-        self.driver.create_snapshot(snapshot)
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.return_value = ['Snapshot name is %s' % snap_name]
+            self.driver.create_snapshot(snapshot)
 
     def test_create_volume_from_snapshot(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
         volume = {'name': self.volume_name}
-        self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
-                                 'snapshot', 'select', snapshot['name'],
-                                 'clone', volume['name']).\
-            AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
-        self.driver._eql_execute('volume', 'select', volume['name'],
-                                 'multihost-access', 'enable')
-        self.mox.ReplayAll()
-        model_update = self.driver.create_volume_from_snapshot(volume,
-                                                               snapshot)
-        self.assertEqual(model_update, self._model_update)
+        mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
+                               'snapshot', 'select', snapshot['name'],
+                               'clone', volume['name']]}
+
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.return_value = self.fake_iqn_return
+            model_update = self.driver.create_volume_from_snapshot(volume,
+                                                                   snapshot)
+            self.assertEqual(self._model_update, model_update)
 
     def test_create_cloned_volume(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         src_vref = {'name': 'fake_uuid'}
         volume = {'name': self.volume_name}
-        self.driver._eql_execute('volume', 'select', src_vref['name'], 'clone',
-                                 volume['name']).\
-            AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
-        self.driver._eql_execute('volume', 'select', volume['name'],
-                                 'multihost-access', 'enable')
-        self.mox.ReplayAll()
-        model_update = self.driver.create_cloned_volume(volume, src_vref)
-        self.assertEqual(model_update, self._model_update)
+        mock_attrs = {'args': ['volume', 'select', volume['name'],
+                               'multihost-access', 'enable']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.return_value = self.fake_iqn_return
+            model_update = self.driver.create_cloned_volume(volume, src_vref)
+            self.assertEqual(self._model_update, model_update)
 
     def test_delete_snapshot(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
-        self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
-                                 'snapshot', 'delete', snapshot['name'])
-        self.mox.ReplayAll()
-        self.driver.delete_snapshot(snapshot)
+        mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
+                               'snapshot', 'delete', snapshot['name']]}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            self.driver.delete_snapshot(snapshot)
 
     def test_extend_volume(self):
         new_size = '200'
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name, 'size': 100}
-        self.driver._eql_execute('volume', 'select', volume['name'],
-                                 'size', "%sG" % new_size)
-        self.mox.ReplayAll()
-        self.driver.extend_volume(volume, new_size)
+        mock_attrs = {'args': ['volume', 'select', volume['name'],
+                               'size', "%sG" % new_size]}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            self.driver.extend_volume(volume, new_size)
 
     def test_initialize_connection(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         volume = {'name': self.volume_name}
-        self.stubs.Set(self.driver, "_get_iscsi_properties",
-                       self._fake_get_iscsi_properties)
-        self.driver._eql_execute('volume', 'select', volume['name'], 'access',
-                                 'create', 'initiator',
-                                 self.connector['initiator'],
-                                 'authmethod', 'chap',
-                                 'username',
-                                 self.configuration.chap_username)
-        self.mox.ReplayAll()
-        iscsi_properties = self.driver.initialize_connection(volume,
-                                                             self.connector)
-        self.assertEqual(iscsi_properties['data'],
-                         self._fake_get_iscsi_properties(volume))
+        mock_attrs = {'args': ['volume', 'select', volume['name'], 'access',
+                               'create', 'initiator',
+                               self.connector['initiator'],
+                               'authmethod', 'chap',
+                               'username',
+                               self.configuration.chap_username]}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            with mock.patch.object(self.driver,
+                                   '_get_iscsi_properties') as mock_iscsi:
+                mock_eql_execute.configure_mock(**mock_attrs)
+                mock_iscsi.return_value = self.properties
+                iscsi_properties = self.driver.initialize_connection(
+                    volume, self.connector)
+                self.assertEqual(self._fake_get_iscsi_properties(volume),
+                                 iscsi_properties['data'])
 
     def test_terminate_connection(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
+        def my_side_effect(*args, **kwargs):
+            if args[4] == 'show':
+                return self.access_record_output
+            else:
+                return ''
         volume = {'name': self.volume_name}
-        self.driver._eql_execute('volume', 'select', volume['name'], 'access',
-                                 'show').AndReturn(self.access_record_output)
-        self.driver._eql_execute('volume', 'select', volume['name'], 'access',
-                                 'delete', '1')
-        self.mox.ReplayAll()
-        self.driver.terminate_connection(volume, self.connector)
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.side_effect = my_side_effect
+            self.driver.terminate_connection(volume, self.connector)
 
     def test_do_setup(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
         fake_group_ip = '10.1.2.3'
-        for feature in ('confirmation', 'paging', 'events', 'formatoutput'):
-            self.driver._eql_execute('cli-settings', feature, 'off')
-        self.driver._eql_execute('grpparams', 'show').\
-            AndReturn(['Group-Ipaddress: %s' % fake_group_ip])
-        self.mox.ReplayAll()
-        self.driver.do_setup(self._context)
-        self.assertEqual(fake_group_ip, self.driver._group_ip)
+
+        def my_side_effect(*args, **kwargs):
+            if args[0] == 'grpparams':
+                return ['Group-Ipaddress: %s' % fake_group_ip]
+            else:
+                return ''
+
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.side_effect = my_side_effect
+            self.driver.do_setup(self._context)
+            self.assertEqual(self.driver._group_ip, fake_group_ip)
 
     def test_update_volume_stats(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
-        self.driver._eql_execute('pool', 'select',
-                                 self.configuration.eqlx_pool, 'show').\
-            AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
-        self.mox.ReplayAll()
-        self.driver._update_volume_stats()
-        self.assertEqual(self.driver._stats['total_capacity_gb'], 111.0)
-        self.assertEqual(self.driver._stats['free_capacity_gb'], 11.0)
+        mock_attrs = {'args': ['pool', 'select',
+                               self.configuration.eqlx_pool, 'show']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.return_value = ['TotalCapacity: 111GB',
+                                             'FreeSpace: 11GB']
+            self.driver._update_volume_stats()
+            self.assertEqual(111.0, self.driver._stats['total_capacity_gb'])
+            self.assertEqual(11.0, self.driver._stats['free_capacity_gb'])
 
     def test_get_volume_stats(self):
-        self.driver._eql_execute = self.mox.\
-            CreateMock(self.driver._eql_execute)
-        self.driver._eql_execute('pool', 'select',
-                                 self.configuration.eqlx_pool, 'show').\
-            AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
-        self.mox.ReplayAll()
-        stats = self.driver.get_volume_stats(refresh=True)
-        self.assertEqual(stats['total_capacity_gb'], float('111.0'))
-        self.assertEqual(stats['free_capacity_gb'], float('11.0'))
-        self.assertEqual(stats['vendor_name'], 'Dell')
+        mock_attrs = {'args': ['pool', 'select',
+                               self.configuration.eqlx_pool, 'show']}
+        with mock.patch.object(self.driver,
+                               '_eql_execute') as mock_eql_execute:
+            mock_eql_execute.configure_mock(**mock_attrs)
+            mock_eql_execute.return_value = ['TotalCapacity: 111GB',
+                                             'FreeSpace: 11GB']
+            stats = self.driver.get_volume_stats(refresh=True)
+            self.assertEqual(float('111.0'), stats['total_capacity_gb'])
+            self.assertEqual(float('11.0'), stats['free_capacity_gb'])
+            self.assertEqual('Dell', stats['vendor_name'])
 
     def test_get_space_in_gb(self):
-        self.assertEqual(self.driver._get_space_in_gb('123.0GB'), 123.0)
-        self.assertEqual(self.driver._get_space_in_gb('123.0TB'), 123.0 * 1024)
-        self.assertEqual(self.driver._get_space_in_gb('1024.0MB'), 1.0)
+        self.assertEqual(123.0, self.driver._get_space_in_gb('123.0GB'))
+        self.assertEqual(123.0 * 1024, self.driver._get_space_in_gb('123.0TB'))
+        self.assertEqual(1.0, self.driver._get_space_in_gb('1024.0MB'))
 
     def test_get_output(self):
 
         def _fake_recv(ignore_arg):
             return '%s> ' % self.configuration.eqlx_group_name
 
-        chan = self.mox.CreateMock(paramiko.Channel)
-        self.stubs.Set(chan, "recv", _fake_recv)
-        self.assertEqual(self.driver._get_output(chan), [_fake_recv(None)])
+        chan = mock.Mock(paramiko.Channel)
+        mock_recv = self.mock_object(chan, 'recv')
+        mock_recv.return_value = '%s> ' % self.configuration.eqlx_group_name
+        self.assertEqual([_fake_recv(None)], self.driver._get_output(chan))
 
     def test_get_prefixed_value(self):
         lines = ['Line1 passed', 'Line1 failed']
         prefix = ['Line1', 'Line2']
         expected_output = [' passed', None]
-        self.assertEqual(self.driver._get_prefixed_value(lines, prefix[0]),
-                         expected_output[0])
-        self.assertEqual(self.driver._get_prefixed_value(lines, prefix[1]),
-                         expected_output[1])
+        self.assertEqual(expected_output[0],
+                         self.driver._get_prefixed_value(lines, prefix[0]))
+        self.assertEqual(expected_output[1],
+                         self.driver._get_prefixed_value(lines, prefix[1]))
 
     def test_ssh_execute(self):
-        ssh = self.mox.CreateMock(paramiko.SSHClient)
-        chan = self.mox.CreateMock(paramiko.Channel)
-        transport = self.mox.CreateMock(paramiko.Transport)
-        self.mox.StubOutWithMock(self.driver, '_get_output')
-        self.mox.StubOutWithMock(chan, 'invoke_shell')
+        ssh = mock.Mock(paramiko.SSHClient)
+        chan = mock.Mock(paramiko.Channel)
+        transport = mock.Mock(paramiko.Transport)
+        mock_get_output = self.mock_object(self.driver, '_get_output')
+        self.mock_object(chan, 'invoke_shell')
         expected_output = ['NoError: test run']
-        ssh.get_transport().AndReturn(transport)
-        transport.open_session().AndReturn(chan)
+        mock_get_output.return_value = expected_output
+        ssh.get_transport.return_value = transport
+        transport.open_session.return_value = chan
         chan.invoke_shell()
-        self.driver._get_output(chan).AndReturn(expected_output)
-        cmd = 'this is dummy command'
         chan.send('stty columns 255' + '\r')
-        self.driver._get_output(chan).AndReturn(expected_output)
-        chan.send(cmd + '\r')
-        self.driver._get_output(chan).AndReturn(expected_output)
+        chan.send(self.cmd + '\r')
         chan.close()
-        self.mox.ReplayAll()
-        self.assertEqual(self.driver._ssh_execute(ssh, cmd), expected_output)
+        self.assertEqual(expected_output,
+                         self.driver._ssh_execute(ssh, self.cmd))
 
     def test_ssh_execute_error(self):
-        ssh = self.mox.CreateMock(paramiko.SSHClient)
-        chan = self.mox.CreateMock(paramiko.Channel)
-        transport = self.mox.CreateMock(paramiko.Transport)
-        self.mox.StubOutWithMock(self.driver, '_get_output')
-        self.mox.StubOutWithMock(ssh, 'get_transport')
-        self.mox.StubOutWithMock(chan, 'invoke_shell')
+        ssh = mock.Mock(paramiko.SSHClient)
+        chan = mock.Mock(paramiko.Channel)
+        transport = mock.Mock(paramiko.Transport)
+        mock_get_output = self.mock_object(self.driver, '_get_output')
+        self.mock_object(ssh, 'get_transport')
+        self.mock_object(chan, 'invoke_shell')
         expected_output = ['Error: test run', '% Error']
-        ssh.get_transport().AndReturn(transport)
-        transport.open_session().AndReturn(chan)
+        mock_get_output.return_value = expected_output
+        ssh.get_transport().return_value = transport
+        transport.open_session.return_value = chan
         chan.invoke_shell()
-        self.driver._get_output(chan).AndReturn(expected_output)
-        cmd = 'this is dummy command'
         chan.send('stty columns 255' + '\r')
-        self.driver._get_output(chan).AndReturn(expected_output)
-        chan.send(cmd + '\r')
-        self.driver._get_output(chan).AndReturn(expected_output)
+        chan.send(self.cmd + '\r')
         chan.close()
-        self.mox.ReplayAll()
         self.assertRaises(processutils.ProcessExecutionError,
-                          self.driver._ssh_execute, ssh, cmd)
+                          self.driver._ssh_execute, ssh, self.cmd)
 
     @mock.patch.object(greenthread, 'sleep')
     def test_ensure_retries(self, _gt_sleep):
         num_attempts = 3
         self.driver.configuration.eqlx_cli_max_retries = num_attempts
-
         self.mock_object(self.driver, '_ssh_execute',
                          mock.Mock(side_effect=exception.
                                    VolumeBackendAPIException("some error")))