The method assertEquals has been deprecated since python 2.7.
http://docs.python.org/2/library/unittest.html#deprecated-aliases
Also in Python 3, a deprecated warning is raised when using assertEquals
therefore we should use assertEqual instead.
This patch replaces the remaining assertEquals to assertEqual
Change-Id: I4e726ce414382e0d5bf982fad20363d61e00967a
body = {"volume": vol}
req = fakes.HTTPRequest.blank('/v1/volumes')
res_dict = self.controller.create(req, body)
- self.assertEquals(res_dict['volume']['volume_type'],
- db_vol_type['name'])
+ self.assertEqual(res_dict['volume']['volume_type'],
+ db_vol_type['name'])
def test_volume_creation_fails_with_bad_size(self):
vol = {"size": '',
spec_id3 = self._create_qos_specs('Name3', value3)
specs = db.qos_specs_get_all(self.ctxt)
- self.assertEquals(len(specs), 3,
- "Unexpected number of qos specs records")
+ self.assertEqual(len(specs), 3,
+ "Unexpected number of qos specs records")
expected1 = dict(name='Name1', id=spec_id1, consumer='front-end')
expected2 = dict(name='Name2', id=spec_id2, consumer='back-end')
db.transfer_get_all,
self.ctxt)
xfer = db.transfer_get_all(context.get_admin_context())
- self.assertEquals(len(xfer), 2,
- "Unexpected number of transfer records")
+ self.assertEqual(len(xfer), 2, "Unexpected number of transfer records")
xfer = db.transfer_get_all_by_project(self.ctxt, self.ctxt.project_id)
- self.assertEquals(len(xfer), 2,
- "Unexpected number of transfer records")
+ self.assertEqual(len(xfer), 2, "Unexpected number of transfer records")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
nctxt, self.ctxt.project_id)
xfer = db.transfer_get_all_by_project(nctxt.elevated(),
self.ctxt.project_id)
- self.assertEquals(len(xfer), 2,
- "Unexpected number of transfer records")
+ self.assertEqual(len(xfer), 2, "Unexpected number of transfer records")
def test_transfer_destroy(self):
volume_id = utils.create_volume(self.ctxt)['id']
xfer_id2 = self._create_transfer(volume_id2)
xfer = db.transfer_get_all(context.get_admin_context())
- self.assertEquals(len(xfer), 2,
- "Unexpected number of transfer records")
+ self.assertEqual(len(xfer), 2, "Unexpected number of transfer records")
self.assertFalse(xfer[0]['deleted'], "Deleted flag is set")
db.transfer_destroy(self.ctxt, xfer_id1)
xfer = db.transfer_get_all(context.get_admin_context())
- self.assertEquals(len(xfer), 1,
- "Unexpected number of transfer records")
- self.assertEquals(xfer[0]['id'], xfer_id2,
- "Unexpected value for Transfer id")
+ self.assertEqual(len(xfer), 1, "Unexpected number of transfer records")
+ self.assertEqual(xfer[0]['id'], xfer_id2,
+ "Unexpected value for Transfer id")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
db.transfer_destroy(nctxt.elevated(), xfer_id2)
xfer = db.transfer_get_all(context.get_admin_context())
- self.assertEquals(len(xfer), 0,
- "Unexpected number of transfer records")
+ self.assertEqual(len(xfer), 0, "Unexpected number of transfer records")
image_id = self.service.create(self.context, fixture)['id']
self.assertNotEquals(None, image_id)
- self.assertEquals(num_images + 1,
- len(self.service.detail(self.context)))
+ self.assertEqual(num_images + 1,
+ len(self.service.detail(self.context)))
def test_create_and_show_non_existing_image(self):
fixture = self._make_fixture(name='test image')
(service, same_id) = glance.get_remote_image_service(self.context,
image_url)
self.assertEqual(same_id, image_id)
- self.assertEquals(service._client.netloc,
- 'something-less-likely')
+ self.assertEqual(service._client.netloc, 'something-less-likely')
for ipv6_url in ('[::1]', '::1', '[::1]:444'):
image_url = 'http://%s/%s' % (ipv6_url, image_id)
(service, same_id) = glance.get_remote_image_service(self.context,
"""Test glance version set by flag is honoured."""
client_wrapper_v1 = glance.GlanceClientWrapper('fake', 'fake_host',
9292)
- self.assertEquals(client_wrapper_v1.client.__module__,
- 'glanceclient.v1.client')
+ self.assertEqual(client_wrapper_v1.client.__module__,
+ 'glanceclient.v1.client')
self.flags(glance_api_version=2)
client_wrapper_v2 = glance.GlanceClientWrapper('fake', 'fake_host',
9292)
- self.assertEquals(client_wrapper_v2.client.__module__,
- 'glanceclient.v2.client')
+ self.assertEqual(client_wrapper_v2.client.__module__,
+ 'glanceclient.v2.client')
CONF.reset()
def test_glance_version_by_arg(self):
"""Test glance version set by arg to GlanceClientWrapper"""
client_wrapper_v1 = glance.GlanceClientWrapper('fake', 'fake_host',
9292, version=1)
- self.assertEquals(client_wrapper_v1.client.__module__,
- 'glanceclient.v1.client')
+ self.assertEqual(client_wrapper_v1.client.__module__,
+ 'glanceclient.v1.client')
client_wrapper_v2 = glance.GlanceClientWrapper('fake', 'fake_host',
9292, version=2)
- self.assertEquals(client_wrapper_v2.client.__module__,
- 'glanceclient.v2.client')
+ self.assertEqual(client_wrapper_v2.client.__module__,
+ 'glanceclient.v2.client')
def _create_failing_glance_client(info):
self.volume_id)
name = self.service._get_backup_base_name(self.volume_id, '1234')
- self.assertEquals(name, "volume-%s.backup.%s" %
- (self.volume_id, '1234'))
+ self.assertEqual(name,
+ "volume-%s.backup.%s" % (self.volume_id, '1234'))
def test_backup_volume_from_rbd(self):
self._create_volume_db_entry(self.volume_id, 1)
self.drv.local_path(TEST_VOLUME1).AndReturn('/dev/loop1')
self.mox.ReplayAll()
data = self.drv.initialize_connection(TEST_VOLUME1, TEST_CONNECTOR)
- self.assertEquals(data, {
+ self.assertEqual(data, {
'driver_volume_type': 'local',
'data': {'device_path': '/dev/loop1'}
})
.AndReturn('dev_path')
self.mox.ReplayAll()
result = self.drv.create_volume(TEST_VOLUME)
- self.assertEquals(result, {
+ self.assertEqual(result, {
'provider_location': 'None:3260,None None '
'None dev_path'})
AndReturn('BlockDeviceDriver')
self.mox.ReplayAll()
self.drv._update_volume_stats()
- self.assertEquals(self.drv._stats,
- {'total_capacity_gb': 2,
- 'free_capacity_gb': 2,
- 'reserved_percentage':
- self.configuration.reserved_percentage,
- 'QoS_support': False,
- 'vendor_name': "Open Source",
- 'driver_version': self.drv.VERSION,
- 'storage_protocol': 'unknown',
- 'volume_backend_name': 'BlockDeviceDriver',
- })
+ self.assertEqual(self.drv._stats,
+ {'total_capacity_gb': 2,
+ 'free_capacity_gb': 2,
+ 'reserved_percentage':
+ self.configuration.reserved_percentage,
+ 'QoS_support': False,
+ 'vendor_name': "Open Source",
+ 'driver_version': self.drv.VERSION,
+ 'storage_protocol': 'unknown',
+ 'volume_backend_name': 'BlockDeviceDriver',
+ })
def test_create_cloned_volume(self):
TEST_SRC = {'id': '1',
volutils.copy_volume('/dev/loop1', dev, 2048,
execute=self.drv._execute)
self.mox.ReplayAll()
- self.assertEquals(self.drv.create_cloned_volume(TEST_VOLUME, TEST_SRC),
- {'provider_location': 'None:3260,'
- 'None None None /dev/loop2'})
+ self.assertEqual(self.drv.create_cloned_volume(TEST_VOLUME, TEST_SRC),
+ {'provider_location': 'None:3260,'
+ 'None None None /dev/loop2'})
def test_copy_image_to_volume(self):
TEST_VOLUME = {'provider_location': '1 2 3 /dev/loop1'}
for dev in self.configuration.available_devices:
self.drv._get_device_size(dev).AndReturn(1)
self.mox.ReplayAll()
- self.assertEquals(self.drv._devices_sizes(),
- {'/dev/loop1': 1,
- '/dev/loop2': 1})
+ self.assertEqual(self.drv._devices_sizes(),
+ {'/dev/loop1': 1, '/dev/loop2': 1})
def test_find_appropriate_size_device_no_free_disks(self):
size = 1
self.mox.StubOutWithMock(self.drv, '_get_used_devices')
self.drv._get_used_devices().AndReturn(set())
self.mox.ReplayAll()
- self.assertEquals(self.drv.find_appropriate_size_device(size),
- '/dev/loop2')
+ self.assertEqual(self.drv.find_appropriate_size_device(size),
+ '/dev/loop2')
image_id=None, request_spec=None,
filter_properties=None):
- self.test_inst.assertEquals(self.expected_spec, request_spec)
+ self.test_inst.assertEqual(self.expected_spec, request_spec)
class fake_volume_api(object):
snapshot_id=None, image_id=None,
source_volid=None):
- self.test_inst.assertEquals(self.expected_spec, request_spec)
- self.test_inst.assertEquals(request_spec['source_volid'],
- source_volid)
- self.test_inst.assertEquals(request_spec['snapshot_id'],
- snapshot_id)
- self.test_inst.assertEquals(request_spec['image_id'],
- image_id)
+ self.test_inst.assertEqual(self.expected_spec, request_spec)
+ self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
+ self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
+ self.test_inst.assertEqual(request_spec['image_id'], image_id)
class fake_db(object):
self.assertEqual(unicode(exc), 'default message')
def test_error_msg(self):
- self.assertEquals(unicode(exception.CinderException('test')),
- 'test')
+ self.assertEqual(unicode(exception.CinderException('test')), 'test')
def test_default_error_msg_with_kwargs(self):
class FakeCinderException(exception.CinderException):
mox.ReplayAll()
- self.assertEquals((df_avail, df_total_size),
- drv._get_available_capacity(
- self.TEST_EXPORT1))
+ self.assertEqual((df_avail, df_total_size),
+ drv._get_available_capacity(self.TEST_EXPORT1))
mox.VerifyAll()
mox.ReplayAll()
- self.assertEquals((df_total_size - du_used, df_total_size),
- drv._get_available_capacity(
- self.TEST_EXPORT1))
+ self.assertEqual((df_total_size - du_used, df_total_size),
+ drv._get_available_capacity(self.TEST_EXPORT1))
mox.VerifyAll()
mox.ReplayAll()
expected_file = 'volume-%s.%s' % (self.VOLUME_UUID, self.SNAP_UUID)
- self.assertEquals(drv._get_backing_file(qemu_img_info_output),
- expected_file)
+ self.assertEqual(drv._get_backing_file(qemu_img_info_output),
+ expected_file)
mox.VerifyAll()
info = drv._read_info_file(info_path)
- self.assertEquals(info[self.VOLUME_UUID],
- 'volume-%s' % self.VOLUME_UUID)
+ self.assertEqual(info[self.VOLUME_UUID],
+ 'volume-%s' % self.VOLUME_UUID)
mox.VerifyAll()
self.assertEqual(LUN_INFO['Owner Controller'], 'A')
ret = self.driver.initialize_connection(FAKE_VOLUME, FAKE_CONNECTOR)
iscsi_propers = ret['data']
- self.assertEquals(iscsi_propers['target_iqn'],
- INITIATOR_SETTING['TargetIQN-form'])
- self.assertEquals(iscsi_propers['target_portal'],
- INITIATOR_SETTING['Initiator TargetIP'] + ':3260')
+ self.assertEqual(iscsi_propers['target_iqn'],
+ INITIATOR_SETTING['TargetIQN-form'])
+ self.assertEqual(iscsi_propers['target_portal'],
+ INITIATOR_SETTING['Initiator TargetIP'] + ':3260')
self.assertEqual(MAP_INFO["DEV LUN ID"], LUN_INFO['ID'])
self.assertEqual(MAP_INFO["INI Port Info"],
FAKE_CONNECTOR['initiator'])
self.driver.create_volume(FAKE_VOLUME)
ret = self.driver.initialize_connection(FAKE_VOLUME, FAKE_CONNECTOR)
fc_properties = ret['data']
- self.assertEquals(fc_properties['target_wwn'],
- INITIATOR_SETTING['WWN'])
+ self.assertEqual(fc_properties['target_wwn'],
+ INITIATOR_SETTING['WWN'])
self.assertEqual(MAP_INFO["DEV LUN ID"], LUN_INFO['ID'])
self.driver.terminate_connection(FAKE_VOLUME, FAKE_CONNECTOR)
self.driver.create_volume(FAKE_VOLUME)
ret = self.driver.initialize_connection(FAKE_VOLUME, FAKE_CONNECTOR)
iscsi_propers = ret['data']
- self.assertEquals(iscsi_propers['target_iqn'],
- INITIATOR_SETTING['TargetIQN-form'])
- self.assertEquals(iscsi_propers['target_portal'],
- INITIATOR_SETTING['Initiator TargetIP'] + ':3260')
+ self.assertEqual(iscsi_propers['target_iqn'],
+ INITIATOR_SETTING['TargetIQN-form'])
+ self.assertEqual(iscsi_propers['target_portal'],
+ INITIATOR_SETTING['Initiator TargetIP'] + ':3260')
self.assertEqual(MAP_INFO["DEV LUN ID"], LUN_INFO['ID'])
self.assertEqual(MAP_INFO["INI Port Info"],
FAKE_CONNECTOR['initiator'])
self.assertEqual(inf.snapshots[0]['tag'], 'snap1')
self.assertEqual(inf.snapshots[0]['vm_size'], '1.7G')
self.assertEqual(inf.snapshots[0]['date'], '2011-10-04')
- self.assertEquals(inf.snapshots[0]['vm_clock'],
- '19:04:00 32:06:34.974')
+ self.assertEqual(inf.snapshots[0]['vm_clock'], '19:04:00 32:06:34.974')
self.assertEqual(str(inf), TEST_STR)
result = image_utils.discover_vhd_chain('some/path')
mox.VerifyAll()
- self.assertEquals(
+ self.assertEqual(
['some/path/0.vhd', 'some/path/1.vhd'], result)
mox.ReplayAll()
- self.assertEquals((stat_total_size, stat_avail, du_used),
- drv._get_capacity_info(self.TEST_NFS_EXPORT1))
+ self.assertEqual((stat_total_size, stat_avail, du_used),
+ drv._get_capacity_info(self.TEST_NFS_EXPORT1))
mox.VerifyAll()
mox.ReplayAll()
- self.assertEquals((stat_total_size, stat_avail, du_used),
- drv._get_capacity_info(self.TEST_NFS_EXPORT_SPACES))
+ self.assertEqual((stat_total_size, stat_avail, du_used),
+ drv._get_capacity_info(self.TEST_NFS_EXPORT_SPACES))
mox.VerifyAll()
self.assertEqual(expected, actual)
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
- self.assertEquals(expected,
- self.volume.driver.clone_image(object(), None, None))
+ self.assertEqual(expected,
+ self.volume.driver.clone_image(object(), None, None))
# Test Success Case(s)
expected = ({'provider_location': None}, True)
'always get passed '
'correctly')
runs = int(runs.strip())
- self.assertEquals(runs, 10,
- 'Ran %d times instead of 10.' % (runs,))
+ self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,))
finally:
os.unlink(tmpfilename)
os.unlink(tmpfilename2)
input = [{'a': {'b': {'c': 'c_1'}}},
{'a': {'b': {'c': 'c_2'}}}]
- self.assertEquals([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
- f(input, "a"))
+ self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
+ f(input, "a"))
self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
def test_hour(self):
begin, end = utils.last_completed_audit_period(unit='hour')
- self.assertEquals(begin,
- datetime.datetime(hour=7,
- day=5,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(hour=8,
- day=5,
- month=3,
- year=2012))
+ self.assertEqual(begin,
+ datetime.datetime(hour=7,
+ day=5,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(hour=8,
+ day=5,
+ month=3,
+ year=2012))
def test_hour_with_offset_before_current(self):
begin, end = utils.last_completed_audit_period(unit='hour@10')
- self.assertEquals(begin, datetime.datetime(minute=10,
- hour=7,
- day=5,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(minute=10,
- hour=8,
- day=5,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(minute=10,
+ hour=7,
+ day=5,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(minute=10,
+ hour=8,
+ day=5,
+ month=3,
+ year=2012))
def test_hour_with_offset_after_current(self):
begin, end = utils.last_completed_audit_period(unit='hour@30')
- self.assertEquals(begin, datetime.datetime(minute=30,
- hour=6,
- day=5,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(minute=30,
- hour=7,
- day=5,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(minute=30,
+ hour=6,
+ day=5,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(minute=30,
+ hour=7,
+ day=5,
+ month=3,
+ year=2012))
def test_day(self):
begin, end = utils.last_completed_audit_period(unit='day')
- self.assertEquals(begin, datetime.datetime(day=4,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(day=5,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=4,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(day=5,
+ month=3,
+ year=2012))
def test_day_with_offset_before_current(self):
begin, end = utils.last_completed_audit_period(unit='day@6')
- self.assertEquals(begin, datetime.datetime(hour=6,
- day=4,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(hour=6,
- day=5,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(hour=6,
+ day=4,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(hour=6,
+ day=5,
+ month=3,
+ year=2012))
def test_day_with_offset_after_current(self):
begin, end = utils.last_completed_audit_period(unit='day@10')
- self.assertEquals(begin, datetime.datetime(hour=10,
- day=3,
- month=3,
- year=2012))
- self.assertEquals(end, datetime.datetime(hour=10,
- day=4,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(hour=10,
+ day=3,
+ month=3,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(hour=10,
+ day=4,
+ month=3,
+ year=2012))
def test_month(self):
begin, end = utils.last_completed_audit_period(unit='month')
- self.assertEquals(begin, datetime.datetime(day=1,
- month=2,
- year=2012))
- self.assertEquals(end, datetime.datetime(day=1,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=1,
+ month=2,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(day=1,
+ month=3,
+ year=2012))
def test_month_with_offset_before_current(self):
begin, end = utils.last_completed_audit_period(unit='month@2')
- self.assertEquals(begin, datetime.datetime(day=2,
- month=2,
- year=2012))
- self.assertEquals(end, datetime.datetime(day=2,
- month=3,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=2,
+ month=2,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(day=2,
+ month=3,
+ year=2012))
def test_month_with_offset_after_current(self):
begin, end = utils.last_completed_audit_period(unit='month@15')
- self.assertEquals(begin, datetime.datetime(day=15,
- month=1,
- year=2012))
- self.assertEquals(end, datetime.datetime(day=15,
- month=2,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=15,
+ month=1,
+ year=2012))
+ self.assertEqual(end, datetime.datetime(day=15,
+ month=2,
+ year=2012))
def test_year(self):
begin, end = utils.last_completed_audit_period(unit='year')
- self.assertEquals(begin, datetime.datetime(day=1,
- month=1,
- year=2011))
- self.assertEquals(end, datetime.datetime(day=1,
- month=1,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=1,
+ month=1,
+ year=2011))
+ self.assertEqual(end, datetime.datetime(day=1,
+ month=1,
+ year=2012))
def test_year_with_offset_before_current(self):
begin, end = utils.last_completed_audit_period(unit='year@2')
- self.assertEquals(begin, datetime.datetime(day=1,
- month=2,
- year=2011))
- self.assertEquals(end, datetime.datetime(day=1,
- month=2,
- year=2012))
+ self.assertEqual(begin, datetime.datetime(day=1,
+ month=2,
+ year=2011))
+ self.assertEqual(end, datetime.datetime(day=1,
+ month=2,
+ year=2012))
def test_year_with_offset_after_current(self):
begin, end = utils.last_completed_audit_period(unit='year@6')
- self.assertEquals(begin, datetime.datetime(day=1,
- month=6,
- year=2010))
- self.assertEquals(end, datetime.datetime(day=1,
- month=6,
- year=2011))
+ self.assertEqual(begin, datetime.datetime(day=1,
+ month=6,
+ year=2010))
+ self.assertEqual(end, datetime.datetime(day=1,
+ month=6,
+ year=2011))
class FakeSSHClient(object):
"""Test _get_disk_type."""
volume = FakeObject()
volume['volume_type_id'] = None
- self.assertEquals(vmdk.VMwareEsxVmdkDriver._get_disk_type(volume),
- 'thin')
+ self.assertEqual(vmdk.VMwareEsxVmdkDriver._get_disk_type(volume),
+ 'thin')
def test_init_conn_with_instance_no_backing(self):
"""Test initialize_connection with instance and without backing."""
m.ReplayAll()
actual_vmdk_path = self._volumeops.get_vmdk_path(backing)
- self.assertEquals(backingInfo.__class__.__name__,
- 'VirtualDiskFlatVer2BackingInfo')
+ self.assertEqual(backingInfo.__class__.__name__,
+ 'VirtualDiskFlatVer2BackingInfo')
self.assertEqual(virtualDisk.__class__.__name__, 'VirtualDisk')
self.assertEqual(actual_vmdk_path, vmdk_path)
m.UnsetStubs()
updated_at=self.updated_at)
response = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
- self.assertEquals('awaiting-transfer', volume['status'],
- 'Unexpected state')
+ self.assertEqual('awaiting-transfer', volume['status'],
+ 'Unexpected state')
tx_api.delete(self.ctxt, response['id'])
volume = db.volume_get(self.ctxt, '1')
- self.assertEquals('available', volume['status'],
- 'Unexpected state')
+ self.assertEqual('available', volume['status'], 'Unexpected state')
def test_transfer_invalid_volume(self):
tx_api = transfer_api.API()
tx_api.create,
self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
- self.assertEquals('in-use', volume['status'],
- 'Unexpected state')
+ self.assertEqual('in-use', volume['status'], 'Unexpected state')
def test_transfer_accept(self):
tx_api = transfer_api.API()
updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
- self.assertEquals('awaiting-transfer', volume['status'],
- 'Unexpected state')
+ self.assertEqual('awaiting-transfer', volume['status'],
+ 'Unexpected state')
self.assertRaises(exception.TransferNotFound,
tx_api.accept,
transfer['id'],
transfer['auth_key'])
volume = db.volume_get(self.ctxt, '1')
- self.assertEquals(volume['project_id'], 'new_project_id',
- 'Unexpected project id')
- self.assertEquals(volume['user_id'], 'new_user_id',
- 'Unexpected user id')
-
- self.assertEquals(volume['id'], response['volume_id'],
- 'Unexpected volume id in response.')
- self.assertEquals(transfer['id'], response['id'],
- 'Unexpected transfer id in response.')
+ self.assertEqual(volume['project_id'], 'new_project_id',
+ 'Unexpected project id')
+ self.assertEqual(volume['user_id'], 'new_user_id',
+ 'Unexpected user id')
+
+ self.assertEqual(volume['id'], response['volume_id'],
+ 'Unexpected volume id in response.')
+ self.assertEqual(transfer['id'], response['id'],
+ 'Unexpected transfer id in response.')
def test_transfer_get(self):
tx_api = transfer_api.API()
volume_type = db.volume_type_get(
context.get_admin_context(),
self.volume_type1_id)
- self.assertEquals(volume_type['extra_specs'],
- self.vol_type1_specs)
+ self.assertEqual(volume_type['extra_specs'], self.vol_type1_specs)
volume_type = db.volume_type_get(
context.get_admin_context(),
volume_type = db.volume_type_get_by_name(
context.get_admin_context(),
self.vol_type1['name'])
- self.assertEquals(volume_type['extra_specs'],
- self.vol_type1_specs)
+ self.assertEqual(volume_type['extra_specs'], self.vol_type1_specs)
volume_type = db.volume_type_get_by_name(
context.get_admin_context(),
types = db.volume_type_get_all(context.get_admin_context())
- self.assertEquals(
+ self.assertEqual(
types[self.vol_type1['name']]['extra_specs'], expected_specs)
- self.assertEquals(
+ self.assertEqual(
types[self.vol_type2_noextra['name']]['extra_specs'], {})
def test_get_host_from_queue_simple(self):
fullname = "%s.%s@%s" % (self.QUEUE_NAME, self.HOSTNAME, self.BACKEND)
- self.assertEquals(volume_utils.get_host_from_queue(fullname),
- self.HOSTNAME)
+ self.assertEqual(volume_utils.get_host_from_queue(fullname),
+ self.HOSTNAME)
def test_get_host_from_queue_ip(self):
fullname = "%s.%s@%s" % (self.QUEUE_NAME, self.HOSTIP, self.BACKEND)
- self.assertEquals(volume_utils.get_host_from_queue(fullname),
- self.HOSTIP)
+ self.assertEqual(volume_utils.get_host_from_queue(fullname),
+ self.HOSTIP)
def test_get_host_from_queue_multi_at_symbol(self):
fullname = "%s.%s@%s" % (self.QUEUE_NAME, self.HOSTNAME,
self.MULTI_AT_BACKEND)
- self.assertEquals(volume_utils.get_host_from_queue(fullname),
- self.HOSTNAME)
+ self.assertEqual(volume_utils.get_host_from_queue(fullname),
+ self.HOSTNAME)
def test_get_host_from_queue_ip_multi_at_symbol(self):
fullname = "%s.%s@%s" % (self.QUEUE_NAME, self.HOSTIP,
self.MULTI_AT_BACKEND)
- self.assertEquals(volume_utils.get_host_from_queue(fullname),
- self.HOSTIP)
+ self.assertEqual(volume_utils.get_host_from_queue(fullname),
+ self.HOSTIP)
class LVMVolumeDriverTestCase(test.TestCase):
)
mock.VerifyAll()
- self.assertEquals(
+ self.assertEqual(
dict(
driver_volume_type='xensm',
data=dict(
)
mock.VerifyAll()
- self.assertEquals(
+ self.assertEqual(
dict(
driver_volume_type='xensm',
data=dict(
mock.ReplayAll()
result = drv.create_snapshot(snapshot)
mock.VerifyAll()
- self.assertEquals(
+ self.assertEqual(
dict(provider_location="copied-sr/copied-vdi"),
result)
result = drv.create_volume_from_snapshot(volume, snapshot)
mock.VerifyAll()
- self.assertEquals(
+ self.assertEqual(
dict(provider_location='copied-sr/copied-vdi'), result)
def test_delete_snapshot(self):
mock_open = mock.Mock(return_value=mock_context_manager)
with mock.patch('__builtin__.open', mock_open):
- self.assertEquals(
+ self.assertEqual(
'blah', tools._stripped_first_line_of('/somefile'))
mock_open.assert_called_once_with('/somefile', 'rb')
def test_initialized_should_set_xiv_ds8k_info(self):
"""Test that the san flags are passed to the IBM proxy."""
- self.assertEquals(
+ self.assertEqual(
self.driver.xiv_ds8k_proxy.xiv_ds8k_info['xiv_ds8k_user'],
self.driver.configuration.san_login)
- self.assertEquals(
+ self.assertEqual(
self.driver.xiv_ds8k_proxy.xiv_ds8k_info['xiv_ds8k_pass'],
self.driver.configuration.san_password)
- self.assertEquals(
+ self.assertEqual(
self.driver.xiv_ds8k_proxy.xiv_ds8k_info['xiv_ds8k_address'],
self.driver.configuration.san_ip)
- self.assertEquals(
+ self.assertEqual(
self.driver.xiv_ds8k_proxy.xiv_ds8k_info['xiv_ds8k_vol_pool'],
self.driver.configuration.san_clustername)
self.assertEqual(data['total_capacity_gb'], 'infinite')
self.assertEqual(data['free_capacity_gb'], 'infinite')
- self.assertEquals(data,
- {'total_capacity_gb': 'infinite',
- 'free_capacity_gb': 'infinite',
- 'reserved_percentage':
- self.configuration.reserved_percentage,
- 'QoS_support': False,
- 'vendor_name': 'Zadara Storage',
- 'driver_version': self.driver.VERSION,
- 'storage_protocol': 'iSCSI',
- 'volume_backend_name': 'ZadaraVPSAISCSIDriver',
- })
+ self.assertEqual(data,
+ {'total_capacity_gb': 'infinite',
+ 'free_capacity_gb': 'infinite',
+ 'reserved_percentage':
+ self.configuration.reserved_percentage,
+ 'QoS_support': False,
+ 'vendor_name': 'Zadara Storage',
+ 'driver_version': self.driver.VERSION,
+ 'storage_protocol': 'iSCSI',
+ 'volume_backend_name': 'ZadaraVPSAISCSIDriver',
+ })