volume = db.volume_get(ctx, volume['id'])
self.assertEqual(volume['status'], 'in-use')
self.assertEqual(volume['instance_uuid'], stubs.FAKE_UUID)
- self.assertEqual(volume['attached_host'], None)
+ self.assertIsNone(volume['attached_host'])
self.assertEqual(volume['mountpoint'], mountpoint)
self.assertEqual(volume['attach_status'], 'attached')
admin_metadata = volume['volume_admin_metadata']
volume = db.volume_get(ctx, volume['id'])
# status changed to 'available'
self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['instance_uuid'], None)
- self.assertEqual(volume['attached_host'], None)
- self.assertEqual(volume['mountpoint'], None)
+ self.assertIsNone(volume['instance_uuid'])
+ self.assertIsNone(volume['attached_host'])
+ self.assertIsNone(volume['mountpoint'])
self.assertEqual(volume['attach_status'], 'detached')
admin_metadata = volume['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 1)
# volume is attached
volume = db.volume_get(ctx, volume['id'])
self.assertEqual(volume['status'], 'in-use')
- self.assertEqual(volume['instance_uuid'], None)
+ self.assertIsNone(volume['instance_uuid'])
self.assertEqual(volume['attached_host'], host_name)
self.assertEqual(volume['mountpoint'], mountpoint)
self.assertEqual(volume['attach_status'], 'attached')
volume = db.volume_get(ctx, volume['id'])
# status changed to 'available'
self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['instance_uuid'], None)
- self.assertEqual(volume['attached_host'], None)
- self.assertEqual(volume['mountpoint'], None)
+ self.assertIsNone(volume['instance_uuid'])
+ self.assertIsNone(volume['attached_host'])
+ self.assertIsNone(volume['mountpoint'])
self.assertEqual(volume['attach_status'], 'detached')
admin_metadata = volume['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 1)
def test_content_type_missing(self):
request = wsgi.Request.blank('/tests/123', method='POST')
request.body = "<body />"
- self.assertEqual(None, request.get_content_type())
+ self.assertIsNone(request.get_content_type())
def test_content_type_unsupported(self):
request = wsgi.Request.blank('/tests/123', method='POST')
self.stubs.SmartSet(request.accept_language,
'best_match', fake_best_match)
- self.assertEqual(request.best_match_language(), None)
+ self.assertIsNone(request.best_match_language())
# If accept-language is not included or empty, match should be None
request.headers = {'Accept-Language': ''}
- self.assertEqual(request.best_match_language(), None)
+ self.assertIsNone(request.best_match_language())
request.headers.pop('Accept-Language')
- self.assertEqual(request.best_match_language(), None)
+ self.assertIsNone(request.best_match_language())
class ActionDispatcherTest(test.TestCase):
request.body = 'foo'
content_type, body = resource.get_body(request)
- self.assertEqual(content_type, None)
+ self.assertIsNone(content_type)
self.assertEqual(body, '')
def test_get_body_no_content_type(self):
request.body = 'foo'
content_type, body = resource.get_body(request)
- self.assertEqual(content_type, None)
+ self.assertIsNone(content_type)
self.assertEqual(body, '')
def test_get_body_no_content_body(self):
request.body = ''
content_type, body = resource.get_body(request)
- self.assertEqual(content_type, None)
+ self.assertIsNone(content_type)
self.assertEqual(body, '')
def test_get_body(self):
extensions = [extension1, extension2]
response, post = resource.pre_process_extensions(extensions, None, {})
self.assertEqual(called, [])
- self.assertEqual(response, None)
+ self.assertIsNone(response)
self.assertEqual(list(post), [extension2, extension1])
def test_pre_process_extensions_generator(self):
response, post = resource.pre_process_extensions(extensions, None, {})
post = list(post)
self.assertEqual(called, ['pre1', 'pre2'])
- self.assertEqual(response, None)
+ self.assertIsNone(response)
self.assertEqual(len(post), 2)
self.assertTrue(inspect.isgenerator(post[0]))
self.assertTrue(inspect.isgenerator(post[1]))
response = resource.post_process_extensions([extension2, extension1],
None, None, {})
self.assertEqual(called, [2, 1])
- self.assertEqual(response, None)
+ self.assertIsNone(response)
def test_post_process_extensions_regular_response(self):
class Controller(object):
None, None, {})
self.assertEqual(called, [2, 1])
- self.assertEqual(response, None)
+ self.assertIsNone(response)
def test_post_process_extensions_generator_response(self):
class Controller(object):
def test_missing_key_selector(self):
sel = xmlutil.Selector('test2', 'attrs')
- self.assertEqual(sel(self.obj_for_test), None)
+ self.assertIsNone(sel(self.obj_for_test))
self.assertRaises(KeyError, sel, self.obj_for_test, True)
def test_constant_selector(self):
# Create a template element with no subselector
elem = xmlutil.TemplateElement('test')
- self.assertEqual(elem.subselector, None)
+ self.assertIsNone(elem.subselector)
def test_element_subselector_string(self):
# Create a template element with a string subselector
elem = xmlutil.TemplateElement('test')
# Ensure that it has no text
- self.assertEqual(elem.text, None)
+ self.assertIsNone(elem.text)
# Try setting it to a string and ensure it becomes a selector
elem.text = 'test'
# Try resetting the text to None
elem.text = None
- self.assertEqual(elem.text, None)
+ self.assertIsNone(elem.text)
# Now make up a selector and try setting the text to that
sel = xmlutil.Selector()
# Finally, try deleting the text and see what happens
del elem.text
- self.assertEqual(elem.text, None)
+ self.assertIsNone(elem.text)
def test_apply_attrs(self):
# Create a template element
class TemplateBuilderTest(test.TestCase):
def test_master_template_builder(self):
# Make sure the template hasn't been built yet
- self.assertEqual(MasterTemplateBuilder._tmpl, None)
+ self.assertIsNone(MasterTemplateBuilder._tmpl)
# Now, construct the template
tmpl1 = MasterTemplateBuilder()
# Make sure that there is a template cached...
- self.assertNotEqual(MasterTemplateBuilder._tmpl, None)
+ self.assertIsNotNone(MasterTemplateBuilder._tmpl)
# Make sure it wasn't what was returned...
self.assertNotEqual(MasterTemplateBuilder._tmpl, tmpl1)
def test_slave_template_builder(self):
# Make sure the template hasn't been built yet
- self.assertEqual(SlaveTemplateBuilder._tmpl, None)
+ self.assertIsNone(SlaveTemplateBuilder._tmpl)
# Now, construct the template
tmpl1 = SlaveTemplateBuilder()
# Make sure there is a template cached...
- self.assertNotEqual(SlaveTemplateBuilder._tmpl, None)
+ self.assertIsNotNone(SlaveTemplateBuilder._tmpl)
# Make sure it was what was returned...
self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1)
"""Test a limit handles 1 GET per second."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
self.assertEqual(0, limit.next_request)
self.assertEqual(0, limit.last_request)
"""Test two calls to 1 GET per second limit."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
delay = limit("GET", "/anything")
self.assertEqual(1, delay)
self.time += 4
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
self.assertEqual(4, limit.next_request)
self.assertEqual(4, limit.last_request)
def test_good_url(self):
delay = self._request("GET", "/something")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
def test_escaping(self):
delay = self._request("GET", "/something/jump%20up")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
def test_response_to_delays(self):
delay = self._request("GET", "/delayed")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed")
self.assertEqual(delay, '60.00')
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user2")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user1")
self.assertEqual(delay, '60.00')
"""Test a limit handles 1 GET per second."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
self.assertEqual(0, limit.next_request)
self.assertEqual(0, limit.last_request)
"""Test two calls to 1 GET per second limit."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
delay = limit("GET", "/anything")
self.assertEqual(1, delay)
self.time += 4
delay = limit("GET", "/anything")
- self.assertEqual(None, delay)
+ self.assertIsNone(delay)
self.assertEqual(4, limit.next_request)
self.assertEqual(4, limit.last_request)
def test_good_url(self):
delay = self._request("GET", "/something")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
def test_escaping(self):
delay = self._request("GET", "/something/jump%20up")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
def test_response_to_delays(self):
delay = self._request("GET", "/delayed")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed")
self.assertEqual(delay, '60.00')
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user2")
- self.assertEqual(delay, None)
+ self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user1")
self.assertEqual(delay, '60.00')
self.stubs.Set(self.connector, '_execute', initiator_no_file)
initiator = self.connector.get_initiator()
- self.assertEqual(initiator, None)
+ self.assertIsNone(initiator)
self.stubs.Set(self.connector, '_execute', initiator_get_text)
initiator = self.connector.get_initiator()
self.assertEqual(initiator, 'iqn.1234-56.foo.bar:01:23456789abc')
self.assertEqual(src_volume['name'], expected_name)
self.assertEqual(src_volume['host'], 'dest')
self.assertEqual(src_volume['status'], 'available')
- self.assertEqual(src_volume['migration_status'], None)
+ self.assertIsNone(src_volume['migration_status'])
res = db.qos_specs_associations_get(self.ctxt, specs_id)
self.assertEqual(len(res), 0)
res = db.volume_type_get(self.ctxt, type_id)
- self.assertEqual(res['qos_specs_id'], None)
+ self.assertIsNone(res['qos_specs_id'])
def test_qos_specs_disassociate_all(self):
specs_id = self._create_qos_specs('FakeQos')
num_images = len(self.service.detail(self.context))
image_id = self.service.create(self.context, fixture)['id']
- self.assertNotEquals(None, image_id)
+ self.assertIsNotNone(image_id)
self.assertEqual(num_images + 1,
len(self.service.detail(self.context)))
fixture = self._make_fixture(name='test image')
image_id = self.service.create(self.context, fixture)['id']
- self.assertNotEquals(None, image_id)
+ self.assertIsNotNone(image_id)
self.assertRaises(exception.ImageNotFound,
self.service.show,
self.context,
def test_update_from_volume_capability(self):
fake_host = host_manager.HostState('host1')
- self.assertEqual(fake_host.free_capacity_gb, None)
+ self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 1024,
'free_capacity_gb': 512,
def test_update_from_volume_infinite_capability(self):
fake_host = host_manager.HostState('host1')
- self.assertEqual(fake_host.free_capacity_gb, None)
+ self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 'infinite',
'free_capacity_gb': 'infinite',
def test_update_from_volume_unknown_capability(self):
fake_host = host_manager.HostState('host1')
- self.assertEqual(fake_host.free_capacity_gb, None)
+ self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 'infinite',
'free_capacity_gb': 'unknown',
def test_get_compressor(self):
service = SwiftBackupDriver(self.ctxt)
compressor = service._get_compressor('None')
- self.assertEqual(compressor, None)
+ self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(compressor, zlib)
compressor = service._get_compressor('bz2')
self.assertEqual(volume['mountpoint'], '/tmp')
self.assertEqual(volume['attach_status'], 'attached')
self.assertEqual(volume['instance_uuid'], instance_uuid)
- self.assertEqual(volume['attached_host'], None)
+ self.assertIsNone(volume['attached_host'])
def test_volume_attached_to_host(self):
volume = db.volume_create(self.ctxt, {'host': 'host1'})
self.assertEqual(volume['status'], 'in-use')
self.assertEqual(volume['mountpoint'], '/tmp')
self.assertEqual(volume['attach_status'], 'attached')
- self.assertEqual(volume['instance_uuid'], None)
+ self.assertIsNone(volume['instance_uuid'])
self.assertEqual(volume['attached_host'], host_name)
def test_volume_data_get_for_host(self):
encryption_get = \
db.volume_type_encryption_get(self.ctxt,
encryption['volume_type_id'])
- self.assertEqual(None, encryption_get)
+ self.assertIsNone(encryption_get)
class DBAPIReservationTestCase(BaseTest):
def test_create_volume(self):
loc = self.driver.create_volume(_VOLUME)
- self.assertNotEqual(loc, None)
+ self.assertIsNotNone(loc)
vol = _VOLUME.copy()
vol['provider_location'] = loc['provider_location']
- self.assertNotEqual(loc['provider_location'], None)
+ self.assertIsNotNone(loc['provider_location'])
return vol
def test_delete_volume(self):
svol = vol.copy()
svol['volume_size'] = svol['size']
loc = self.driver.create_snapshot(svol)
- self.assertNotEqual(loc, None)
+ self.assertIsNotNone(loc)
svol['provider_location'] = loc['provider_location']
return svol
svol = vol.copy()
svol['volume_size'] = svol['size']
loc = self.driver.create_snapshot(svol)
- self.assertNotEqual(loc, None)
+ self.assertIsNotNone(loc)
svol['provider_location'] = loc['provider_location']
return svol
def test_create_volume_from_snapshot(self):
svol = self.test_create_snapshot()
vol = self.driver.create_volume_from_snapshot(_VOLUME, svol)
- self.assertNotEqual(vol, None)
+ self.assertIsNotNone(vol)
return vol
def test_initialize_connection(self):
def test_log_in_success(self):
deviceid = self.driver.common.login()
- self.assertNotEqual(deviceid, None)
+ self.assertIsNotNone(deviceid)
def test_log_out_success(self):
self.driver.common.login()
def test_delete_volume_success(self):
self.driver.common.login()
self.driver.delete_volume(test_volume)
- self.assertEqual(self.driver.common.lun_id, None)
+ self.assertIsNone(self.driver.common.lun_id)
def test_delete_snapshot_success(self):
self.driver.common.login()
self.driver.delete_snapshot(test_snap)
- self.assertEqual(self.driver.common.snapshot_id, None)
+ self.assertIsNone(self.driver.common.snapshot_id)
def test_colone_volume_success(self):
self.driver.common.login()
def test_get_volume_stats(self):
self.driver.common.login()
status = self.driver.get_volume_stats()
- self.assertNotEqual(status['free_capacity_gb'], None)
+ self.assertIsNotNone(status['free_capacity_gb'])
def test_create_snapshot_fail(self):
self.driver.common.login()
def test_log_in_Success(self):
deviceid = self.driver.common.login()
- self.assertNotEqual(deviceid, None)
+ self.assertIsNotNone(deviceid)
def test_create_volume_success(self):
self.driver.common.login()
def test_delete_volume_success(self):
self.driver.common.login()
self.driver.delete_volume(test_volume)
- self.assertEqual(self.driver.common.lun_id, None)
+ self.assertIsNone(self.driver.common.lun_id)
def test_delete_snapshot_success(self):
self.driver.common.login()
self.driver.delete_snapshot(test_snap)
- self.assertEqual(self.driver.common.snapshot_id, None)
+ self.assertIsNone(self.driver.common.snapshot_id)
def test_colone_volume_success(self):
self.driver.common.login()
def test_get_volume_stats(self):
self.driver.common.login()
status = self.driver.get_volume_stats()
- self.assertNotEqual(status['free_capacity_gb'], None)
+ self.assertIsNotNone(status['free_capacity_gb'])
def test_create_snapshot_fail(self):
self.driver.common.login()
self.driver.delete_volume, FAKE_VOLUME)
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
- self.assertEqual(FAKE_VOLUME['provider_location'], None)
+ self.assertIsNone(LUN_INFO['ID'])
+ self.assertIsNone(FAKE_VOLUME['provider_location'])
def test_create_delete_cloned_volume(self):
# Test no source volume
FAKE_CLONED_VOLUME, FAKE_VOLUME)
self.assertEqual(CLONED_LUN_INFO['ID'], VOLUME_SNAP_ID['vol_copy'])
self.driver.delete_volume(FAKE_CLONED_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
# Test start luncopy failed
self.assertEqual(LUN_INFO['ID'], VOLUME_SNAP_ID['vol'])
set_error_flg('chgluncopystatus')
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.create_cloned_volume,
FAKE_CLONED_VOLUME, FAKE_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
self.assertEqual(LUN_INFO['ID'], VOLUME_SNAP_ID['vol'])
# Test luncopy status abnormal
LUNCOPY_SETTING['Status'] = 'Disable'
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.create_cloned_volume,
FAKE_CLONED_VOLUME, FAKE_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
self.assertEqual(LUN_INFO['ID'], VOLUME_SNAP_ID['vol'])
LUNCOPY_SETTING['Status'] = 'Normal'
# Test delete luncopy failed
FAKE_CLONED_VOLUME, FAKE_VOLUME)
self.assertEqual(CLONED_LUN_INFO['ID'], VOLUME_SNAP_ID['vol_copy'])
self.driver.delete_volume(FAKE_CLONED_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
# need to clean up LUNCopy
LUNCOPY_INFO['Name'] = None
LUNCOPY_INFO['ID'] = None
self.assertEqual(CLONED_LUN_INFO['ID'], VOLUME_SNAP_ID['vol_copy'])
self.assertEqual(ret['provider_location'], CLONED_LUN_INFO['ID'])
self.driver.delete_volume(FAKE_CLONED_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
- self.assertEqual(FAKE_CLONED_VOLUME['provider_location'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
+ self.assertIsNone(FAKE_CLONED_VOLUME['provider_location'])
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def test_create_delete_snapshot(self):
# Test no resource pool
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.create_snapshot,
FAKE_SNAPSHOT)
- self.assertEqual(SNAPSHOT_INFO['ID'], None)
- self.assertEqual(SNAPSHOT_INFO['Status'], None)
+ self.assertIsNone(SNAPSHOT_INFO['ID'])
+ self.assertIsNone(SNAPSHOT_INFO['Status'])
# Test disable snapshot failed
set_error_flg('disablesnapshot')
self.driver.create_snapshot(FAKE_SNAPSHOT)
self.assertEqual(SNAPSHOT_INFO['Status'], 'Active')
self.assertEqual(ret['provider_location'], SNAPSHOT_INFO['ID'])
self.driver.delete_snapshot(FAKE_SNAPSHOT)
- self.assertEqual(SNAPSHOT_INFO['ID'], None)
- self.assertEqual(SNAPSHOT_INFO['Status'], None)
+ self.assertIsNone(SNAPSHOT_INFO['ID'])
+ self.assertIsNone(SNAPSHOT_INFO['Status'])
def test_create_delete_snapshot_volume(self):
# Test no source snapshot
self.driver.delete_snapshot(FAKE_SNAPSHOT)
self.driver.delete_volume(FAKE_VOLUME)
self.driver.delete_volume(FAKE_CLONED_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
- self.assertEqual(SNAPSHOT_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
+ self.assertIsNone(SNAPSHOT_INFO['ID'])
def test_initialize_connection(self):
# Test can not get iscsi iqn
self.assertEqual(LUN_INFO['Owner Controller'], 'B')
self.driver.terminate_connection(FAKE_VOLUME, FAKE_CONNECTOR)
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def test_terminate_connection(self):
# Test no host was found
self.assertEqual(LUN_INFO['ID'], VOLUME_SNAP_ID['vol'])
self.driver.initialize_connection(FAKE_VOLUME, FAKE_CONNECTOR)
self.driver.terminate_connection(FAKE_VOLUME, FAKE_CONNECTOR)
- self.assertEqual(MAP_INFO["DEV LUN ID"], None)
+ self.assertIsNone(MAP_INFO["DEV LUN ID"])
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def test_get_volume_stats(self):
stats = self.driver.get_volume_stats(True)
self.driver.create_volume(FAKE_VOLUME)
self.assertEqual(LUN_INFO['ID'], VOLUME_SNAP_ID['vol'])
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def test_create_delete_snapshot(self):
self.driver.create_volume(FAKE_VOLUME)
self.driver.create_snapshot(FAKE_SNAPSHOT)
self.assertEqual(SNAPSHOT_INFO['ID'], VOLUME_SNAP_ID['snap'])
self.driver.delete_snapshot(FAKE_SNAPSHOT)
- self.assertEqual(SNAPSHOT_INFO['ID'], None)
+ self.assertIsNone(SNAPSHOT_INFO['ID'])
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def test_create_cloned_volume(self):
self.driver.create_volume(FAKE_VOLUME)
self.assertEqual(ret['provider_location'], CLONED_LUN_INFO['ID'])
self.driver.delete_volume(FAKE_CLONED_VOLUME)
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
+ self.assertIsNone(LUN_INFO['ID'])
def test_create_snapshot_volume(self):
self.driver.create_volume(FAKE_VOLUME)
self.assertEqual(ret['provider_location'], CLONED_LUN_INFO['ID'])
self.driver.delete_volume(FAKE_CLONED_VOLUME)
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(CLONED_LUN_INFO['ID'], None)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(CLONED_LUN_INFO['ID'])
+ self.assertIsNone(LUN_INFO['ID'])
def test_initialize_terminitat_connection(self):
self.driver.create_volume(FAKE_VOLUME)
self.assertEqual(MAP_INFO["DEV LUN ID"], LUN_INFO['ID'])
self.driver.terminate_connection(FAKE_VOLUME, FAKE_CONNECTOR)
- self.assertEqual(MAP_INFO["DEV LUN ID"], None)
- self.assertEqual(MAP_INFO["Host LUN ID"], None)
+ self.assertIsNone(MAP_INFO["DEV LUN ID"])
+ self.assertIsNone(MAP_INFO["Host LUN ID"])
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
def _test_get_volume_stats(self):
stats = self.driver.get_volume_stats(True)
FAKE_CONNECTOR['initiator'])
self.driver.terminate_connection(FAKE_VOLUME, FAKE_CONNECTOR)
self.driver.delete_volume(FAKE_VOLUME)
- self.assertEqual(LUN_INFO['ID'], None)
+ self.assertIsNone(LUN_INFO['ID'])
class SSHMethodTestCase(test.TestCase):
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
- self.assertEqual(vol_dict['provider_location'], None)
+ self.assertIsNone(vol_dict['provider_location'])
def test_clone_image_resizefails(self):
drv = self._driver
mox.VerifyAll()
self.assertFalse(result)
self.assertFalse(vol_dict['bootable'])
- self.assertEqual(vol_dict['provider_location'], None)
+ self.assertIsNone(vol_dict['provider_location'])
def test_is_cloneable_share_badformats(self):
drv = self._driver
action = "example:get_http"
target = {}
result = policy.enforce(self.context, action, target)
- self.assertEqual(result, None)
+ self.assertIsNone(result)
def test_enforce_http_false(self):
# Make sure no reservation was created for snapshot gigabytes.
reservations = db.reservation_get_all_by_project(self.context,
self.project_id)
- self.assertEqual(reservations.get('gigabytes'), None)
+ self.assertIsNone(reservations.get('gigabytes'))
# Make sure the snapshot volume_size isn't included in usage.
vol_ref2 = volume.API().create(self.context, 10, '', '')
resource = quota.BaseResource('test_resource')
self.assertEqual(resource.name, 'test_resource')
- self.assertEqual(resource.flag, None)
+ self.assertIsNone(resource.flag)
self.assertEqual(resource.default, -1)
def test_with_flag(self):
resource = quota.VolumeTypeResource('volumes', volume)
self.assertEqual(resource.name, 'volumes_%s' % volume_type_name)
- self.assertEqual(resource.flag, None)
+ self.assertIsNone(resource.flag)
self.assertEqual(resource.default, -1)
class TestUtil(test.TestCase):
def test_ascii_str(self):
- self.assertEqual(None, driver.ascii_str(None))
+ self.assertIsNone(driver.ascii_str(None))
self.assertEqual('foo', driver.ascii_str('foo'))
self.assertEqual('foo', driver.ascii_str(u'foo'))
self.assertRaises(UnicodeEncodeError,
sfv = SolidFireDriver(configuration=self.configuration)
model_update = sfv.create_volume(testvol)
- self.assertNotEqual(model_update, None)
+ self.assertIsNotNone(model_update)
def test_create_volume(self):
self.stubs.Set(SolidFireDriver, '_issue_api_request',
sfv = SolidFireDriver(configuration=self.configuration)
model_update = sfv.create_volume(testvol)
- self.assertNotEqual(model_update, None)
- self.assertEqual(model_update.get('provider_geometry', None), None)
+ self.assertIsNotNone(model_update)
+ self.assertIsNone(model_update.get('provider_geometry', None))
def test_create_volume_non_512(self):
self.stubs.Set(SolidFireDriver, '_issue_api_request',
sfv = SolidFireDriver(configuration=self.configuration)
model_update = sfv.create_volume(testvol)
- self.assertNotEqual(model_update, None)
+ self.assertIsNotNone(model_update)
def test_create_volume_fails(self):
# NOTE(JDG) This test just fakes update_cluster_status
self.stubs.Set(SolidFireDriver, '_issue_api_request',
self.fake_issue_api_request)
account = sfv._create_sfaccount('project-id')
- self.assertNotEqual(account, None)
+ self.assertIsNotNone(account)
def test_create_sfaccount_fails(self):
sfv = SolidFireDriver(configuration=self.configuration)
self.stubs.Set(SolidFireDriver, '_issue_api_request',
self.fake_issue_api_request_fails)
account = sfv._create_sfaccount('project-id')
- self.assertEqual(account, None)
+ self.assertIsNone(account)
def test_get_sfaccount_by_name(self):
sfv = SolidFireDriver(configuration=self.configuration)
self.stubs.Set(SolidFireDriver, '_issue_api_request',
self.fake_issue_api_request)
account = sfv._get_sfaccount_by_name('some-name')
- self.assertNotEqual(account, None)
+ self.assertIsNotNone(account)
def test_get_sfaccount_by_name_fails(self):
sfv = SolidFireDriver(configuration=self.configuration)
self.stubs.Set(SolidFireDriver, '_issue_api_request',
self.fake_issue_api_request_fails)
account = sfv._get_sfaccount_by_name('some-name')
- self.assertEqual(account, None)
+ self.assertIsNone(account)
def test_delete_volume(self):
self.stubs.Set(SolidFireDriver, '_issue_api_request',
'host': u'unicode.foo}.bar}.baz-%s' % rand_id}
self.driver.initialize_connection(volume1, conn)
host_name = self.driver._get_host_from_connector(conn)
- self.assertNotEqual(host_name, None)
+ self.assertIsNotNone(host_name)
self.driver.terminate_connection(volume1, conn)
host_name = self.driver._get_host_from_connector(conn)
- self.assertEqual(host_name, None)
+ self.assertIsNone(host_name)
self.driver.delete_volume(volume1)
# Clean up temporary hosts
for tmpconn in [tmpconn1, tmpconn2]:
host_name = self.driver._get_host_from_connector(tmpconn)
- self.assertNotEqual(host_name, None)
+ self.assertIsNotNone(host_name)
self.driver._delete_host(host_name)
def test_storwize_svc_validate_connector(self):
# Check case where no hosts exist
if self.USESIM:
ret = self.driver._get_host_from_connector(self._connector)
- self.assertEqual(ret, None)
+ self.assertIsNone(ret)
# Make sure that the volumes have been created
self._assert_vol_exists(volume1['name'], True)
self.driver.terminate_connection(volume1, self._connector)
if self.USESIM:
ret = self.driver._get_host_from_connector(self._connector)
- self.assertEqual(ret, None)
+ self.assertIsNone(ret)
# Check cases with no auth set for host
if self.USESIM:
chap_ret = self.driver._get_chap_secret_for_host(host_name)
if auth_enabled or host_exists == 'yes-auth':
self.assertIn('auth_password', init_ret['data'])
- self.assertNotEqual(chap_ret, None)
+ self.assertIsNotNone(chap_ret)
else:
self.assertNotIn('auth_password', init_ret['data'])
- self.assertEqual(chap_ret, None)
+ self.assertIsNone(chap_ret)
self.driver.terminate_connection(volume1, conn_na)
self._set_flag('storwize_svc_iscsi_chap_enabled', True)
# Make sure our host still exists
host_name = self.driver._get_host_from_connector(self._connector)
- self.assertNotEqual(host_name, None)
+ self.assertIsNotNone(host_name)
# Remove the mapping from the 2nd volume and delete it. The host should
# be automatically removed because there are no more mappings.
# Check if our host still exists (it should not)
if self.USESIM:
ret = self.driver._get_host_from_connector(self._connector)
- self.assertEqual(ret, None)
+ self.assertIsNone(ret)
def test_storwize_svc_multi_host_maps(self):
# We can't test connecting to multiple hosts from a single host when
def test_check_ssh_injection(self):
cmd_list = ['ssh', '-D', 'my_name@name_of_remote_computer']
- self.assertEqual(utils.check_ssh_injection(cmd_list), None)
+ self.assertIsNone(utils.check_ssh_injection(cmd_list))
def test_check_ssh_injection_on_error(self):
with_space = ['shh', 'my_name@ name_of_remote_computer']
"""Test _get_snapshot_from_tree."""
volops = volumeops.VMwareVolumeOps
ret = volops._get_snapshot_from_tree(mox.IgnoreArg(), None)
- self.assertEqual(ret, None)
+ self.assertIsNone(ret)
name = 'snapshot_name'
snapshot = FakeMor('VirtualMachineSnapshot', 'my_snap')
root = FakeSnapshotTree(name='snapshot_name', snapshot=snapshot)
1,
'name',
'description')
- self.assertEqual(volume['volume_type_id'], None)
- self.assertEqual(volume['encryption_key_id'], None)
+ self.assertIsNone(volume['volume_type_id'])
+ self.assertIsNone(volume['encryption_key_id'])
# Create default volume type
vol_type = conf_fixture.def_vol_type
self.assertEqual(vol['attach_status'], "attached")
self.assertEqual(vol['mountpoint'], mountpoint)
self.assertEqual(vol['instance_uuid'], instance_uuid)
- self.assertEqual(vol['attached_host'], None)
+ self.assertIsNone(vol['attached_host'])
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 2)
self.assertEqual(admin_metadata[0]['key'], 'readonly')
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
self.assertEqual(vol['mountpoint'], mountpoint)
- self.assertEqual(vol['instance_uuid'], None)
+ self.assertIsNone(vol['instance_uuid'])
# sanitized, conforms to RFC-952 and RFC-1123 specs.
self.assertEqual(vol['attached_host'], 'fake-host')
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(vol['attach_status'], "attached")
self.assertEqual(vol['mountpoint'], mountpoint)
self.assertEqual(vol['instance_uuid'], instance_uuid)
- self.assertEqual(vol['attached_host'], None)
+ self.assertIsNone(vol['attached_host'])
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 2)
self.assertEqual(admin_metadata[0]['key'], 'readonly')
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
self.assertEqual(vol['attach_status'], "detached")
- self.assertEqual(vol['mountpoint'], None)
- self.assertEqual(vol['instance_uuid'], None)
- self.assertEqual(vol['attached_host'], None)
+ self.assertIsNone(vol['mountpoint'])
+ self.assertIsNone(vol['instance_uuid'])
+ self.assertIsNone(vol['attached_host'])
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 1)
self.assertEqual(admin_metadata[0]['key'], 'readonly')
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
self.assertEqual(vol['mountpoint'], mountpoint)
- self.assertEqual(vol['instance_uuid'], None)
+ self.assertIsNone(vol['instance_uuid'])
self.assertEqual(vol['attached_host'], 'fake-host')
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 2)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
self.assertEqual(vol['attach_status'], "detached")
- self.assertEqual(vol['mountpoint'], None)
- self.assertEqual(vol['instance_uuid'], None)
- self.assertEqual(vol['attached_host'], None)
+ self.assertIsNone(vol['mountpoint'])
+ self.assertIsNone(vol['instance_uuid'])
+ self.assertIsNone(vol['attached_host'])
admin_metadata = vol['volume_admin_metadata']
self.assertEqual(len(admin_metadata), 1)
self.assertEqual(admin_metadata[0]['key'], 'readonly')
# create raw snapshot
volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'])
- self.assertEqual(snapshot['display_name'], None)
+ self.assertIsNone(snapshot['display_name'])
# use volume.api to update name
volume_api = cinder.volume.api.API()
update_dict = {'display_name': 'test update name'}
# check volume properties
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual(volume['host'], 'newhost')
- self.assertEqual(volume['migration_status'], None)
+ self.assertIsNone(volume['migration_status'])
def test_migrate_volume_generic(self):
def fake_migr(vol, host):
host_obj, True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual(volume['host'], 'newhost')
- self.assertEqual(volume['migration_status'], None)
+ self.assertIsNone(volume['migration_status'])
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, False)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
def test_lvm_migrate_volume_bad_loc_info(self):
capabilities = {'location_info': 'foo'}
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, False)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
def test_lvm_migrate_volume_diff_driver(self):
capabilities = {'location_info': 'FooDriver:foo:bar'}
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, False)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
def test_lvm_migrate_volume_diff_host(self):
capabilities = {'location_info': 'LVMVolumeDriver:foo:bar'}
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, False)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
def test_lvm_migrate_volume_in_use(self):
hostname = socket.gethostname()
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, False)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
def test_lvm_migrate_volume_proceed(self):
hostname = socket.gethostname()
moved, model_update = self.volume.driver.migrate_volume(self.context,
vol, host)
self.assertEqual(moved, True)
- self.assertEqual(model_update, None)
+ self.assertIsNone(model_update)
class LVMVolumeDriverTestCase(DriverTestCase):
def test_safe_get(self):
c = configuration.Configuration(volume_opts, config_group='foo')
- self.assertEqual(c.safe_get('none_opt'), None)
+ self.assertIsNone(c.safe_get('none_opt'))
type_ref = volume_types.create(self.ctxt, "type1", {"key2": "val2",
"key3": "val3"})
res = volume_types.get_volume_type_qos_specs(type_ref['id'])
- self.assertEqual(res['qos_specs'], None)
+ self.assertIsNone(res['qos_specs'])
qos_specs.associate_qos_with_type(self.ctxt,
qos_ref['id'],
type_ref['id'])