from oslo.config import cfg
import stubout
import testtools
+from testtools import matchers
from cinder.common import config # Need to register global_opts
from cinder.db import migration
'd1value': d1value,
'd2value': d2value,
})
+
+ def assertGreater(self, first, second, msg=None):
+ """Python < v2.7 compatibility. Assert 'first' > 'second'"""
+ try:
+ f = super(TestCase, self).assertGreater
+ except AttributeError:
+ self.assertThat(first,
+ matchers.GreaterThan(second),
+ message=msg or '')
+ else:
+ f(first, second, msg=msg)
+
+ def assertGreaterEqual(self, first, second, msg=None):
+ """Python < v2.7 compatibility. Assert 'first' >= 'second'"""
+ try:
+ f = super(TestCase, self).assertGreaterEqual
+ except AttributeError:
+ self.assertThat(first,
+ matchers.Not(matchers.LessThan(second)),
+ message=msg or '')
+ else:
+ f(first, second, msg=msg)
# Make sure we have all the extensions, extras extensions being OK.
exts = root.findall('{0}extension'.format(NS))
- self.assertTrue(len(exts) >= len(self.ext_list))
+ self.assertGreaterEqual(len(exts), len(self.ext_list))
# Make sure that at least Fox in Sox is correct.
(fox_ext, ) = [x for x in exts if x.get('alias') == 'FOXNSOX']
self.connector = connector.LocalConnector(None)
cprops = self.connection_properties
dev_info = self.connector.connect_volume(cprops)
- self.assertTrue(dev_info['type'] == 'local')
- self.assertTrue(dev_info['path'] == cprops['device_path'])
+ self.assertEqual(dev_info['type'], 'local')
+ self.assertEqual(dev_info['path'], cprops['device_path'])
def test_connect_volume_with_invalid_connection_data(self):
self.connector = connector.LocalConnector(None)
ctxt_read_deleted = context.get_admin_context('yes')
backup = db.backup_get(ctxt_read_deleted, backup_id)
self.assertEqual(backup.deleted, True)
- self.assertTrue(timeutils.utcnow() > backup.deleted_at)
+ self.assertGreater(timeutils.utcnow(), backup.deleted_at)
self.assertEqual(backup.status, 'deleted')
def test_list_backup(self):
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
snaps = self.service.get_backup_snaps(self.service.rbd.Image())
- self.assertTrue(len(snaps) == 3)
+ self.assertEqual(len(snaps), 3)
def test_transfer_data_from_rbd_to_file(self):
self._set_common_backup_stubs(self.service)
self.stubs.Set(self.service.rbd.Image, 'discard', _setter)
self.service._discard_bytes(mock_rbd(), 123456, 0)
- self.assertTrue(len(calls) == 0)
+ self.assertEqual(len(calls), 0)
image = mock_rbd().Image()
wrapped_rbd = self._get_wrapped_rbd_io(image)
self.service._discard_bytes(wrapped_rbd, 123456, 1234)
- self.assertTrue(len(calls) == 1)
+ self.assertEqual(len(calls), 1)
self.stubs.Set(image, 'write', _setter)
wrapped_rbd = self._get_wrapped_rbd_io(image)
lambda *args: False)
self.service._discard_bytes(wrapped_rbd, 0,
self.service.chunk_size * 2)
- self.assertTrue(len(calls) == 3)
+ self.assertEqual(len(calls), 3)
def test_delete_backup_snapshot(self):
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
self.volume.create_volume(self.context, volume_src['id'])
snapCount = len(db.snapshot_get_all_for_volume(self.context,
volume_src['id']))
- self.assertTrue(snapCount == 0)
+ self.assertEqual(snapCount, 0)
snapshot = self._create_snapshot(volume_src['id'])
snapshot_id = snapshot['id']
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot['name'])))
snapCount = len(db.snapshot_get_all_for_volume(self.context,
volume_src['id']))
- self.assertTrue(snapCount == 1)
+ self.assertEqual(snapCount, 1)
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
self.assertFalse(os.path.exists(os.path.join(self.volumes_path,
snapshot['name'])))
snapCount = len(db.snapshot_get_all_for_volume(self.context,
volume_src['id']))
- self.assertTrue(snapCount == 0)
+ self.assertEqual(snapCount, 0)
def test_create_volume_from_snapshot(self):
volume_src = test_utils.create_volume(self.context, host=CONF.host)
stats = self.driver.get_volume_stats(True)
self.assertEqual(stats["vendor_name"], "HDS")
self.assertEqual(stats["storage_protocol"], "iSCSI")
- self.assertTrue(stats["total_capacity_gb"] > 0)
+ self.assertGreater(stats["total_capacity_gb"], 0)
def test_create_volume(self):
loc = self.driver.create_volume(_VOLUME)
num_luns_before = len(SimulatedHusBackend.alloc_lun)
self.driver.delete_volume(vol)
num_luns_after = len(SimulatedHusBackend.alloc_lun)
- self.assertTrue(num_luns_before > num_luns_after)
+ self.assertGreater(num_luns_before, num_luns_after)
def test_extend_volume(self):
vol = self.test_create_volume()
num_luns_before = len(SimulatedHusBackend.alloc_lun)
self.driver.delete_snapshot(svol)
num_luns_after = len(SimulatedHusBackend.alloc_lun)
- self.assertTrue(num_luns_before > num_luns_after)
+ self.assertGreater(num_luns_before, num_luns_after)
def test_create_volume_from_snapshot(self):
svol = self.test_create_snapshot()
num_conn_before = len(SimulatedHusBackend.connections)
self.driver.terminate_connection(vol, conn)
num_conn_after = len(SimulatedHusBackend.connections)
- self.assertTrue(num_conn_before > num_conn_after)
+ self.assertGreater(num_conn_before, num_conn_after)
self.mox.ReplayAll()
ports = self.driver.common.get_ports()['members']
- self.assertTrue(len(ports) == 3)
+ self.assertEqual(len(ports), 3)
def test_get_iscsi_ip_active(self):
self.flags(lock_path=self.tempdir)
total = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='openstack_citest'")
- self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
+ self.assertGreater(total.scalar(), 0,
+ msg="No tables found. Wrong schema?")
noninnodb = connection.execute("SELECT count(*) "
"from information_schema.TABLES "