]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Clean up comparison assertions
authorZhongyue Luo <zhongyue.nah@intel.com>
Tue, 3 Sep 2013 05:07:21 +0000 (13:07 +0800)
committerGerrit Code Review <review@openstack.org>
Wed, 9 Oct 2013 02:30:10 +0000 (02:30 +0000)
Using assertTrue and the comparison operators to test
if an element is greater, equal, or less is too python2.4.
Our unit testing framework 'testtools' supports
- assertEqual
- and testtools Matcher classes
which were created for these types of tests.

Re-implemented assertGreater and assertGreaterEqual
with compatibility for python < 2.7.

Change-Id: I6f3e3279630393e594c75d30cb4b895c2e2c22ca

cinder/test.py
cinder/tests/api/test_extensions.py
cinder/tests/brick/test_brick_connector.py
cinder/tests/test_backup.py
cinder/tests/test_backup_ceph.py
cinder/tests/test_gpfs.py
cinder/tests/test_hds.py
cinder/tests/test_hp3par.py
cinder/tests/test_migrations.py

index 1f0f2908e9a114738591c0552c2e7b79a7e11a1c..cc5f33c2739afa2964a7acfdd57cb931b8c4c8d8 100644 (file)
@@ -35,6 +35,7 @@ import mox
 from oslo.config import cfg
 import stubout
 import testtools
+from testtools import matchers
 
 from cinder.common import config  # Need to register global_opts
 from cinder.db import migration
@@ -267,3 +268,25 @@ class TestCase(testtools.TestCase):
                                     'd1value': d1value,
                                     'd2value': d2value,
                                 })
+
+    def assertGreater(self, first, second, msg=None):
+        """Python < v2.7 compatibility.  Assert 'first' > 'second'"""
+        try:
+            f = super(TestCase, self).assertGreater
+        except AttributeError:
+            self.assertThat(first,
+                            matchers.GreaterThan(second),
+                            message=msg or '')
+        else:
+            f(first, second, msg=msg)
+
+    def assertGreaterEqual(self, first, second, msg=None):
+        """Python < v2.7 compatibility.  Assert 'first' >= 'second'"""
+        try:
+            f = super(TestCase, self).assertGreaterEqual
+        except AttributeError:
+            self.assertThat(first,
+                            matchers.Not(matchers.LessThan(second)),
+                            message=msg or '')
+        else:
+            f(first, second, msg=msg)
index 41d520cd0af6990b290f85eae8e2d3f86ba03874..65bfe101bdb71b50ee1ab849677cccfd17e86fda 100644 (file)
@@ -119,7 +119,7 @@ class ExtensionControllerTest(ExtensionTestCase):
 
         # Make sure we have all the extensions, extras extensions being OK.
         exts = root.findall('{0}extension'.format(NS))
-        self.assertTrue(len(exts) >= len(self.ext_list))
+        self.assertGreaterEqual(len(exts), len(self.ext_list))
 
         # Make sure that at least Fox in Sox is correct.
         (fox_ext, ) = [x for x in exts if x.get('alias') == 'FOXNSOX']
index 262ef66f9e8916697985d527777a7d7ee1cfb1c4..e6251df10088fefaf59dd191935cb2238c23fbc9 100644 (file)
@@ -605,8 +605,8 @@ class LocalConnectorTestCase(test.TestCase):
         self.connector = connector.LocalConnector(None)
         cprops = self.connection_properties
         dev_info = self.connector.connect_volume(cprops)
-        self.assertTrue(dev_info['type'] == 'local')
-        self.assertTrue(dev_info['path'] == cprops['device_path'])
+        self.assertEqual(dev_info['type'], 'local')
+        self.assertEqual(dev_info['path'], cprops['device_path'])
 
     def test_connect_volume_with_invalid_connection_data(self):
         self.connector = connector.LocalConnector(None)
index a01adb5d8c18533a0baf56018736dfb49b6d2573..cddd1042389fbf8d15e1bfaeedaec4c994a106a5 100644 (file)
@@ -358,7 +358,7 @@ class BackupTestCase(test.TestCase):
         ctxt_read_deleted = context.get_admin_context('yes')
         backup = db.backup_get(ctxt_read_deleted, backup_id)
         self.assertEqual(backup.deleted, True)
-        self.assertTrue(timeutils.utcnow() > backup.deleted_at)
+        self.assertGreater(timeutils.utcnow(), backup.deleted_at)
         self.assertEqual(backup.status, 'deleted')
 
     def test_list_backup(self):
index 7702443ca82e503451e513d70f61eea773ad71a8..43a7331b36c6e437c84bdc6b5a4851d1afe1ee68 100644 (file)
@@ -186,7 +186,7 @@ class BackupCephTestCase(test.TestCase):
 
         self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
         snaps = self.service.get_backup_snaps(self.service.rbd.Image())
-        self.assertTrue(len(snaps) == 3)
+        self.assertEqual(len(snaps), 3)
 
     def test_transfer_data_from_rbd_to_file(self):
         self._set_common_backup_stubs(self.service)
@@ -409,12 +409,12 @@ class BackupCephTestCase(test.TestCase):
         self.stubs.Set(self.service.rbd.Image, 'discard', _setter)
 
         self.service._discard_bytes(mock_rbd(), 123456, 0)
-        self.assertTrue(len(calls) == 0)
+        self.assertEqual(len(calls), 0)
 
         image = mock_rbd().Image()
         wrapped_rbd = self._get_wrapped_rbd_io(image)
         self.service._discard_bytes(wrapped_rbd, 123456, 1234)
-        self.assertTrue(len(calls) == 1)
+        self.assertEqual(len(calls), 1)
 
         self.stubs.Set(image, 'write', _setter)
         wrapped_rbd = self._get_wrapped_rbd_io(image)
@@ -422,7 +422,7 @@ class BackupCephTestCase(test.TestCase):
                        lambda *args: False)
         self.service._discard_bytes(wrapped_rbd, 0,
                                     self.service.chunk_size * 2)
-        self.assertTrue(len(calls) == 3)
+        self.assertEqual(len(calls), 3)
 
     def test_delete_backup_snapshot(self):
         snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
index 4fdb7882cf085524ed93be33151387637a9e78c0..316e182834fbe25b18dc2340557b998eee98c0f4 100644 (file)
@@ -193,7 +193,7 @@ class GPFSDriverTestCase(test.TestCase):
         self.volume.create_volume(self.context, volume_src['id'])
         snapCount = len(db.snapshot_get_all_for_volume(self.context,
                                                        volume_src['id']))
-        self.assertTrue(snapCount == 0)
+        self.assertEqual(snapCount, 0)
         snapshot = self._create_snapshot(volume_src['id'])
         snapshot_id = snapshot['id']
         self.volume.create_snapshot(self.context, volume_src['id'],
@@ -202,14 +202,14 @@ class GPFSDriverTestCase(test.TestCase):
                                                     snapshot['name'])))
         snapCount = len(db.snapshot_get_all_for_volume(self.context,
                                                        volume_src['id']))
-        self.assertTrue(snapCount == 1)
+        self.assertEqual(snapCount, 1)
         self.volume.delete_snapshot(self.context, snapshot_id)
         self.volume.delete_volume(self.context, volume_src['id'])
         self.assertFalse(os.path.exists(os.path.join(self.volumes_path,
                                                      snapshot['name'])))
         snapCount = len(db.snapshot_get_all_for_volume(self.context,
                                                        volume_src['id']))
-        self.assertTrue(snapCount == 0)
+        self.assertEqual(snapCount, 0)
 
     def test_create_volume_from_snapshot(self):
         volume_src = test_utils.create_volume(self.context, host=CONF.host)
index a04b12e09084acda22951ae67d4c873f1ec05603..1d4ac437db2bb298b975f78d674ce2b73b532cc4 100644 (file)
@@ -189,7 +189,7 @@ class HUSiSCSIDriverTest(test.TestCase):
         stats = self.driver.get_volume_stats(True)
         self.assertEqual(stats["vendor_name"], "HDS")
         self.assertEqual(stats["storage_protocol"], "iSCSI")
-        self.assertTrue(stats["total_capacity_gb"] > 0)
+        self.assertGreater(stats["total_capacity_gb"], 0)
 
     def test_create_volume(self):
         loc = self.driver.create_volume(_VOLUME)
@@ -214,7 +214,7 @@ class HUSiSCSIDriverTest(test.TestCase):
         num_luns_before = len(SimulatedHusBackend.alloc_lun)
         self.driver.delete_volume(vol)
         num_luns_after = len(SimulatedHusBackend.alloc_lun)
-        self.assertTrue(num_luns_before > num_luns_after)
+        self.assertGreater(num_luns_before, num_luns_after)
 
     def test_extend_volume(self):
         vol = self.test_create_volume()
@@ -261,7 +261,7 @@ class HUSiSCSIDriverTest(test.TestCase):
         num_luns_before = len(SimulatedHusBackend.alloc_lun)
         self.driver.delete_snapshot(svol)
         num_luns_after = len(SimulatedHusBackend.alloc_lun)
-        self.assertTrue(num_luns_before > num_luns_after)
+        self.assertGreater(num_luns_before, num_luns_after)
 
     def test_create_volume_from_snapshot(self):
         svol = self.test_create_snapshot()
@@ -293,4 +293,4 @@ class HUSiSCSIDriverTest(test.TestCase):
         num_conn_before = len(SimulatedHusBackend.connections)
         self.driver.terminate_connection(vol, conn)
         num_conn_after = len(SimulatedHusBackend.connections)
-        self.assertTrue(num_conn_before > num_conn_after)
+        self.assertGreater(num_conn_before, num_conn_after)
index f6adbb6768f59b2126dc31675dd1a5c6c13f97d2..80f28e65ceece662a734eadfc0a05d1cc936f776 100644 (file)
@@ -1079,7 +1079,7 @@ class TestHP3PARISCSIDriver(HP3PARBaseDriver, test.TestCase):
         self.mox.ReplayAll()
 
         ports = self.driver.common.get_ports()['members']
-        self.assertTrue(len(ports) == 3)
+        self.assertEqual(len(ports), 3)
 
     def test_get_iscsi_ip_active(self):
         self.flags(lock_path=self.tempdir)
index 257c3e83c855095f73b62cb30c035a18ca6aa248..acd74a49aec997209949f90e0b65acfabcbdcfe5 100644 (file)
@@ -265,7 +265,8 @@ class TestMigrations(test.TestCase):
         total = connection.execute("SELECT count(*) "
                                    "from information_schema.TABLES "
                                    "where TABLE_SCHEMA='openstack_citest'")
-        self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
+        self.assertGreater(total.scalar(), 0,
+                           msg="No tables found. Wrong schema?")
 
         noninnodb = connection.execute("SELECT count(*) "
                                        "from information_schema.TABLES "