The method assertEquals has been deprecated since python 2.7.
http://docs.python.org/2/library/unittest.html#deprecated-aliases
Also in Python 3, a deprecated warning is raised when using assertEquals
therefore we should use assertEqual instead.
This patch is a mere sed replace of one-liner assertEquals to assertEqual.
grep -lR "assertEquals(" cinder/tests/* | \
xargs sed -ri 's/(\W+self.assertEqual)s(.+)(\)$)/\1\2\3/'
Change-Id: I4feea94d53afa866af9b7e14bd80345fa7276e75
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
volume = db.volume_get(ctx, volume['id'])
# status changed to 'error'
- self.assertEquals(volume['status'], 'error')
+ self.assertEqual(volume['status'], 'error')
def test_reset_status_as_non_admin(self):
# current status is 'error'
req.environ['cinder.context'] = context.RequestContext('fake', 'fake')
resp = req.get_response(app())
# request is not authorized
- self.assertEquals(resp.status_int, 403)
+ self.assertEqual(resp.status_int, 403)
volume = db.volume_get(context.get_admin_context(), volume['id'])
# status is still 'error'
- self.assertEquals(volume['status'], 'error')
+ self.assertEqual(volume['status'], 'error')
def test_malformed_reset_status_body(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# bad request
- self.assertEquals(resp.status_int, 400)
+ self.assertEqual(resp.status_int, 400)
volume = db.volume_get(ctx, volume['id'])
# status is still 'available'
- self.assertEquals(volume['status'], 'available')
+ self.assertEqual(volume['status'], 'available')
def test_invalid_status_for_volume(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# bad request
- self.assertEquals(resp.status_int, 400)
+ self.assertEqual(resp.status_int, 400)
volume = db.volume_get(ctx, volume['id'])
# status is still 'available'
- self.assertEquals(volume['status'], 'available')
+ self.assertEqual(volume['status'], 'available')
def test_reset_status_for_missing_volume(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# not found
- self.assertEquals(resp.status_int, 404)
+ self.assertEqual(resp.status_int, 404)
self.assertRaises(exception.NotFound, db.volume_get, ctx,
'missing-volume-id')
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
volume = db.volume_get(ctx, volume['id'])
# attach_status changed to 'detached'
- self.assertEquals(volume['attach_status'], 'detached')
+ self.assertEqual(volume['attach_status'], 'detached')
# status un-modified
- self.assertEquals(volume['status'], 'available')
+ self.assertEqual(volume['status'], 'available')
def test_invalid_reset_attached_status(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# bad request
- self.assertEquals(resp.status_int, 400)
+ self.assertEqual(resp.status_int, 400)
volume = db.volume_get(ctx, volume['id'])
# status and attach_status un-modified
- self.assertEquals(volume['status'], 'available')
- self.assertEquals(volume['attach_status'], 'detached')
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volume['attach_status'], 'detached')
def test_snapshot_reset_status(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
snapshot = db.snapshot_get(ctx, snapshot['id'])
# status changed to 'error'
- self.assertEquals(snapshot['status'], 'error')
+ self.assertEqual(snapshot['status'], 'error')
def test_invalid_status_for_snapshot(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 400)
+ self.assertEqual(resp.status_int, 400)
snapshot = db.snapshot_get(ctx, snapshot['id'])
# status is still 'available'
- self.assertEquals(snapshot['status'], 'available')
+ self.assertEqual(snapshot['status'], 'available')
def test_force_delete(self):
# admin context
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
# volume is deleted
self.assertRaises(exception.NotFound, db.volume_get, ctx, volume['id'])
# make request
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
# snapshot is deleted
self.assertRaises(exception.NotFound, db.snapshot_get, ctx,
snapshot['id'])
mountpoint, 'rw')
# volume is attached
volume = db.volume_get(ctx, volume['id'])
- self.assertEquals(volume['status'], 'in-use')
- self.assertEquals(volume['instance_uuid'], stubs.FAKE_UUID)
- self.assertEquals(volume['attached_host'], None)
- self.assertEquals(volume['mountpoint'], mountpoint)
- self.assertEquals(volume['attach_status'], 'attached')
+ self.assertEqual(volume['status'], 'in-use')
+ self.assertEqual(volume['instance_uuid'], stubs.FAKE_UUID)
+ self.assertEqual(volume['attached_host'], None)
+ self.assertEqual(volume['mountpoint'], mountpoint)
+ self.assertEqual(volume['attach_status'], 'attached')
admin_metadata = volume['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'rw')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'rw')
conn_info = self.volume_api.initialize_connection(ctx,
volume, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'rw')
+ self.assertEqual(conn_info['data']['access_mode'], 'rw')
# build request to force detach
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
# make request
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
volume = db.volume_get(ctx, volume['id'])
# status changed to 'available'
- self.assertEquals(volume['status'], 'available')
- self.assertEquals(volume['instance_uuid'], None)
- self.assertEquals(volume['attached_host'], None)
- self.assertEquals(volume['mountpoint'], None)
- self.assertEquals(volume['attach_status'], 'detached')
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volume['instance_uuid'], None)
+ self.assertEqual(volume['attached_host'], None)
+ self.assertEqual(volume['mountpoint'], None)
+ self.assertEqual(volume['attach_status'], 'detached')
admin_metadata = volume['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
# cleanup
svc.stop()
self.volume_api.attach(ctx, volume, None, host_name, mountpoint, 'ro')
# volume is attached
volume = db.volume_get(ctx, volume['id'])
- self.assertEquals(volume['status'], 'in-use')
- self.assertEquals(volume['instance_uuid'], None)
- self.assertEquals(volume['attached_host'], host_name)
- self.assertEquals(volume['mountpoint'], mountpoint)
- self.assertEquals(volume['attach_status'], 'attached')
+ self.assertEqual(volume['status'], 'in-use')
+ self.assertEqual(volume['instance_uuid'], None)
+ self.assertEqual(volume['attached_host'], host_name)
+ self.assertEqual(volume['mountpoint'], mountpoint)
+ self.assertEqual(volume['attach_status'], 'attached')
admin_metadata = volume['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'ro')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'ro')
conn_info = self.volume_api.initialize_connection(ctx,
volume, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'ro')
+ self.assertEqual(conn_info['data']['access_mode'], 'ro')
# build request to force detach
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
# make request
resp = req.get_response(app())
# request is accepted
- self.assertEquals(resp.status_int, 202)
+ self.assertEqual(resp.status_int, 202)
volume = db.volume_get(ctx, volume['id'])
# status changed to 'available'
- self.assertEquals(volume['status'], 'available')
- self.assertEquals(volume['instance_uuid'], None)
- self.assertEquals(volume['attached_host'], None)
- self.assertEquals(volume['mountpoint'], None)
- self.assertEquals(volume['attach_status'], 'detached')
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volume['instance_uuid'], None)
+ self.assertEqual(volume['attached_host'], None)
+ self.assertEqual(volume['mountpoint'], None)
+ self.assertEqual(volume['attach_status'], 'detached')
admin_metadata = volume['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
# cleanup
svc.stop()
mountpoint, 'rw')
conn_info = self.volume_api.initialize_connection(ctx,
volume, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'rw')
+ self.assertEqual(conn_info['data']['access_mode'], 'rw')
self.assertRaises(exception.InvalidVolume,
self.volume_api.attach,
ctx,
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
# verify status
- self.assertEquals(resp.status_int, expected_status)
+ self.assertEqual(resp.status_int, expected_status)
volume = db.volume_get(admin_ctx, volume['id'])
return volume
ctx = context.RequestContext('admin', 'fake', True)
volume = self._migrate_volume_prep()
volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
- self.assertEquals(volume['migration_status'], 'starting')
+ self.assertEqual(volume['migration_status'], 'starting')
def test_migrate_volume_as_non_admin(self):
expected_status = 403
resp = req.get_response(app())
resp_dict = ast.literal_eval(resp.body)
# verify status
- self.assertEquals(resp.status_int, expected_status)
+ self.assertEqual(resp.status_int, expected_status)
if expected_id:
- self.assertEquals(resp_dict['save_volume_id'], expected_id)
+ self.assertEqual(resp_dict['save_volume_id'], expected_id)
else:
self.assertNotIn('save_volume_id', resp_dict)
backup = dom.getElementsByTagName('backup')
name = backup.item(0).getAttribute('name')
container_name = backup.item(0).getAttribute('container')
- self.assertEquals(container_name.strip(), "volumebackups")
- self.assertEquals(name.strip(), "test_backup")
+ self.assertEqual(container_name.strip(), "volumebackups")
+ self.assertEqual(name.strip(), "test_backup")
db.backup_destroy(context.get_admin_context(), backup_id)
db.volume_destroy(context.get_admin_context(), volume_id)
self.stubs.Set(qos_specs, 'delete',
return_qos_specs_delete)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/1')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.controller.delete(req, 1)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_qos_specs_delete_not_found(self):
self.stubs.Set(qos_specs, 'get_qos_specs',
self.stubs.Set(qos_specs, 'delete',
return_qos_specs_delete)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/777')
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
req, '777')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_qos_specs_delete_inuse(self):
self.stubs.Set(qos_specs, 'get_qos_specs',
return_qos_specs_delete)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/666')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
req, '666')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_qos_specs_delete_inuse_force(self):
self.stubs.Set(qos_specs, 'get_qos_specs',
return_qos_specs_delete)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/666?force=True')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.assertRaises(webob.exc.HTTPInternalServerError,
self.controller.delete,
req, '666')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_create(self):
self.stubs.Set(qos_specs, 'create',
"key1": "value1"}}
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
res_dict = self.controller.create(req, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
self.assertEqual('qos_specs_1', res_dict['qos_specs']['name'])
def test_create_conflict(self):
"key1": "value1"}}
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.assertRaises(webob.exc.HTTPConflict,
self.controller.create, req, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_create_failed(self):
self.stubs.Set(qos_specs, 'create',
"key1": "value1"}}
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.assertRaises(webob.exc.HTTPInternalServerError,
self.controller.create, req, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def _create_qos_specs_bad_body(self, body):
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs')
self.stubs.Set(qos_specs, 'update',
return_qos_specs_update)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/555')
body = {'qos_specs': {'key1': 'value1',
'key2': 'value2'}}
res = self.controller.update(req, '555', body)
self.assertDictMatch(res, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_update_not_found(self):
self.stubs.Set(qos_specs, 'update',
return_qos_specs_update)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/777')
body = {'qos_specs': {'key1': 'value1',
'key2': 'value2'}}
self.assertRaises(webob.exc.HTTPNotFound, self.controller.update,
req, '777', body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_update_invalid_input(self):
self.stubs.Set(qos_specs, 'update',
return_qos_specs_update)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/888')
body = {'qos_specs': {'key1': 'value1',
'key2': 'value2'}}
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
req, '888', body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_update_failed(self):
self.stubs.Set(qos_specs, 'update',
return_qos_specs_update)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/qos-specs/999')
body = {'qos_specs': {'key1': 'value1',
'key2': 'value2'}}
self.assertRaises(webob.exc.HTTPInternalServerError,
self.controller.update,
req, '999', body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_show(self):
self.stubs.Set(qos_specs, 'get_qos_specs',
body = make_body(root=False, tenant_id=None)
for node in tree:
self.assertIn(node.tag, body)
- self.assertEquals(str(body[node.tag]), node.text)
+ self.assertEqual(str(body[node.tag]), node.text)
self.stubs.Set(cinder.db, 'volume_type_extra_specs_delete',
delete_volume_type_extra_specs)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank(self.api_path + '/key5')
self.controller.delete(req, 1, 'key5')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_delete_not_found(self):
self.stubs.Set(cinder.db, 'volume_type_extra_specs_delete',
return_create_volume_type_extra_specs)
body = {"extra_specs": {"key1": "value1"}}
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank(self.api_path)
res_dict = self.controller.create(req, 1, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
self.assertEqual('value1', res_dict['extra_specs']['key1'])
return_create_volume_type_extra_specs)
body = {"key1": "value1"}
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank(self.api_path + '/key1')
res_dict = self.controller.update(req, 1, 'key1', body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
self.assertEqual('value1', res_dict['key1'])
return_volume_types_destroy)
req = fakes.HTTPRequest.blank('/v2/fake/types/1')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.controller._delete(req, 1)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_volume_types_delete_not_found(self):
self.stubs.Set(volume_types, 'get_volume_type',
self.stubs.Set(volume_types, 'destroy',
return_volume_types_destroy)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
req = fakes.HTTPRequest.blank('/v2/fake/types/777')
self.assertRaises(webob.exc.HTTPNotFound, self.controller._delete,
req, '777')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
def test_create(self):
self.stubs.Set(volume_types, 'create',
"extra_specs": {"key1": "value1"}}}
req = fakes.HTTPRequest.blank('/v2/fake/types')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
res_dict = self.controller._create(req, body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
self.assertEqual(1, len(res_dict))
self.assertEqual('vol_type_1', res_dict['volume_type']['name'])
dom = minidom.parseString(res.body)
transfer_xml = dom.getElementsByTagName('transfer')
name = transfer_xml.item(0).getAttribute('name')
- self.assertEquals(name.strip(), "test_transfer")
+ self.assertEqual(name.strip(), "test_transfer")
db.transfer_destroy(context.get_admin_context(), transfer['id'])
db.volume_destroy(context.get_admin_context(), volume_id)
'provider': provider,
'volume_type_id': volume_type['id']}}
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
res = self._get_response(volume_type)
res_dict = json.loads(res.body)
self.assertEqual(200, res.status_code)
req_headers='application/json')
res_dict = json.loads(res.body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
# check response
self.assertIn('encryption', res_dict)
req_headers='application/json')
res_dict = json.loads(res.body)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.assertEqual(404, res.status_code)
expected = {
for chunk in sizelimit.LimitingReader(data, BYTES):
bytes_read += len(chunk)
- self.assertEquals(bytes_read, BYTES)
+ self.assertEqual(bytes_read, BYTES)
bytes_read = 0
data = StringIO.StringIO("*" * BYTES)
bytes_read += 1
byte = reader.read(1)
- self.assertEquals(bytes_read, BYTES)
+ self.assertEqual(bytes_read, BYTES)
def test_limiting_reader_fails(self):
BYTES = 1024
'display_description': 'Default description',
'metadata': {},
}}
- self.assertEquals(expected, res_dict)
+ self.assertEqual(expected, res_dict)
def test_snapshot_update_missing_body(self):
body = {}
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 2)
for snapshot in resp['snapshots']:
- self.assertEquals(snapshot['status'], 'available')
+ self.assertEqual(snapshot['status'], 'available')
# no match
req = fakes.HTTPRequest.blank('/v1/snapshots?status=error')
resp = self.controller.index(req)
req = fakes.HTTPRequest.blank('/v1/snapshots?display_name=backup2')
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 1)
- self.assertEquals(resp['snapshots'][0]['display_name'], 'backup2')
+ self.assertEqual(resp['snapshots'][0]['display_name'], 'backup2')
# filter no match
req = fakes.HTTPRequest.blank('/v1/snapshots?display_name=backup4')
resp = self.controller.index(req)
'id': '1',
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'size': 1}}
- self.assertEquals(res_dict, expected)
+ self.assertEqual(res_dict, expected)
def test_volume_update_metadata(self):
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'size': 1
}}
- self.assertEquals(res_dict, expected)
+ self.assertEqual(res_dict, expected)
def test_update_empty_body(self):
body = {}
use_admin_context=is_admin)
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], 2)
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], 2)
#admin case
volume_detail_limit_offset(is_admin=True)
size="1"></volume>"""
request = self.deserializer.deserialize(self_request)
expected = {"volume": {"size": "1", }, }
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_display_name(self):
self_request = """
"display_name": "Volume-xml",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_display_description(self):
self_request = """
"display_description": "description",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_volume_type(self):
self_request = """
"volume_type": "289da7f8-6440-407c-9fb4-7db01ec49164",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_availability_zone(self):
self_request = """
"availability_zone": "us-east1",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_metadata(self):
self_request = """
},
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_full_volume(self):
self_request = """
},
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_imageref(self):
self_request = """
"imageRef": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
def test_snapshot_id(self):
self_request = """
"snapshot_id": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
def test_source_volid(self):
self_request = """
"source_volid": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
class VolumesUnprocessableEntityTestCase(test.TestCase):
'metadata': {},
}
}
- self.assertEquals(expected, res_dict)
+ self.assertEqual(expected, res_dict)
def test_snapshot_update_missing_body(self):
body = {}
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 2)
for snapshot in resp['snapshots']:
- self.assertEquals(snapshot['status'], 'available')
+ self.assertEqual(snapshot['status'], 'available')
# no match
req = fakes.HTTPRequest.blank('/v2/snapshots?status=error')
resp = self.controller.index(req)
req = fakes.HTTPRequest.blank('/v2/snapshots?name=backup2')
resp = self.controller.index(req)
self.assertEqual(len(resp['snapshots']), 1)
- self.assertEquals(resp['snapshots'][0]['name'], 'backup2')
+ self.assertEqual(resp['snapshots'][0]['name'], 'backup2')
# filter no match
req = fakes.HTTPRequest.blank('/v2/snapshots?name=backup4')
resp = self.controller.index(req)
req = fakes.HTTPRequest.blank('/v2/volumes')
res_dict = self.controller.create(req, body)
volume_id = res_dict['volume']['id']
- self.assertEquals(len(res_dict), 1)
+ self.assertEqual(len(res_dict), 1)
self.stubs.Set(volume_api.API, 'get_all',
lambda *args, **kwargs:
],
}
}
- self.assertEquals(res_dict, expected)
+ self.assertEqual(res_dict, expected)
def test_volume_update_metadata(self):
self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)
}
],
}}
- self.assertEquals(res_dict, expected)
+ self.assertEqual(res_dict, expected)
def test_update_empty_body(self):
body = {}
req = fakes.HTTPRequest.blank('/v2/volumes?marker=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 2)
- self.assertEquals(volumes[0]['id'], 1)
- self.assertEquals(volumes[1]['id'], 2)
+ self.assertEqual(len(volumes), 2)
+ self.assertEqual(volumes[0]['id'], 1)
+ self.assertEqual(volumes[1]['id'], 2)
def test_volume_index_limit(self):
req = fakes.HTTPRequest.blank('/v2/volumes?limit=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
+ self.assertEqual(len(volumes), 1)
def test_volume_index_limit_negative(self):
req = fakes.HTTPRequest.blank('/v2/volumes?limit=-1')
req = fakes.HTTPRequest.blank('/v2/volumes?marker=1&limit=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], '1')
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], '1')
def test_volume_index_limit_offset(self):
def stub_volume_get_all_by_project(context, project_id, marker, limit,
req = fakes.HTTPRequest.blank('/v2/volumes?limit=2&offset=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], 2)
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], 2)
req = fakes.HTTPRequest.blank('/v2/volumes?limit=-1&offset=1')
self.assertRaises(exception.InvalidInput,
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 2)
- self.assertEquals(volumes[0]['id'], 1)
- self.assertEquals(volumes[1]['id'], 2)
+ self.assertEqual(len(volumes), 2)
+ self.assertEqual(volumes[0]['id'], 1)
+ self.assertEqual(volumes[1]['id'], 2)
def test_volume_detail_limit(self):
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
+ self.assertEqual(len(volumes), 1)
def test_volume_detail_limit_negative(self):
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=-1')
req = fakes.HTTPRequest.blank('/v2/volumes/detail?marker=1&limit=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], '1')
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], '1')
def test_volume_detail_limit_offset(self):
def stub_volume_get_all_by_project(context, project_id, marker, limit,
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=2&offset=1')
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], 2)
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], 2)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=2&offset=1',
use_admin_context=True)
res_dict = self.controller.index(req)
volumes = res_dict['volumes']
- self.assertEquals(len(volumes), 1)
- self.assertEquals(volumes[0]['id'], 2)
+ self.assertEqual(len(volumes), 1)
+ self.assertEqual(volumes[0]['id'], 2)
req = fakes.HTTPRequest.blank('/v2/volumes/detail?limit=-1&offset=1')
self.assertRaises(exception.InvalidInput,
"size": "1",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_name(self):
self_request = """
"name": "Volume-xml",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_description(self):
self_request = """
"description": "description",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_volume_type(self):
self_request = """
"volume_type": "289da7f8-6440-407c-9fb4-7db01ec49164",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_availability_zone(self):
self_request = """
"availability_zone": "us-east1",
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_metadata(self):
self_request = """
},
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_full_volume(self):
self_request = """
},
},
}
- self.assertEquals(request['body'], expected)
+ self.assertEqual(request['body'], expected)
def test_imageref(self):
self_request = """
"imageRef": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
def test_snapshot_id(self):
self_request = """
"snapshot_id": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
def test_source_volid(self):
self_request = """
"source_volid": "4a90189d-d702-4c7c-87fc-6608c554d737",
},
}
- self.assertEquals(expected, request['body'])
+ self.assertEqual(expected, request['body'])
expected = ['/dev/disk/by-path/' + dev for dev in self.devlist]
driver = host_driver.HostDriver()
actual = driver.get_all_block_devices()
- self.assertEquals(expected, actual)
+ self.assertEqual(expected, actual)
class ISCSIConnectorTestCase(ConnectorTestCase):
self.stubs.Set(self.connector, '_execute', initiator_no_file)
initiator = self.connector.get_initiator()
- self.assertEquals(initiator, None)
+ self.assertEqual(initiator, None)
self.stubs.Set(self.connector, '_execute', initiator_get_text)
initiator = self.connector.get_initiator()
- self.assertEquals(initiator, 'iqn.1234-56.foo.bar:01:23456789abc')
+ self.assertEqual(initiator, 'iqn.1234-56.foo.bar:01:23456789abc')
@test.testtools.skipUnless(os.path.exists('/dev/disk/by-path'),
'Test requires /dev/disk/by-path')
connection_info = self.iscsi_connection(vol, location, iqn)
device = self.connector.connect_volume(connection_info['data'])
dev_str = '/dev/disk/by-path/ip-%s-iscsi-%s-lun-1' % (location, iqn)
- self.assertEquals(device['type'], 'block')
- self.assertEquals(device['path'], dev_str)
+ self.assertEqual(device['type'], 'block')
+ self.assertEqual(device['path'], dev_str)
self.connector.disconnect_volume(connection_info['data'], device)
expected_commands = [('iscsiadm -m node -T %s -p %s' %
exp_wwn = wwn[0] if isinstance(wwn, list) else wwn
dev_str = ('/dev/disk/by-path/pci-0000:05:00.2-fc-0x%s-lun-1' %
exp_wwn)
- self.assertEquals(dev_info['type'], 'block')
- self.assertEquals(dev_info['path'], dev_str)
+ self.assertEqual(dev_info['type'], 'block')
+ self.assertEqual(dev_info['path'], dev_str)
self.connector.disconnect_volume(connection_info['data'], dev_info)
expected_commands = []
self.lfc.rescan_hosts(hbas)
expected_commands = ['tee -a /sys/class/scsi_host/foo/scan',
'tee -a /sys/class/scsi_host/bar/scan']
- self.assertEquals(expected_commands, self.cmds)
+ self.assertEqual(expected_commands, self.cmds)
def test_get_fc_hbas_fail(self):
def fake_exec1(a, b, c, d, run_as_root=True, root_helper='sudo'):
self.stubs.Set(self.lfc, "_execute", fake_exec1)
hbas = self.lfc.get_fc_hbas()
- self.assertEquals(0, len(hbas))
+ self.assertEqual(0, len(hbas))
self.stubs.Set(self.lfc, "_execute", fake_exec2)
hbas = self.lfc.get_fc_hbas()
- self.assertEquals(0, len(hbas))
+ self.assertEqual(0, len(hbas))
def test_get_fc_hbas(self):
def fake_exec(a, b, c, d, run_as_root=True, root_helper='sudo'):
return SYSTOOL_FC, None
self.stubs.Set(self.lfc, "_execute", fake_exec)
hbas = self.lfc.get_fc_hbas()
- self.assertEquals(2, len(hbas))
+ self.assertEqual(2, len(hbas))
hba1 = hbas[0]
- self.assertEquals(hba1["ClassDevice"], "host0")
+ self.assertEqual(hba1["ClassDevice"], "host0")
hba2 = hbas[1]
- self.assertEquals(hba2["ClassDevice"], "host2")
+ self.assertEqual(hba2["ClassDevice"], "host2")
def test_get_fc_hbas_info(self):
def fake_exec(a, b, c, d, run_as_root=True, root_helper='sudo'):
'host_device': 'host2',
'node_name': '50014380242b9753',
'port_name': '50014380242b9752'}, ]
- self.assertEquals(expected_info, hbas_info)
+ self.assertEqual(expected_info, hbas_info)
def test_get_fc_wwpns(self):
def fake_exec(a, b, c, d, run_as_root=True, root_helper='sudo'):
self.stubs.Set(self.lfc, "_execute", fake_exec)
wwpns = self.lfc.get_fc_wwpns()
expected_wwpns = ['50014380242b9750', '50014380242b9752']
- self.assertEquals(expected_wwpns, wwpns)
+ self.assertEqual(expected_wwpns, wwpns)
def test_get_fc_wwnns(self):
def fake_exec(a, b, c, d, run_as_root=True, root_helper='sudo'):
self.stubs.Set(self.lfc, "_execute", fake_exec)
wwnns = self.lfc.get_fc_wwpns()
expected_wwnns = ['50014380242b9750', '50014380242b9752']
- self.assertEquals(expected_wwnns, wwnns)
+ self.assertEqual(expected_wwnns, wwnns)
SYSTOOL_FC = """
Class = "fc_host"
def test_echo_scsi_command(self):
self.linuxscsi.echo_scsi_command("/some/path", "1")
expected_commands = ['tee -a /some/path']
- self.assertEquals(expected_commands, self.cmds)
+ self.assertEqual(expected_commands, self.cmds)
def test_get_name_from_path(self):
device_name = "/dev/sdc"
db.volume_update(self.ctxt, vol_ref['id'], {'name_id': 'fake'})
snap_ref = testutils.create_snapshot(self.ctxt, vol_ref['id'])
expected_name = CONF.volume_name_template % 'fake'
- self.assertEquals(snap_ref['volume_name'], expected_name)
+ self.assertEqual(snap_ref['volume_name'], expected_name)
specs_id = self._create_qos_specs('NewName')
query_id = db.qos_specs_get_by_name(
self.ctxt, 'NewName')['id']
- self.assertEquals(specs_id, query_id)
+ self.assertEqual(specs_id, query_id)
def test_qos_specs_get(self):
value = dict(consumer='front-end',
specs_id = self._create_qos_specs('FakeQos')
db.volume_type_qos_associate(self.ctxt, type_id, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 1)
- self.assertEquals(res[0]['id'], type_id)
- self.assertEquals(res[0]['qos_specs_id'], specs_id)
+ self.assertEqual(len(res), 1)
+ self.assertEqual(res[0]['id'], type_id)
+ self.assertEqual(res[0]['qos_specs_id'], specs_id)
def test_qos_associations_get(self):
self.assertRaises(exception.QoSSpecsNotFound,
type_id = volume_types.create(self.ctxt, 'TypeName')['id']
specs_id = self._create_qos_specs('FakeQos')
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 0)
+ self.assertEqual(len(res), 0)
db.volume_type_qos_associate(self.ctxt, type_id, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 1)
- self.assertEquals(res[0]['id'], type_id)
- self.assertEquals(res[0]['qos_specs_id'], specs_id)
+ self.assertEqual(len(res), 1)
+ self.assertEqual(res[0]['id'], type_id)
+ self.assertEqual(res[0]['qos_specs_id'], specs_id)
type0_id = volume_types.create(self.ctxt, 'Type0Name')['id']
db.volume_type_qos_associate(self.ctxt, type0_id, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 2)
- self.assertEquals(res[0]['qos_specs_id'], specs_id)
- self.assertEquals(res[1]['qos_specs_id'], specs_id)
+ self.assertEqual(len(res), 2)
+ self.assertEqual(res[0]['qos_specs_id'], specs_id)
+ self.assertEqual(res[1]['qos_specs_id'], specs_id)
def test_qos_specs_disassociate(self):
type_id = volume_types.create(self.ctxt, 'TypeName')['id']
specs_id = self._create_qos_specs('FakeQos')
db.volume_type_qos_associate(self.ctxt, type_id, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(res[0]['id'], type_id)
- self.assertEquals(res[0]['qos_specs_id'], specs_id)
+ self.assertEqual(res[0]['id'], type_id)
+ self.assertEqual(res[0]['qos_specs_id'], specs_id)
db.qos_specs_disassociate(self.ctxt, specs_id, type_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 0)
+ self.assertEqual(len(res), 0)
res = db.volume_type_get(self.ctxt, type_id)
- self.assertEquals(res['qos_specs_id'], None)
+ self.assertEqual(res['qos_specs_id'], None)
def test_qos_specs_disassociate_all(self):
specs_id = self._create_qos_specs('FakeQos')
db.volume_type_qos_associate(self.ctxt, type3_id, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 3)
+ self.assertEqual(len(res), 3)
db.qos_specs_disassociate_all(self.ctxt, specs_id)
res = db.qos_specs_associations_get(self.ctxt, specs_id)
- self.assertEquals(len(res), 0)
+ self.assertEqual(len(res), 0)
def test_qos_specs_update(self):
name = 'FakeName'
xfer_id1 = self._create_transfer(volume_id1)
xfer = db.transfer_get(self.ctxt, xfer_id1)
- self.assertEquals(xfer.volume_id, volume_id1, "Unexpected volume_id")
+ self.assertEqual(xfer.volume_id, volume_id1, "Unexpected volume_id")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
db.transfer_get, nctxt, xfer_id1)
xfer = db.transfer_get(nctxt.elevated(), xfer_id1)
- self.assertEquals(xfer.volume_id, volume_id1, "Unexpected volume_id")
+ self.assertEqual(xfer.volume_id, volume_id1, "Unexpected volume_id")
def test_transfer_get_all(self):
volume_id1 = utils.create_volume(self.ctxt)['id']
ids.append(self.service.create(self.context, fixture)['id'])
image_metas = self.service.detail(self.context, marker=ids[1])
- self.assertEquals(len(image_metas), 8)
+ self.assertEqual(len(image_metas), 8)
i = 2
for meta in image_metas:
expected = {
ids.append(self.service.create(self.context, fixture)['id'])
image_metas = self.service.detail(self.context, limit=5)
- self.assertEquals(len(image_metas), 5)
+ self.assertEqual(len(image_metas), 5)
def test_detail_default_limit(self):
fixtures = []
ids.append(self.service.create(self.context, fixture)['id'])
image_metas = self.service.detail(self.context, marker=ids[3], limit=5)
- self.assertEquals(len(image_metas), 5)
+ self.assertEqual(len(image_metas), 5)
i = 4
for meta in image_metas:
expected = {
self.service.update(self.context, image_id, fixture)
new_image_data = self.service.show(self.context, image_id)
- self.assertEquals('new image name', new_image_data['name'])
+ self.assertEqual('new image name', new_image_data['name'])
def test_delete(self):
fixture1 = self._make_fixture(name='test image 1')
fixtures = [fixture1, fixture2]
num_images = len(self.service.detail(self.context))
- self.assertEquals(0, num_images)
+ self.assertEqual(0, num_images)
ids = []
for fixture in fixtures:
ids.append(new_id)
num_images = len(self.service.detail(self.context))
- self.assertEquals(2, num_images)
+ self.assertEqual(2, num_images)
self.service.delete(self.context, ids[0])
num_images = len(self.service.detail(self.context))
- self.assertEquals(1, num_images)
+ self.assertEqual(1, num_images)
def test_show_passes_through_to_client(self):
fixture = self._make_fixture(name='image1', is_public=True)
image_id = self.service.create(self.context, fixture)['id']
(service, same_id) = glance.get_remote_image_service(self.context,
image_id)
- self.assertEquals(same_id, image_id)
+ self.assertEqual(same_id, image_id)
def test_glance_client_image_ref(self):
fixture = self._make_fixture(name='test image')
image_url = 'http://something-less-likely/%s' % image_id
(service, same_id) = glance.get_remote_image_service(self.context,
image_url)
- self.assertEquals(same_id, image_id)
+ self.assertEqual(same_id, image_id)
self.assertEquals(service._client.netloc,
'something-less-likely')
for ipv6_url in ('[::1]', '::1', '[::1]:444'):
image_url = 'http://%s/%s' % (ipv6_url, image_id)
(service, same_id) = glance.get_remote_image_service(self.context,
image_url)
- self.assertEquals(same_id, image_id)
- self.assertEquals(service._client.netloc, ipv6_url)
+ self.assertEqual(same_id, image_id)
+ self.assertEqual(service._client.netloc, ipv6_url)
class TestGlanceClientVersion(test.TestCase):
id=created_volume_id)
LOG.debug("Create_Actions: %s" % create_actions)
- self.assertEquals(1, len(create_actions))
+ self.assertEqual(1, len(create_actions))
create_action = create_actions[0]
- self.assertEquals(create_action['id'], created_volume_id)
- self.assertEquals(create_action['availability_zone'], 'nova')
- self.assertEquals(create_action['size'], 1)
+ self.assertEqual(create_action['id'], created_volume_id)
+ self.assertEqual(create_action['availability_zone'], 'nova')
+ self.assertEqual(create_action['size'], 1)
export_actions = fake_driver.LoggingVolumeDriver.logs_like(
'create_export',
id=created_volume_id)
- self.assertEquals(1, len(export_actions))
+ self.assertEqual(1, len(export_actions))
export_action = export_actions[0]
- self.assertEquals(export_action['id'], created_volume_id)
- self.assertEquals(export_action['availability_zone'], 'nova')
+ self.assertEqual(export_action['id'], created_volume_id)
+ self.assertEqual(export_action['availability_zone'], 'nova')
delete_actions = fake_driver.LoggingVolumeDriver.logs_like(
'delete_volume',
id=created_volume_id)
- self.assertEquals(1, len(delete_actions))
+ self.assertEqual(1, len(delete_actions))
delete_action = export_actions[0]
- self.assertEquals(delete_action['id'], created_volume_id)
+ self.assertEqual(delete_action['id'], created_volume_id)
def test_create_volume_with_metadata(self):
"""Creates a volume with metadata."""
super(SymmetricKeyTestCase, self).setUp()
def test_get_algorithm(self):
- self.assertEquals(self.key.get_algorithm(), self.algorithm)
+ self.assertEqual(self.key.get_algorithm(), self.algorithm)
def test_get_format(self):
- self.assertEquals(self.key.get_format(), 'RAW')
+ self.assertEqual(self.key.get_format(), 'RAW')
def test_get_encoded(self):
- self.assertEquals(self.key.get_encoded(), self.encoded)
+ self.assertEqual(self.key.get_encoded(), self.encoded)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
- self.assertEquals({}, fake.get_configuration())
+ self.assertEqual({}, fake.get_configuration())
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_first_time_empty_file(self):
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
- self.assertEquals({}, fake.get_configuration('foo.json'))
+ self.assertEqual({}, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)
def test_get_configuration_first_time_happy_day(self):
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
- self.assertEquals(data, fake.get_configuration('foo.json'))
+ self.assertEqual(data, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)
def test_get_configuration_second_time_no_change(self):
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
data, jdata)
- self.assertEquals(data, fake.get_configuration('foo.json'))
+ self.assertEqual(data, fake.get_configuration('foo.json'))
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_second_time_too_fast(self):
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
old_data, jdata)
- self.assertEquals(old_data, fake.get_configuration('foo.json'))
+ self.assertEqual(old_data, fake.get_configuration('foo.json'))
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_second_time_change(self):
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
old_data, jdata)
- self.assertEquals(data, fake.get_configuration('foo.json'))
+ self.assertEqual(data, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)
self.backup_mgr.init_host()
vol1 = db.volume_get(self.ctxt, vol1_id)
- self.assertEquals(vol1['status'], 'available')
+ self.assertEqual(vol1['status'], 'available')
vol2 = db.volume_get(self.ctxt, vol2_id)
- self.assertEquals(vol2['status'], 'error_restoring')
+ self.assertEqual(vol2['status'], 'error_restoring')
backup1 = db.backup_get(self.ctxt, backup1_id)
- self.assertEquals(backup1['status'], 'error')
+ self.assertEqual(backup1['status'], 'error')
backup2 = db.backup_get(self.ctxt, backup2_id)
- self.assertEquals(backup2['status'], 'available')
+ self.assertEqual(backup2['status'], 'available')
self.assertRaises(exception.BackupNotFound,
db.backup_get,
self.ctxt,
self.ctxt,
backup_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'available')
+ self.assertEqual(vol['status'], 'available')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'error')
+ self.assertEqual(backup['status'], 'error')
def test_create_backup(self):
"""Test normal backup creation"""
self.backup_mgr.create_backup(self.ctxt, backup_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'available')
+ self.assertEqual(vol['status'], 'available')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'available')
+ self.assertEqual(backup['status'], 'available')
self.assertEqual(backup['size'], vol_size)
def test_restore_backup_with_bad_volume_status(self):
backup_id,
vol_id)
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'available')
+ self.assertEqual(backup['status'], 'available')
def test_restore_backup_with_bad_backup_status(self):
"""Test error handling when restoring a backup with a backup
backup_id,
vol_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'error')
+ self.assertEqual(vol['status'], 'error')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'error')
+ self.assertEqual(backup['status'], 'error')
def test_restore_backup_with_driver_error(self):
"""Test error handling when an error occurs during backup restore"""
backup_id,
vol_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'error_restoring')
+ self.assertEqual(vol['status'], 'error_restoring')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'available')
+ self.assertEqual(backup['status'], 'available')
def test_restore_backup_with_bad_service(self):
"""Test error handling when attempting a restore of a backup
backup_id,
vol_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'error')
+ self.assertEqual(vol['status'], 'error')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'available')
+ self.assertEqual(backup['status'], 'available')
def test_restore_backup(self):
"""Test normal backup restoration"""
self.backup_mgr.restore_backup(self.ctxt, backup_id, vol_id)
vol = db.volume_get(self.ctxt, vol_id)
- self.assertEquals(vol['status'], 'available')
+ self.assertEqual(vol['status'], 'available')
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'available')
+ self.assertEqual(backup['status'], 'available')
def test_delete_backup_with_bad_backup_status(self):
"""Test error handling when deleting a backup with a backup
self.ctxt,
backup_id)
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'error')
+ self.assertEqual(backup['status'], 'error')
def test_delete_backup_with_error(self):
"""Test error handling when an error occurs during backup deletion."""
self.ctxt,
backup_id)
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'error')
+ self.assertEqual(backup['status'], 'error')
def test_delete_backup_with_bad_service(self):
"""Test error handling when attempting a delete of a backup
self.ctxt,
backup_id)
backup = db.backup_get(self.ctxt, backup_id)
- self.assertEquals(backup['status'], 'error')
+ self.assertEqual(backup['status'], 'error')
def test_delete_backup_with_no_service(self):
"""Test error handling when attempting a delete of a backup
oldformat, features = self.service._get_rbd_support()
self.assertTrue(oldformat)
- self.assertEquals(features, 0)
+ self.assertEqual(features, 0)
self.service.rbd.RBD_FEATURE_LAYERING = 1
oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
- self.assertEquals(features, 1)
+ self.assertEqual(features, 1)
self.service.rbd.RBD_FEATURE_STRIPINGV2 = 2
oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
- self.assertEquals(features, 1 | 2)
+ self.assertEqual(features, 1 | 2)
def _set_common_backup_stubs(self, service):
self.stubs.Set(self.service, '_get_rbd_support', lambda: (True, 3))
snap = self.service._get_most_recent_snap(self.service.rbd.Image())
- self.assertEquals(last, snap)
+ self.assertEqual(last, snap)
def test_get_backup_snap_name(self):
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
name = self.service._get_backup_snap_name(self.service.rbd.Image(),
'base_foo',
self.backup_id)
- self.assertEquals(name, snap_name)
+ self.assertEqual(name, snap_name)
def test_get_backup_snaps(self):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_transfer_data_from_rbd_to_rbd(self):
def rbd_size(inst):
'dest_foo', self.length)
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_transfer_data_from_file_to_rbd(self):
self._set_common_backup_stubs(self.service)
rbd_io, 'dest_foo', self.length)
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_transfer_data_from_file_to_file(self):
self._set_common_backup_stubs(self.service)
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_backup_volume_from_file(self):
self._create_volume_db_entry(self.volume_id, 1)
self.service.backup(backup, self.volume_file)
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_get_backup_base_name(self):
name = self.service._get_backup_base_name(self.volume_id,
diff_format=True)
- self.assertEquals(name, "volume-%s.backup.base" % (self.volume_id))
+ self.assertEqual(name, "volume-%s.backup.base" % (self.volume_id))
self.assertRaises(exception.InvalidParameterValue,
self.service._get_backup_base_name,
self.service.backup(backup, rbd_io)
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_backup_vol_length_0(self):
self._set_common_backup_stubs(self.service)
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
- self.assertEquals(checksum.digest(), self.checksum.digest())
+ self.assertEqual(checksum.digest(), self.checksum.digest())
def test_create_base_image_if_not_exists(self):
pass
rem = self.service._delete_backup_snapshots(mock_rados(), base_name,
self.backup_id)
- self.assertEquals(rem, 0)
+ self.assertEqual(rem, 0)
def test_try_delete_base_image_diff_format(self):
# don't create volume db entry since it should not be required
resp = self.service._diff_restore_allowed('foo', backup, alt_volume,
rbd_io, mock_rados())
- self.assertEquals(resp, is_allowed)
+ self.assertEqual(resp, is_allowed)
def test_diff_restore_allowed_false(self):
not_allowed = (False, None)
resp = self.service._diff_restore_allowed('foo', backup,
original_volume, rbd_io,
mock_rados())
- self.assertEquals(resp, not_allowed)
+ self.assertEqual(resp, not_allowed)
def tearDown(self):
self.volume_file.close()
backup = db.backup_get(self.ctxt, 123)
service.backup(backup, self.volume_file)
backup = db.backup_get(self.ctxt, 123)
- self.assertEquals(backup['container'], 'volumebackups')
+ self.assertEqual(backup['container'], 'volumebackups')
def test_backup_custom_container(self):
container_name = 'fake99'
backup = db.backup_get(self.ctxt, 123)
service.backup(backup, self.volume_file)
backup = db.backup_get(self.ctxt, 123)
- self.assertEquals(backup['container'], container_name)
+ self.assertEqual(backup['container'], container_name)
def test_create_backup_container_check_wraps_socket_error(self):
container_name = 'socket_error_on_head'
def test_get_compressor(self):
service = SwiftBackupDriver(self.ctxt)
compressor = service._get_compressor('None')
- self.assertEquals(compressor, None)
+ self.assertEqual(compressor, None)
compressor = service._get_compressor('zlib')
- self.assertEquals(compressor, zlib)
+ self.assertEqual(compressor, zlib)
compressor = service._get_compressor('bz2')
- self.assertEquals(compressor, bz2)
+ self.assertEqual(compressor, bz2)
self.assertRaises(ValueError, service._get_compressor, 'fake')
def test_check_container_exists(self):
service = SwiftBackupDriver(self.ctxt)
exists = service._check_container_exists('fake_container')
- self.assertEquals(exists, True)
+ self.assertEqual(exists, True)
exists = service._check_container_exists('missing_container')
- self.assertEquals(exists, False)
+ self.assertEqual(exists, False)
self.assertRaises(swift.ClientException,
service._check_container_exists,
'unauthorized_container')
TEST_CONNECTOR).AndReturn('data')
self.mox.ReplayAll()
data = self.drv.initialize_connection(TEST_VOLUME2, TEST_CONNECTOR)
- self.assertEquals(data, 'data')
+ self.assertEqual(data, 'data')
def test_delete_not_volume_provider_location(self):
TEST_VOLUME2 = {'provider_location': None}
path1 = self.drv.local_path(TEST_VOLUME1).AndReturn('/dev/loop1')
path2 = self.drv.local_path(TEST_VOLUME2).AndReturn('/dev/loop2')
self.mox.ReplayAll()
- self.assertEquals(self.drv._get_used_devices(), set([path1, path2]))
+ self.assertEqual(self.drv._get_used_devices(), set([path1, path2]))
def test_get_device_size(self):
dev_path = '/dev/loop1'
self.drv._execute('blockdev', '--getsz', dev_path,
run_as_root=True).AndReturn((out, None))
self.mox.ReplayAll()
- self.assertEquals(self.drv._get_device_size(dev_path), 1)
+ self.assertEqual(self.drv._get_device_size(dev_path), 1)
def test_devices_sizes(self):
self.mox.StubOutWithMock(self.drv, '_get_device_size')
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
- self.assertEquals(ctxt.is_admin, True)
+ self.assertEqual(ctxt.is_admin, True)
def test_request_context_sets_is_admin_upcase(self):
ctxt = context.RequestContext('111',
'222',
roles=['Admin', 'weasel'])
- self.assertEquals(ctxt.is_admin, True)
+ self.assertEqual(ctxt.is_admin, True)
def test_request_context_read_deleted(self):
ctxt = context.RequestContext('111',
'222',
read_deleted='yes')
- self.assertEquals(ctxt.read_deleted, 'yes')
+ self.assertEqual(ctxt.read_deleted, 'yes')
ctxt.read_deleted = 'no'
- self.assertEquals(ctxt.read_deleted, 'no')
+ self.assertEqual(ctxt.read_deleted, 'no')
def test_request_context_read_deleted_invalid(self):
self.assertRaises(ValueError,
metadata = {'a': 'b', 'c': 'd'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata})
- self.assertEquals(metadata, db.volume_metadata_get(self.ctxt, 1))
+ self.assertEqual(metadata, db.volume_metadata_get(self.ctxt, 1))
def test_volume_metadata_update(self):
metadata1 = {'a': '1', 'c': '2'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata1})
db.volume_metadata_update(self.ctxt, 1, metadata2, False)
- self.assertEquals(should_be, db.volume_metadata_get(self.ctxt, 1))
+ self.assertEqual(should_be, db.volume_metadata_get(self.ctxt, 1))
def test_volume_metadata_update_delete(self):
metadata1 = {'a': '1', 'c': '2'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata1})
db.volume_metadata_update(self.ctxt, 1, metadata2, True)
- self.assertEquals(should_be, db.volume_metadata_get(self.ctxt, 1))
+ self.assertEqual(should_be, db.volume_metadata_get(self.ctxt, 1))
class DBAPISnapshotTestCase(BaseTest):
db.snapshot_create(self.ctxt,
{'id': 1, 'volume_id': 1, 'metadata': metadata})
- self.assertEquals(metadata, db.snapshot_metadata_get(self.ctxt, 1))
+ self.assertEqual(metadata, db.snapshot_metadata_get(self.ctxt, 1))
def test_snapshot_metadata_update(self):
metadata1 = {'a': '1', 'c': '2'}
{'id': 1, 'volume_id': 1, 'metadata': metadata1})
db.snapshot_metadata_update(self.ctxt, 1, metadata2, False)
- self.assertEquals(should_be, db.snapshot_metadata_get(self.ctxt, 1))
+ self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
def test_snapshot_metadata_update_delete(self):
metadata1 = {'a': '1', 'c': '2'}
{'id': 1, 'volume_id': 1, 'metadata': metadata1})
db.snapshot_metadata_update(self.ctxt, 1, metadata2, True)
- self.assertEquals(should_be, db.snapshot_metadata_get(self.ctxt, 1))
+ self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
def test_snapshot_metadata_delete(self):
metadata = {'a': '1', 'c': '2'}
{'id': 1, 'volume_id': 1, 'metadata': metadata})
db.snapshot_metadata_delete(self.ctxt, 1, 'c')
- self.assertEquals(should_be, db.snapshot_metadata_get(self.ctxt, 1))
+ self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
class DBAPIEncryptionTestCase(BaseTest):
def test_rbd_old(self):
self._load_driver('cinder.volume.driver.RBDDriver')
- self.assertEquals(self._driver_module_name(), RBD_MODULE)
+ self.assertEqual(self._driver_module_name(), RBD_MODULE)
def test_rbd_new(self):
self._load_driver(RBD_MODULE)
- self.assertEquals(self._driver_module_name(), RBD_MODULE)
+ self.assertEqual(self._driver_module_name(), RBD_MODULE)
def test_sheepdog_old(self):
self._load_driver('cinder.volume.driver.SheepdogDriver')
- self.assertEquals(self._driver_module_name(), SHEEPDOG_MODULE)
+ self.assertEqual(self._driver_module_name(), SHEEPDOG_MODULE)
def test_sheepdog_new(self):
self._load_driver(SHEEPDOG_MODULE)
- self.assertEquals(self._driver_module_name(), SHEEPDOG_MODULE)
+ self.assertEqual(self._driver_module_name(), SHEEPDOG_MODULE)
def test_nexenta_old(self):
self._load_driver('cinder.volume.nexenta.volume.NexentaDriver')
- self.assertEquals(self._driver_module_name(), NEXENTA_MODULE)
+ self.assertEqual(self._driver_module_name(), NEXENTA_MODULE)
def test_nexenta_new(self):
self._load_driver(NEXENTA_MODULE)
- self.assertEquals(self._driver_module_name(), NEXENTA_MODULE)
+ self.assertEqual(self._driver_module_name(), NEXENTA_MODULE)
def test_san_old(self):
self._load_driver('cinder.volume.san.SanISCSIDriver')
- self.assertEquals(self._driver_module_name(), SAN_MODULE)
+ self.assertEqual(self._driver_module_name(), SAN_MODULE)
def test_san_new(self):
self._load_driver(SAN_MODULE)
- self.assertEquals(self._driver_module_name(), SAN_MODULE)
+ self.assertEqual(self._driver_module_name(), SAN_MODULE)
def test_solaris_old(self):
self._load_driver('cinder.volume.san.SolarisISCSIDriver')
- self.assertEquals(self._driver_module_name(), SOLARIS_MODULE)
+ self.assertEqual(self._driver_module_name(), SOLARIS_MODULE)
def test_solaris_new(self):
self._load_driver(SOLARIS_MODULE)
- self.assertEquals(self._driver_module_name(), SOLARIS_MODULE)
+ self.assertEqual(self._driver_module_name(), SOLARIS_MODULE)
def test_hp_lefthand_old(self):
self._load_driver('cinder.volume.san.HpSanISCSIDriver')
- self.assertEquals(self._driver_module_name(), LEFTHAND_MODULE)
+ self.assertEqual(self._driver_module_name(), LEFTHAND_MODULE)
def test_hp_lefthand_new(self):
self._load_driver(LEFTHAND_MODULE)
- self.assertEquals(self._driver_module_name(), LEFTHAND_MODULE)
+ self.assertEqual(self._driver_module_name(), LEFTHAND_MODULE)
def test_nfs_old(self):
self._load_driver('cinder.volume.nfs.NfsDriver')
- self.assertEquals(self._driver_module_name(), NFS_MODULE)
+ self.assertEqual(self._driver_module_name(), NFS_MODULE)
def test_nfs_new(self):
self._load_driver(NFS_MODULE)
- self.assertEquals(self._driver_module_name(), NFS_MODULE)
+ self.assertEqual(self._driver_module_name(), NFS_MODULE)
def test_solidfire_old(self):
self._load_driver('cinder.volume.solidfire.SolidFire')
- self.assertEquals(self._driver_module_name(), SOLIDFIRE_MODULE)
+ self.assertEqual(self._driver_module_name(), SOLIDFIRE_MODULE)
def test_solidfire_old2(self):
self._load_driver('cinder.volume.drivers.solidfire.SolidFire')
- self.assertEquals(self._driver_module_name(), SOLIDFIRE_MODULE)
+ self.assertEqual(self._driver_module_name(), SOLIDFIRE_MODULE)
def test_solidfire_new(self):
self._load_driver(SOLIDFIRE_MODULE)
- self.assertEquals(self._driver_module_name(), SOLIDFIRE_MODULE)
+ self.assertEqual(self._driver_module_name(), SOLIDFIRE_MODULE)
def test_storwize_svc_old(self):
self._load_driver('cinder.volume.storwize_svc.StorwizeSVCDriver')
- self.assertEquals(self._driver_module_name(), STORWIZE_SVC_MODULE)
+ self.assertEqual(self._driver_module_name(), STORWIZE_SVC_MODULE)
def test_storwize_svc_new(self):
self._load_driver(STORWIZE_SVC_MODULE)
- self.assertEquals(self._driver_module_name(), STORWIZE_SVC_MODULE)
+ self.assertEqual(self._driver_module_name(), STORWIZE_SVC_MODULE)
def test_windows_old(self):
self._load_driver('cinder.volume.windows.WindowsDriver')
- self.assertEquals(self._driver_module_name(), WINDOWS_MODULE)
+ self.assertEqual(self._driver_module_name(), WINDOWS_MODULE)
def test_windows_new(self):
self._load_driver(WINDOWS_MODULE)
- self.assertEquals(self._driver_module_name(), WINDOWS_MODULE)
+ self.assertEqual(self._driver_module_name(), WINDOWS_MODULE)
def test_xiv_old(self):
self._load_driver('cinder.volume.xiv.XIVDriver')
- self.assertEquals(self._driver_module_name(), XIV_DS8K_MODULE)
+ self.assertEqual(self._driver_module_name(), XIV_DS8K_MODULE)
def test_xiv_ds8k_new(self):
self._load_driver(XIV_DS8K_MODULE)
- self.assertEquals(self._driver_module_name(), XIV_DS8K_MODULE)
+ self.assertEqual(self._driver_module_name(), XIV_DS8K_MODULE)
def test_zadara_old(self):
self._load_driver('cinder.volume.zadara.ZadaraVPSAISCSIDriver')
- self.assertEquals(self._driver_module_name(), ZADARA_MODULE)
+ self.assertEqual(self._driver_module_name(), ZADARA_MODULE)
def test_zadara_new(self):
self._load_driver(ZADARA_MODULE)
- self.assertEquals(self._driver_module_name(), ZADARA_MODULE)
+ self.assertEqual(self._driver_module_name(), ZADARA_MODULE)
def test_netapp_7m_iscsi_old(self):
self._load_driver(
'cinder.volume.drivers.netapp.iscsi.NetAppISCSIDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_7m_iscsi_old_old(self):
self._load_driver('cinder.volume.netapp.NetAppISCSIDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_cm_iscsi_old_old(self):
self._load_driver('cinder.volume.netapp.NetAppCmodeISCSIDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_cm_iscsi_old(self):
self._load_driver(
'cinder.volume.drivers.netapp.iscsi.NetAppCmodeISCSIDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_7m_nfs_old_old(self):
self._load_driver('cinder.volume.netapp_nfs.NetAppNFSDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_7m_nfs_old(self):
self._load_driver('cinder.volume.drivers.netapp.nfs.NetAppNFSDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
def test_netapp_cm_nfs_old(self):
self._load_driver(
'cinder.volume.drivers.netapp.nfs.NetAppCmodeNfsDriver')
- self.assertEquals(self._driver_module_name(), NETAPP_MODULE)
+ self.assertEqual(self._driver_module_name(), NETAPP_MODULE)
message = "default message"
exc = FakeCinderException()
- self.assertEquals(unicode(exc), 'default message')
+ self.assertEqual(unicode(exc), 'default message')
def test_error_msg(self):
self.assertEquals(unicode(exception.CinderException('test')),
message = "default message: %(code)s"
exc = FakeCinderException(code=500)
- self.assertEquals(unicode(exc), 'default message: 500')
+ self.assertEqual(unicode(exc), 'default message: 500')
def test_error_msg_exception_with_kwargs(self):
# NOTE(dprince): disable format errors for this test
message = "default message: %(mispelled_code)s"
exc = FakeCinderException(code=500)
- self.assertEquals(unicode(exc), 'default message: %(mispelled_code)s')
+ self.assertEqual(unicode(exc), 'default message: %(mispelled_code)s')
def test_default_error_code(self):
class FakeCinderException(exception.CinderException):
code = 404
exc = FakeCinderException()
- self.assertEquals(exc.kwargs['code'], 404)
+ self.assertEqual(exc.kwargs['code'], 404)
def test_error_code_from_kwarg(self):
class FakeCinderException(exception.CinderException):
code = 500
exc = FakeCinderException(code=404)
- self.assertEquals(exc.kwargs['code'], 404)
+ self.assertEqual(exc.kwargs['code'], 404)
mox.ReplayAll()
- self.assertEquals(drv._get_file_format(qemu_img_info_output), 'qcow2')
+ self.assertEqual(drv._get_file_format(qemu_img_info_output), 'qcow2')
mox.VerifyAll()
def test_get_volume_stats(self):
stats = self.driver.get_volume_stats()
- self.assertEquals(stats['volume_backend_name'], 'GPFS')
- self.assertEquals(stats['storage_protocol'], 'file')
+ self.assertEqual(stats['volume_backend_name'], 'GPFS')
+ self.assertEqual(stats['storage_protocol'], 'file')
def test_check_for_setup_error_ok(self):
self.stubs.Set(GPFSDriver, '_get_gpfs_state',
# we should have a host and a vlun now.
host = self.fake_get_3par_host(self.FAKE_HOST)
- self.assertEquals(self.FAKE_HOST, host['name'])
- self.assertEquals(HP3PAR_DOMAIN, host['domain'])
+ self.assertEqual(self.FAKE_HOST, host['name'])
+ self.assertEqual(HP3PAR_DOMAIN, host['domain'])
vlun = self.driver.common.client.getVLUN(self.VOLUME_3PAR_NAME)
- self.assertEquals(self.VOLUME_3PAR_NAME, vlun['volumeName'])
- self.assertEquals(self.FAKE_HOST, vlun['hostname'])
+ self.assertEqual(self.VOLUME_3PAR_NAME, vlun['volumeName'])
+ self.assertEqual(self.FAKE_HOST, vlun['hostname'])
def test_get_volume_stats(self):
self.flags(lock_path=self.tempdir)
self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
stats = self.driver.get_volume_stats(True)
- self.assertEquals(stats['storage_protocol'], 'FC')
- self.assertEquals(stats['total_capacity_gb'], 'infinite')
- self.assertEquals(stats['free_capacity_gb'], 'infinite')
+ self.assertEqual(stats['storage_protocol'], 'FC')
+ self.assertEqual(stats['total_capacity_gb'], 'infinite')
+ self.assertEqual(stats['free_capacity_gb'], 'infinite')
#modify the CPG to have a limit
old_cpg = self.driver.common.client.getCPG(HP3PAR_CPG)
const = 0.0009765625
stats = self.driver.get_volume_stats(True)
- self.assertEquals(stats['storage_protocol'], 'FC')
+ self.assertEqual(stats['storage_protocol'], 'FC')
total_capacity_gb = 8192 * const
- self.assertEquals(stats['total_capacity_gb'], total_capacity_gb)
+ self.assertEqual(stats['total_capacity_gb'], total_capacity_gb)
free_capacity_gb = int((8192 - old_cpg['UsrUsage']['usedMiB']) * const)
- self.assertEquals(stats['free_capacity_gb'], free_capacity_gb)
+ self.assertEqual(stats['free_capacity_gb'], free_capacity_gb)
self.driver.common.client.deleteCPG(HP3PAR_CPG)
self.driver.common.client.createCPG(HP3PAR_CPG, {})
host = self.driver._create_host(self.volume, self.connector)
- self.assertEquals(host['name'], 'fakehost.foo')
+ self.assertEqual(host['name'], 'fakehost.foo')
def test_create_modify_host(self):
self.flags(lock_path=self.tempdir)
# we should have a host and a vlun now.
host = self.fake_get_3par_host(self.FAKE_HOST)
- self.assertEquals(self.FAKE_HOST, host['name'])
- self.assertEquals(HP3PAR_DOMAIN, host['domain'])
+ self.assertEqual(self.FAKE_HOST, host['name'])
+ self.assertEqual(HP3PAR_DOMAIN, host['domain'])
vlun = self.driver.common.client.getVLUN(self.VOLUME_3PAR_NAME)
- self.assertEquals(self.VOLUME_3PAR_NAME, vlun['volumeName'])
- self.assertEquals(self.FAKE_HOST, vlun['hostname'])
+ self.assertEqual(self.VOLUME_3PAR_NAME, vlun['volumeName'])
+ self.assertEqual(self.FAKE_HOST, vlun['hostname'])
def test_get_volume_stats(self):
self.flags(lock_path=self.tempdir)
self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
stats = self.driver.get_volume_stats(True)
- self.assertEquals(stats['storage_protocol'], 'iSCSI')
- self.assertEquals(stats['total_capacity_gb'], 'infinite')
- self.assertEquals(stats['free_capacity_gb'], 'infinite')
+ self.assertEqual(stats['storage_protocol'], 'iSCSI')
+ self.assertEqual(stats['total_capacity_gb'], 'infinite')
+ self.assertEqual(stats['free_capacity_gb'], 'infinite')
#modify the CPG to have a limit
old_cpg = self.driver.common.client.getCPG(HP3PAR_CPG)
const = 0.0009765625
stats = self.driver.get_volume_stats(True)
- self.assertEquals(stats['storage_protocol'], 'iSCSI')
+ self.assertEqual(stats['storage_protocol'], 'iSCSI')
total_capacity_gb = 8192 * const
- self.assertEquals(stats['total_capacity_gb'], total_capacity_gb)
+ self.assertEqual(stats['total_capacity_gb'], total_capacity_gb)
free_capacity_gb = int((8192 - old_cpg['UsrUsage']['usedMiB']) * const)
- self.assertEquals(stats['free_capacity_gb'], free_capacity_gb)
+ self.assertEqual(stats['free_capacity_gb'], free_capacity_gb)
self.driver.common.client.deleteCPG(HP3PAR_CPG)
self.driver.common.client.createCPG(HP3PAR_CPG, {})
host = self.driver._create_host(self.volume, self.connector)
- self.assertEquals(host['name'], 'fakehost.foo')
+ self.assertEqual(host['name'], 'fakehost.foo')
def test_create_modify_host(self):
self.flags(lock_path=self.tempdir)
inf = image_utils.qemu_img_info(TEST_PATH)
- self.assertEquals(inf.image, 'qemu.qcow2')
- self.assertEquals(inf.backing_file, 'qemu.qcow2')
- self.assertEquals(inf.file_format, 'qcow2')
- self.assertEquals(inf.virtual_size, 52428800)
- self.assertEquals(inf.cluster_size, 65536)
- self.assertEquals(inf.disk_size, 200704)
-
- self.assertEquals(inf.snapshots[0]['id'], '1')
- self.assertEquals(inf.snapshots[0]['tag'], 'snap1')
- self.assertEquals(inf.snapshots[0]['vm_size'], '1.7G')
- self.assertEquals(inf.snapshots[0]['date'], '2011-10-04')
+ self.assertEqual(inf.image, 'qemu.qcow2')
+ self.assertEqual(inf.backing_file, 'qemu.qcow2')
+ self.assertEqual(inf.file_format, 'qcow2')
+ self.assertEqual(inf.virtual_size, 52428800)
+ self.assertEqual(inf.cluster_size, 65536)
+ self.assertEqual(inf.disk_size, 200704)
+
+ self.assertEqual(inf.snapshots[0]['id'], '1')
+ self.assertEqual(inf.snapshots[0]['tag'], 'snap1')
+ self.assertEqual(inf.snapshots[0]['vm_size'], '1.7G')
+ self.assertEqual(inf.snapshots[0]['date'], '2011-10-04')
self.assertEquals(inf.snapshots[0]['vm_clock'],
'19:04:00 32:06:34.974')
- self.assertEquals(str(inf), TEST_STR)
+ self.assertEqual(str(inf), TEST_STR)
def test_fetch_to_raw(self):
TEST_RET = "image: qemu.qcow2\n"\
result = image_utils.get_vhd_size('vhdfile')
mox.VerifyAll()
- self.assertEquals(1024, result)
+ self.assertEqual(1024, result)
class TestResize(test.TestCase):
result = image_utils.coalesce_chain(['0.vhd'])
mox.VerifyAll()
- self.assertEquals('0.vhd', result)
+ self.assertEqual('0.vhd', result)
def test_chain_of_two_vhds(self):
self.mox.StubOutWithMock(image_utils, 'get_vhd_size')
result = image_utils.coalesce_chain(['0.vhd', '1.vhd'])
self.mox.VerifyAll()
- self.assertEquals('1.vhd', result)
+ self.assertEqual('1.vhd', result)
class TestDiscoverChain(test.TestCase):
fkey, = snapshots.c.volume_id.foreign_keys
- self.assertEquals(volumes.c.id, fkey.column)
+ self.assertEqual(volumes.c.id, fkey.column)
def test_downgrade_007_removes_fk(self):
for metadata in self.metadatas_downgraded_from(7):
snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
- self.assertEquals(0, len(snapshots.c.volume_id.foreign_keys))
+ self.assertEqual(0, len(snapshots.c.volume_id.foreign_keys))
def test_migration_008(self):
"""Test that adding and removing the backups table works correctly"""
loc = drv.create_volume_from_snapshot(volume, snapshot)
- self.assertEquals(loc, expected_result)
+ self.assertEqual(loc, expected_result)
mox.VerifyAll()
'prefix': self.configuration.nexenta_target_prefix,
'volume': self.TEST_VOLUME_NAME
}
- self.assertEquals(retval, {'provider_location': location})
+ self.assertEqual(retval, {'provider_location': location})
def __get_test(i):
def _test_create_export_fail(self):
'health|size|used|available').AndReturn(stats)
self.mox.ReplayAll()
stats = self.drv.get_volume_stats(True)
- self.assertEquals(stats['storage_protocol'], 'iSCSI')
- self.assertEquals(stats['total_capacity_gb'], 5368709120.0)
- self.assertEquals(stats['free_capacity_gb'], 5368709120.0)
- self.assertEquals(stats['reserved_percentage'], 0)
- self.assertEquals(stats['QoS_support'], False)
+ self.assertEqual(stats['storage_protocol'], 'iSCSI')
+ self.assertEqual(stats['total_capacity_gb'], 5368709120.0)
+ self.assertEqual(stats['free_capacity_gb'], 5368709120.0)
+ self.assertEqual(stats['reserved_percentage'], 0)
+ self.assertEqual(stats['QoS_support'], False)
class TestNexentaJSONRPC(test.TestCase):
'{"error": null, "result": "the result"}')
self.mox.ReplayAll()
result = self.proxy('arg1', 'arg2')
- self.assertEquals("the result", result)
+ self.assertEqual("the result", result)
def test_call_deep(self):
urllib2.Request(
'{"error": null, "result": "the result"}')
self.mox.ReplayAll()
result = self.proxy.obj1.subobj.meth('arg1', 'arg2')
- self.assertEquals("the result", result)
+ self.assertEqual("the result", result)
def test_call_auto(self):
urllib2.Request(
urllib2.urlopen(self.REQUEST).AndReturn(self.resp_mock)
self.mox.ReplayAll()
result = self.proxy('arg1', 'arg2')
- self.assertEquals("the result", result)
+ self.assertEqual("the result", result)
def test_call_error(self):
urllib2.Request(
qos_specs.associate_qos_with_type(self.ctxt, specs_id,
type_ref['id'])
res = qos_specs.get_associations(self.ctxt, specs_id)
- self.assertEquals(len(res), 1)
- self.assertEquals('TypeName', res[0]['name'])
- self.assertEquals(type_ref['id'], res[0]['id'])
+ self.assertEqual(len(res), 1)
+ self.assertEqual('TypeName', res[0]['name'])
+ self.assertEqual(type_ref['id'], res[0]['id'])
self.stubs.Set(db, 'qos_specs_associate',
fake_db_associate)
qos_specs.associate_qos_with_type(self.ctxt, specs_id,
type_ref['id'])
res = qos_specs.get_associations(self.ctxt, specs_id)
- self.assertEquals(len(res), 1)
+ self.assertEqual(len(res), 1)
qos_specs.disassociate_qos_specs(self.ctxt, specs_id, type_ref['id'])
res = qos_specs.get_associations(self.ctxt, specs_id)
- self.assertEquals(len(res), 0)
+ self.assertEqual(len(res), 0)
self.stubs.Set(db, 'qos_specs_disassociate',
fake_db_disassociate)
qos_specs.associate_qos_with_type(self.ctxt, specs_id,
type2_ref['id'])
res = qos_specs.get_associations(self.ctxt, specs_id)
- self.assertEquals(len(res), 2)
+ self.assertEqual(len(res), 2)
qos_specs.disassociate_all(self.ctxt, specs_id)
res = qos_specs.get_associations(self.ctxt, specs_id)
- self.assertEquals(len(res), 0)
+ self.assertEqual(len(res), 0)
self.stubs.Set(db, 'qos_specs_disassociate_all',
fake_db_disassociate_all)
'consumer': 'both'}
id = self._create_qos_specs('Specs1', input)
specs = qos_specs.get_qos_specs(self.ctxt, id)
- self.assertEquals(specs['specs']['key1'], one_time_value)
+ self.assertEqual(specs['specs']['key1'], one_time_value)
self.assertRaises(exception.InvalidQoSSpecs,
qos_specs.get_qos_specs, self.ctxt, None)
id = self._create_qos_specs(one_time_value, input)
specs = qos_specs.get_qos_specs_by_name(self.ctxt,
one_time_value)
- self.assertEquals(specs['specs']['key1'], one_time_value)
+ self.assertEqual(specs['specs']['key1'], one_time_value)
self.assertRaises(exception.InvalidQoSSpecs,
qos_specs.get_qos_specs_by_name, self.ctxt, None)
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: False)
actual = self.volume.driver.clone_image(object(), object(), object())
- self.assertEquals(expected, actual)
+ self.assertEqual(expected, actual)
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
self.assertEquals(expected,
self.stubs.Set(self.volume.driver, '_clone', lambda *args: None)
self.stubs.Set(self.volume.driver, '_resize', lambda *args: None)
actual = self.volume.driver.clone_image(object(), object(), object())
- self.assertEquals(expected, actual)
+ self.assertEqual(expected, actual)
def test_clone_success(self):
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
def test_service_random_port(self):
test_service = service.WSGIService("test_service")
- self.assertEquals(0, test_service.port)
+ self.assertEqual(0, test_service.port)
test_service.start()
self.assertNotEqual(0, test_service.port)
test_service.stop()
self.service = service.WSGIService("test_service")
def test_launch_app(self):
- self.assertEquals(0, self.service.port)
+ self.assertEqual(0, self.service.port)
launcher = service.Launcher()
launcher.launch_server(self.service)
- self.assertEquals(0, self.service.port)
+ self.assertEqual(0, self.service.port)
launcher.stop()
stats = self.driver.get_volume_stats()
self.assertLessEqual(stats['free_capacity_gb'],
stats['total_capacity_gb'])
- self.assertEquals(stats['reserved_percentage'], 25)
+ self.assertEqual(stats['reserved_percentage'], 25)
pool = self.driver.configuration.local_conf.storwize_svc_volpool_name
if self.USESIM:
expected = 'storwize-svc-sim_' + pool
f = utils.get_from_path
input = []
- self.assertEquals([], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [None]
- self.assertEquals([], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': None}]
- self.assertEquals([], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': None}}]
- self.assertEquals([{'b': None}], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': None}], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': {'c': None}}}]
- self.assertEquals([{'b': {'c': None}}], f(input, "a"))
- self.assertEquals([{'c': None}], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': None}}], f(input, "a"))
+ self.assertEqual([{'c': None}], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': {'c': None}}}, {'a': None}]
- self.assertEquals([{'b': {'c': None}}], f(input, "a"))
- self.assertEquals([{'c': None}], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': None}}], f(input, "a"))
+ self.assertEqual([{'c': None}], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
- self.assertEquals([{'b': {'c': None}}, {'b': None}], f(input, "a"))
- self.assertEquals([{'c': None}], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': None}}, {'b': None}], f(input, "a"))
+ self.assertEqual([{'c': None}], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
def test_does_select(self):
f = utils.get_from_path
input = [{'a': 'a_1'}]
- self.assertEquals(['a_1'], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual(['a_1'], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': 'b_1'}}]
- self.assertEquals([{'b': 'b_1'}], f(input, "a"))
- self.assertEquals(['b_1'], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': 'b_1'}], f(input, "a"))
+ self.assertEqual(['b_1'], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': {'c': 'c_1'}}}]
- self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
- self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEquals(['c_1'], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
+ self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEqual(['c_1'], f(input, "a/b/c"))
input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
- self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
- self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEquals(['c_1'], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
+ self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEqual(['c_1'], f(input, "a/b/c"))
input = [{'a': {'b': {'c': 'c_1'}}},
{'a': {'b': None}}]
- self.assertEquals([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
- self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEquals(['c_1'], f(input, "a/b/c"))
+ self.assertEqual([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
+ self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEqual(['c_1'], f(input, "a/b/c"))
input = [{'a': {'b': {'c': 'c_1'}}},
{'a': {'b': {'c': 'c_2'}}}]
self.assertEquals([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
f(input, "a"))
- self.assertEquals([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
- self.assertEquals(['c_1', 'c_2'], f(input, "a/b/c"))
+ self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
+ self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
- self.assertEquals([], f(input, "a/b/c/d"))
- self.assertEquals([], f(input, "c/a/b/d"))
- self.assertEquals([], f(input, "i/r/t"))
+ self.assertEqual([], f(input, "a/b/c/d"))
+ self.assertEqual([], f(input, "c/a/b/d"))
+ self.assertEqual([], f(input, "i/r/t"))
def test_flattens_lists(self):
f = utils.get_from_path
input = [{'a': [1, 2, 3]}]
- self.assertEquals([1, 2, 3], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([1, 2, 3], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': [1, 2, 3]}}]
- self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
- self.assertEquals([1, 2, 3], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
+ self.assertEqual([1, 2, 3], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
- self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
- self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = [{'a': [1, 2, {'b': 'b_1'}]}]
- self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
- self.assertEquals(['b_1'], f(input, "a/b"))
+ self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
+ self.assertEqual(['b_1'], f(input, "a/b"))
def test_bad_xpath(self):
f = utils.get_from_path
private_ips = f(inst, 'fixed_ip/address')
public_ips = f(inst, 'fixed_ip/floating_ips/address')
- self.assertEquals(['192.168.0.3'], private_ips)
- self.assertEquals(['1.2.3.4'], public_ips)
+ self.assertEqual(['192.168.0.3'], private_ips)
+ self.assertEqual(['1.2.3.4'], public_ips)
def test_accepts_dictionaries(self):
f = utils.get_from_path
input = {'a': [1, 2, 3]}
- self.assertEquals([1, 2, 3], f(input, "a"))
- self.assertEquals([], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([1, 2, 3], f(input, "a"))
+ self.assertEqual([], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = {'a': {'b': [1, 2, 3]}}
- self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
- self.assertEquals([1, 2, 3], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
+ self.assertEqual([1, 2, 3], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
- self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEquals([], f(input, "a/b/c"))
+ self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEqual([], f(input, "a/b/c"))
input = {'a': [1, 2, {'b': 'b_1'}]}
- self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
- self.assertEquals(['b_1'], f(input, "a/b"))
+ self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
+ self.assertEqual(['b_1'], f(input, "a/b"))
class GenericUtilsTestCase(test.TestCase):
flo = StringIO.StringIO(data)
h1 = utils.hash_file(flo)
h2 = hashlib.sha1(data).hexdigest()
- self.assertEquals(h1, h2)
+ self.assertEqual(h1, h2)
def test_check_ssh_injection(self):
cmd_list = ['ssh', '-D', 'my_name@name_of_remote_computer']
test_obj = TestClass()
self.assertRaises(exception.CinderException, test_obj.fail)
- self.assertEquals(test_obj.counter1, 3)
+ self.assertEqual(test_obj.counter1, 3)
def test_create_session(self):
"""Test create_session."""
def test_get_volume_stats(self):
"""Test get_volume_stats."""
stats = self._driver.get_volume_stats()
- self.assertEquals(stats['vendor_name'], 'VMware')
- self.assertEquals(stats['driver_version'], '1.0')
- self.assertEquals(stats['storage_protocol'], 'LSI Logic SCSI')
- self.assertEquals(stats['reserved_percentage'], 0)
- self.assertEquals(stats['total_capacity_gb'], 'unknown')
- self.assertEquals(stats['free_capacity_gb'], 'unknown')
+ self.assertEqual(stats['vendor_name'], 'VMware')
+ self.assertEqual(stats['driver_version'], '1.0')
+ self.assertEqual(stats['storage_protocol'], 'LSI Logic SCSI')
+ self.assertEqual(stats['reserved_percentage'], 0)
+ self.assertEqual(stats['total_capacity_gb'], 'unknown')
+ self.assertEqual(stats['free_capacity_gb'], 'unknown')
def test_create_volume(self):
"""Test create_volume."""
m.ReplayAll()
ret = self._session.wait_for_task(mox.IgnoreArg())
- self.assertEquals(ret.result, result)
+ self.assertEqual(ret.result, result)
m.UnsetStubs()
m.VerifyAll()
# Not recursive
child = FakeMor('Parent', 'my_parent')
parent = self._volumeops._get_parent(child, 'Parent')
- self.assertEquals(parent, child)
+ self.assertEqual(parent, child)
# Recursive
m = mox.Mox()
m.ReplayAll()
ret = self._volumeops._get_parent(child, 'Parent')
- self.assertEquals(ret, parent)
+ self.assertEqual(ret, parent)
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
result = self._volumeops.get_datastore(backing)
- self.assertEquals(result, datastore)
+ self.assertEqual(result, datastore)
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
conn_info = self._driver.initialize_connection(volume, connector)
- self.assertEquals(conn_info['driver_volume_type'], 'vmdk')
- self.assertEquals(conn_info['data']['volume'], 'my_back')
- self.assertEquals(conn_info['data']['volume_id'], 'volume_id')
+ self.assertEqual(conn_info['driver_volume_type'], 'vmdk')
+ self.assertEqual(conn_info['data']['volume'], 'my_back')
+ self.assertEqual(conn_info['data']['volume_id'], 'volume_id')
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
summary = self._driver._select_datastore_summary(1, datastores)
- self.assertEquals(summary, summary1)
+ self.assertEqual(summary, summary1)
summary = self._driver._select_datastore_summary(10, datastores)
- self.assertEquals(summary, summary3)
+ self.assertEqual(summary, summary3)
summary = self._driver._select_datastore_summary(50, datastores)
- self.assertEquals(summary, summary4)
+ self.assertEqual(summary, summary4)
self.assertRaises(error_util.VimException,
self._driver._select_datastore_summary,
100, datastores)
m.ReplayAll()
conn_info = self._driver.initialize_connection(volume, connector)
- self.assertEquals(conn_info['driver_volume_type'], 'vmdk')
- self.assertEquals(conn_info['data']['volume'], 'my_back')
- self.assertEquals(conn_info['data']['volume_id'], 'volume_id')
+ self.assertEqual(conn_info['driver_volume_type'], 'vmdk')
+ self.assertEqual(conn_info['data']['volume'], 'my_back')
+ self.assertEqual(conn_info['data']['volume_id'], 'volume_id')
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
conn_info = self._driver.initialize_connection(volume, connector)
- self.assertEquals(conn_info['driver_volume_type'], 'vmdk')
- self.assertEquals(conn_info['data']['volume'], 'my_back')
- self.assertEquals(conn_info['data']['volume_id'], 'volume_id')
+ self.assertEqual(conn_info['driver_volume_type'], 'vmdk')
+ self.assertEqual(conn_info['data']['volume'], 'my_back')
+ self.assertEqual(conn_info['data']['volume_id'], 'volume_id')
m.UnsetStubs()
m.VerifyAll()
"""Test _get_snapshot_from_tree."""
volops = volumeops.VMwareVolumeOps
ret = volops._get_snapshot_from_tree(mox.IgnoreArg(), None)
- self.assertEquals(ret, None)
+ self.assertEqual(ret, None)
name = 'snapshot_name'
snapshot = FakeMor('VirtualMachineSnapshot', 'my_snap')
root = FakeSnapshotTree(name='snapshot_name', snapshot=snapshot)
ret = volops._get_snapshot_from_tree(name, root)
- self.assertEquals(ret, snapshot)
+ self.assertEqual(ret, snapshot)
snapshot1 = FakeMor('VirtualMachineSnapshot', 'my_snap_1')
root = FakeSnapshotTree(name='snapshot_name_1', snapshot=snapshot1,
childSnapshotList=[root])
ret = volops._get_snapshot_from_tree(name, root)
- self.assertEquals(ret, snapshot)
+ self.assertEqual(ret, snapshot)
def test_get_snapshot(self):
"""Test get_snapshot."""
actual_vmdk_path = self._volumeops.get_vmdk_path(backing)
self.assertEquals(backingInfo.__class__.__name__,
'VirtualDiskFlatVer2BackingInfo')
- self.assertEquals(virtualDisk.__class__.__name__, 'VirtualDisk')
- self.assertEquals(actual_vmdk_path, vmdk_path)
+ self.assertEqual(virtualDisk.__class__.__name__, 'VirtualDisk')
+ self.assertEqual(actual_vmdk_path, vmdk_path)
m.UnsetStubs()
m.VerifyAll()
(datastore,
folder,
file_name) = volumeops.split_datastore_path(test1)
- self.assertEquals(datastore, 'datastore1')
- self.assertEquals(folder, 'myfolder/mysubfolder/')
- self.assertEquals(file_name, 'myvm.vmx')
+ self.assertEqual(datastore, 'datastore1')
+ self.assertEqual(folder, 'myfolder/mysubfolder/')
+ self.assertEqual(file_name, 'myvm.vmx')
test2 = '[datastore2 ] myfolder/myvm.vmdk'
(datastore,
folder,
file_name) = volumeops.split_datastore_path(test2)
- self.assertEquals(datastore, 'datastore2')
- self.assertEquals(folder, 'myfolder/')
- self.assertEquals(file_name, 'myvm.vmdk')
+ self.assertEqual(datastore, 'datastore2')
+ self.assertEqual(folder, 'myfolder/')
+ self.assertEqual(file_name, 'myvm.vmdk')
test3 = 'myfolder/myvm.vmdk'
self.assertRaises(IndexError, volumeops.split_datastore_path, test3)
m.ReplayAll()
fol = self._volumeops.create_folder(parent_folder, 'child_folder_name')
- self.assertEquals(fol, child_folder)
+ self.assertEqual(fol, child_folder)
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
conn_info = self._driver.initialize_connection(volume, connector)
- self.assertEquals(conn_info['driver_volume_type'], 'vmdk')
- self.assertEquals(conn_info['data']['volume'], 'my_back')
- self.assertEquals(conn_info['data']['volume_id'], 'volume_id')
+ self.assertEqual(conn_info['driver_volume_type'], 'vmdk')
+ self.assertEqual(conn_info['data']['volume'], 'my_back')
+ self.assertEqual(conn_info['data']['volume_id'], 'volume_id')
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
conn_info = self._driver.initialize_connection(volume, connector)
- self.assertEquals(conn_info['driver_volume_type'], 'vmdk')
- self.assertEquals(conn_info['data']['volume'], 'my_back')
- self.assertEquals(conn_info['data']['volume_id'], 'volume_id')
+ self.assertEqual(conn_info['driver_volume_type'], 'vmdk')
+ self.assertEqual(conn_info['data']['volume'], 'my_back')
+ self.assertEqual(conn_info['data']['volume_id'], 'volume_id')
m.UnsetStubs()
m.VerifyAll()
m.ReplayAll()
ret = self._volumeops.clone_backing(name, backing, snapshot,
mox.IgnoreArg(), datastore)
- self.assertEquals(ret, clone)
+ self.assertEqual(ret, clone)
m.UnsetStubs()
m.VerifyAll()
volume_id = volume['id']
self.volume.init_host()
volume = db.volume_get(context.get_admin_context(), volume_id)
- self.assertEquals(volume['status'], "error")
+ self.assertEqual(volume['status'], "error")
self.volume.delete_volume(self.context, volume_id)
def test_create_delete_volume(self):
**self.volume_params)
volume_id = volume['id']
self.assertIsNone(volume['encryption_key_id'])
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 2)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEqual(msg['event_type'], 'volume.create.start')
expected = {
self.volume.delete_volume(self.context, volume_id)
vol = db.volume_get(context.get_admin_context(read_deleted='yes'),
volume_id)
- self.assertEquals(vol['status'], 'deleted')
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
+ self.assertEqual(vol['status'], 'deleted')
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 4)
msg = test_notifier.NOTIFICATIONS[2]
self.assertEqual(msg['event_type'], 'volume.delete.start')
self.assertDictMatch(msg['payload'], expected)
1,
'name',
'description')
- self.assertEquals(volume['volume_type_id'], None)
- self.assertEquals(volume['encryption_key_id'], None)
+ self.assertEqual(volume['volume_type_id'], None)
+ self.assertEqual(volume['encryption_key_id'], None)
# Create default volume type
vol_type = conf_fixture.def_vol_type
1,
'name',
'description')
- self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
self.assertIsNone(volume['encryption_key_id'])
# Create volume with specific volume type
'name',
'description',
volume_type=db_vol_type)
- self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
def test_create_volume_with_encrypted_volume_type(self):
self.stubs.Set(keymgr, "API", fake_keymgr.fake_api)
'name',
'description',
volume_type=db_vol_type)
- self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
self.assertIsNotNone(volume['encryption_key_id'])
def test_create_delete_volume_with_encrypted_volume_type(self):
volume_type=db_vol_type)
self.assertIsNotNone(volume.get('encryption_key_id', None))
- self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
self.assertIsNotNone(volume['encryption_key_id'])
volume['host'] = 'fake_host'
volume_api.delete(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
- self.assertEquals('deleting', volume['status'])
+ self.assertEqual('deleting', volume['status'])
db.volume_destroy(self.context, volume['id'])
self.assertRaises(exception.NotFound,
self.assertEqual(vol['instance_uuid'], instance_uuid)
self.assertEqual(vol['attached_host'], None)
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'ro')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'ro')
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
conn_info = self.volume.initialize_connection(self.context,
volume_id, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'ro')
+ self.assertEqual(conn_info['data']['access_mode'], 'ro')
self.assertRaises(exception.VolumeAttached,
self.volume.delete_volume,
# sanitized, conforms to RFC-952 and RFC-1123 specs.
self.assertEqual(vol['attached_host'], 'fake-host')
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'rw')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'rw')
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
conn_info = self.volume.initialize_connection(self.context,
volume_id, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'rw')
+ self.assertEqual(conn_info['data']['access_mode'], 'rw')
self.assertRaises(exception.VolumeAttached,
self.volume.delete_volume,
self.assertEqual(vol['instance_uuid'], instance_uuid)
self.assertEqual(vol['attached_host'], None)
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'ro')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'ro')
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
conn_info = self.volume.initialize_connection(self.context,
volume_id, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'ro')
+ self.assertEqual(conn_info['data']['access_mode'], 'ro')
self.volume.detach_volume(self.context, volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['instance_uuid'], None)
self.assertEqual(vol['attached_host'], None)
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
self.volume.attach_volume(self.context, volume_id, None,
'fake_host', mountpoint, 'ro')
self.assertEqual(vol['instance_uuid'], None)
self.assertEqual(vol['attached_host'], 'fake-host')
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'ro')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'ro')
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
conn_info = self.volume.initialize_connection(self.context,
volume_id, connector)
- self.assertEquals(conn_info['data']['access_mode'], 'ro')
+ self.assertEqual(conn_info['data']['access_mode'], 'ro')
self.volume.detach_volume(self.context, volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['instance_uuid'], None)
self.assertEqual(vol['attached_host'], None)
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.VolumeNotFound,
self.assertEqual(vol['status'], "error_attaching")
self.assertEqual(vol['attach_status'], "detached")
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'rw')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'rw')
db.volume_update(self.context, volume_id, {'status': 'available'})
self.assertRaises(exception.InvalidVolumeAttachMode,
self.assertEqual(vol['status'], "error_attaching")
self.assertEqual(vol['attach_status'], "detached")
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 2)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
- self.assertEquals(admin_metadata[1]['key'], 'attached_mode')
- self.assertEquals(admin_metadata[1]['value'], 'rw')
+ self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
+ self.assertEqual(admin_metadata[1]['key'], 'attached_mode')
+ self.assertEqual(admin_metadata[1]['value'], 'rw')
def test_run_api_attach_detach_volume_with_wrong_attach_mode(self):
# Not allow using 'read-write' mode attach readonly volume
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['attach_status'], "detached")
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
db.volume_update(self.context, volume_id, {'status': 'available'})
self.assertRaises(exception.InvalidVolumeAttachMode,
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['attach_status'], "detached")
admin_metadata = vol['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'True')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'True')
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
self.context,
availability_zone=CONF.storage_availability_zone,
**self.volume_params)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume['id'])
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 2)
snapshot_id = self._create_snapshot(volume['id'])['id']
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 4)
msg = test_notifier.NOTIFICATIONS[2]
- self.assertEquals(msg['event_type'], 'snapshot.create.start')
+ self.assertEqual(msg['event_type'], 'snapshot.create.start')
expected = {
'created_at': 'DONTCARE',
'deleted': '',
}
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[3]
- self.assertEquals(msg['event_type'], 'snapshot.create.end')
+ self.assertEqual(msg['event_type'], 'snapshot.create.end')
self.assertDictMatch(msg['payload'], expected)
self.volume.delete_snapshot(self.context, snapshot_id)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 6)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 6)
msg = test_notifier.NOTIFICATIONS[4]
- self.assertEquals(msg['event_type'], 'snapshot.delete.start')
+ self.assertEqual(msg['event_type'], 'snapshot.delete.start')
expected['status'] = 'available'
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[5]
- self.assertEquals(msg['event_type'], 'snapshot.delete.end')
+ self.assertEqual(msg['event_type'], 'snapshot.delete.end')
self.assertDictMatch(msg['payload'], expected)
snap = db.snapshot_get(context.get_admin_context(read_deleted='yes'),
snapshot_id)
- self.assertEquals(snap['status'], 'deleted')
+ self.assertEqual(snap['status'], 'deleted')
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
# status is deleting
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['status'], 'deleting')
+ self.assertEqual(volume['status'], 'deleting')
# clean up
self.volume.delete_volume(self.context, volume['id'])
size,
'name',
'description')
- self.assertEquals(volume['size'], int(size))
+ self.assertEqual(volume['size'], int(size))
def test_create_volume_int_size(self):
"""Test volume creation with int size."""
volume_api.update(self.context, volume, update_dict)
# read changes from db
vol = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(vol['display_name'], 'test update name')
+ self.assertEqual(vol['display_name'], 'test update name')
def test_volume_api_update_snapshot(self):
# create raw snapshot
volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'])
- self.assertEquals(snapshot['display_name'], None)
+ self.assertEqual(snapshot['display_name'], None)
# use volume.api to update name
volume_api = cinder.volume.api.API()
update_dict = {'display_name': 'test update name'}
volume_api.update_snapshot(self.context, snapshot, update_dict)
# read changes from db
snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
- self.assertEquals(snap['display_name'], 'test update name')
+ self.assertEqual(snap['display_name'], 'test update name')
def test_extend_volume(self):
"""Test volume can be extended at API level."""
volume_api.extend(self.context, volume, 3)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['status'], 'extending')
+ self.assertEqual(volume['status'], 'extending')
# clean up
self.volume.delete_volume(self.context, volume['id'])
volume['status'] = 'extending'
self.volume.extend_volume(self.context, volume['id'], '4')
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['size'], 2)
- self.assertEquals(volume['status'], 'error_extending')
+ self.assertEqual(volume['size'], 2)
+ self.assertEqual(volume['status'], 'error_extending')
# Test driver exception
self.stubs.Set(QUOTAS, 'reserve', fake_reserve)
volume['status'] = 'extending'
self.volume.extend_volume(self.context, volume['id'], '4')
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['size'], 2)
- self.assertEquals(volume['status'], 'error_extending')
+ self.assertEqual(volume['size'], 2)
+ self.assertEqual(volume['status'], 'error_extending')
# Test driver success
self.stubs.Set(self.volume.driver, 'extend_volume',
volume['status'] = 'extending'
self.volume.extend_volume(self.context, volume['id'], '4')
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['size'], 4)
- self.assertEquals(volume['status'], 'available')
+ self.assertEqual(volume['size'], 4)
+ self.assertEqual(volume['status'], 'available')
# clean up
self.volume.delete_volume(self.context, volume['id'])
for meta_src in src_glancemeta:
for meta_dst in dst_glancemeta:
if meta_dst.key == meta_src.key:
- self.assertEquals(meta_dst.value, meta_src.value)
+ self.assertEqual(meta_dst.value, meta_src.value)
self.volume.delete_volume(self.context, volume_src['id'])
self.volume.delete_volume(self.context, volume_dst['id'])
# check volume properties
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['host'], 'newhost')
- self.assertEquals(volume['migration_status'], None)
+ self.assertEqual(volume['host'], 'newhost')
+ self.assertEqual(volume['migration_status'], None)
def test_migrate_volume_generic(self):
def fake_migr(vol, host):
self.volume.migrate_volume(self.context, volume['id'],
host_obj, True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['host'], 'newhost')
- self.assertEquals(volume['migration_status'], None)
+ self.assertEqual(volume['host'], 'newhost')
+ self.assertEqual(volume['migration_status'], None)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
volume_api.update_readonly_flag(self.context, volume, False)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEquals(volume['status'], 'available')
+ self.assertEqual(volume['status'], 'available')
admin_metadata = volume['volume_admin_metadata']
- self.assertEquals(len(admin_metadata), 1)
- self.assertEquals(admin_metadata[0]['key'], 'readonly')
- self.assertEquals(admin_metadata[0]['value'], 'False')
+ self.assertEqual(len(admin_metadata), 1)
+ self.assertEqual(admin_metadata[0]['key'], 'readonly')
+ self.assertEqual(admin_metadata[0]['value'], 'False')
# clean up
self.volume.delete_volume(self.context, volume['id'])
# Test volume has 'size' field
volume = dict(fake_volume, size='123')
- self.assertEquals(True, lvm_driver.clear_volume(volume))
+ self.assertEqual(True, lvm_driver.clear_volume(volume))
# Test volume has 'volume_size' field
volume = dict(fake_volume, volume_size='123')
- self.assertEquals(True, lvm_driver.clear_volume(volume))
+ self.assertEqual(True, lvm_driver.clear_volume(volume))
# Test volume without 'size' field and 'volume_size' field
volume = dict(fake_volume)
- self.assertEquals(None, lvm_driver.clear_volume(volume))
+ self.assertEqual(None, lvm_driver.clear_volume(volume))
class ISCSITestCase(DriverTestCase):
iscsi_driver = driver.ISCSIDriver()
iscsi_driver._do_iscsi_discovery = lambda v: "0.0.0.0:0000,0 iqn:iqn 0"
result = iscsi_driver._get_iscsi_properties(volume)
- self.assertEquals(result["target_portal"], "0.0.0.0:0000")
- self.assertEquals(result["target_iqn"], "iqn:iqn")
- self.assertEquals(result["target_lun"], 0)
+ self.assertEqual(result["target_portal"], "0.0.0.0:0000")
+ self.assertEqual(result["target_iqn"], "iqn:iqn")
+ self.assertEqual(result["target_lun"], 0)
def test_get_volume_stats(self):
def _emulate_vgs_execute(_command, *_args, **_kwargs):
stats = self.volume.driver._stats
- self.assertEquals(stats['total_capacity_gb'], float('5.52'))
- self.assertEquals(stats['free_capacity_gb'], float('0.52'))
+ self.assertEqual(stats['total_capacity_gb'], float('5.52'))
+ self.assertEqual(stats['free_capacity_gb'], float('0.52'))
def test_validate_connector(self):
iscsi_driver = driver.ISCSIDriver()
iser_driver = driver.ISERDriver()
iser_driver._do_iser_discovery = lambda v: "0.0.0.0:0000,0 iqn:iqn 0"
result = iser_driver._get_iser_properties(volume)
- self.assertEquals(result["target_portal"], "0.0.0.0:0000")
- self.assertEquals(result["target_iqn"], "iqn:iqn")
- self.assertEquals(result["target_lun"], 0)
+ self.assertEqual(result["target_portal"], "0.0.0.0:0000")
+ self.assertEqual(result["target_iqn"], "iqn:iqn")
+ self.assertEqual(result["target_lun"], 0)
class FibreChannelTestCase(DriverTestCase):
def test_group_grafts_opts(self):
c = configuration.Configuration(volume_opts, config_group='foo')
- self.assertEquals(c.str_opt, CONF.foo.str_opt)
- self.assertEquals(c.bool_opt, CONF.foo.bool_opt)
+ self.assertEqual(c.str_opt, CONF.foo.str_opt)
+ self.assertEqual(c.bool_opt, CONF.foo.bool_opt)
def test_opts_no_group(self):
c = configuration.Configuration(volume_opts)
- self.assertEquals(c.str_opt, CONF.str_opt)
- self.assertEquals(c.bool_opt, CONF.bool_opt)
+ self.assertEqual(c.str_opt, CONF.str_opt)
+ self.assertEqual(c.bool_opt, CONF.bool_opt)
def test_grafting_multiple_opts(self):
c = configuration.Configuration(volume_opts, config_group='foo')
c.append_config_values(more_volume_opts)
- self.assertEquals(c.str_opt, CONF.foo.str_opt)
- self.assertEquals(c.bool_opt, CONF.foo.bool_opt)
- self.assertEquals(c.int_opt, CONF.foo.int_opt)
+ self.assertEqual(c.str_opt, CONF.foo.str_opt)
+ self.assertEqual(c.bool_opt, CONF.foo.bool_opt)
+ self.assertEqual(c.int_opt, CONF.foo.int_opt)
def test_safe_get(self):
c = configuration.Configuration(volume_opts, config_group='foo')
- self.assertEquals(c.safe_get('none_opt'), None)
+ self.assertEqual(c.safe_get('none_opt'), None)
for meta in db.volume_snapshot_glance_metadata_get(ctxt, 100):
for (key, value) in expected_meta.items():
- self.assertEquals(meta[key], value)
+ self.assertEqual(meta[key], value)
def test_vol_glance_metadata_copy_to_volume(self):
ctxt = context.get_admin_context()
for meta in db.volume_glance_metadata_get(ctxt, 100):
for (key, value) in expected_meta.items():
- self.assertEquals(meta[key], value)
+ self.assertEqual(meta[key], value)
updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
- self.assertEquals(t['id'], transfer['id'], 'Unexpected transfer id')
+ self.assertEqual(t['id'], transfer['id'], 'Unexpected transfer id')
ts = tx_api.get_all(self.ctxt)
- self.assertEquals(len(ts), 1, 'Unexpected number of transfers.')
+ self.assertEqual(len(ts), 1, 'Unexpected number of transfers.')
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
transfer['id'])
ts = tx_api.get_all(nctxt)
- self.assertEquals(len(ts), 0, 'Unexpected transfers listed.')
+ self.assertEqual(len(ts), 0, 'Unexpected transfers listed.')
type_ref = volume_types.create(self.ctxt, "type1", {"key2": "val2",
"key3": "val3"})
res = volume_types.get_volume_type_qos_specs(type_ref['id'])
- self.assertEquals(res['qos_specs'], {})
+ self.assertEqual(res['qos_specs'], {})
qos_specs.associate_qos_with_type(self.ctxt,
qos_ref['id'],
type_ref['id'])
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
- self.assertEquals(expected_specs, actual_specs)
+ self.assertEqual(expected_specs, actual_specs)
def test_volume_type_extra_specs_delete(self):
expected_specs = self.vol_type1_specs.copy()
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
- self.assertEquals(expected_specs, actual_specs)
+ self.assertEqual(expected_specs, actual_specs)
def test_volume_type_extra_specs_update(self):
expected_specs = self.vol_type1_specs.copy()
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
- self.assertEquals(expected_specs, actual_specs)
+ self.assertEqual(expected_specs, actual_specs)
def test_volume_type_extra_specs_create(self):
expected_specs = self.vol_type1_specs.copy()
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
- self.assertEquals(expected_specs, actual_specs)
+ self.assertEqual(expected_specs, actual_specs)
def test_volume_type_get_with_extra_specs(self):
volume_type = db.volume_type_get(
volume_type = db.volume_type_get(
context.get_admin_context(),
self.vol_type2_id)
- self.assertEquals(volume_type['extra_specs'], {})
+ self.assertEqual(volume_type['extra_specs'], {})
def test_volume_type_get_by_name_with_extra_specs(self):
volume_type = db.volume_type_get_by_name(
volume_type = db.volume_type_get_by_name(
context.get_admin_context(),
self.vol_type2_noextra['name'])
- self.assertEquals(volume_type['extra_specs'], {})
+ self.assertEqual(volume_type['extra_specs'], {})
def test_volume_type_get_all(self):
expected_specs = self.vol_type1_specs.copy()
volume = db.volume_get(self.context, volume_id)
volume_utils.notify_usage_exists(self.context, volume)
LOG.info("%r" % test_notifier.NOTIFICATIONS)
- self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
- self.assertEquals(msg['priority'], 'INFO')
- self.assertEquals(msg['event_type'], 'volume.exists')
+ self.assertEqual(msg['priority'], 'INFO')
+ self.assertEqual(msg['event_type'], 'volume.exists')
payload = msg['payload']
- self.assertEquals(payload['tenant_id'], self.project_id)
- self.assertEquals(payload['user_id'], self.user_id)
- self.assertEquals(payload['snapshot_id'], self.snapshot_id)
- self.assertEquals(payload['volume_id'], volume.id)
- self.assertEquals(payload['size'], self.volume_size)
+ self.assertEqual(payload['tenant_id'], self.project_id)
+ self.assertEqual(payload['user_id'], self.user_id)
+ self.assertEqual(payload['snapshot_id'], self.snapshot_id)
+ self.assertEqual(payload['volume_id'], volume.id)
+ self.assertEqual(payload['size'], self.volume_size)
for attr in ('display_name', 'created_at', 'launched_at',
'status', 'audit_period_beginning',
'audit_period_ending'):
# Test valid volume_dd_blocksize
CONF.set_override('volume_dd_blocksize', '10M')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '10M')
- self.assertEquals(count, 103)
+ self.assertEqual(bs, '10M')
+ self.assertEqual(count, 103)
CONF.set_override('volume_dd_blocksize', '1xBBB')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '1M')
- self.assertEquals(count, 1024)
+ self.assertEqual(bs, '1M')
+ self.assertEqual(count, 1024)
# Test 'volume_dd_blocksize' with fraction
CONF.set_override('volume_dd_blocksize', '1.3M')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '1M')
- self.assertEquals(count, 1024)
+ self.assertEqual(bs, '1M')
+ self.assertEqual(count, 1024)
# Test zero-size 'volume_dd_blocksize'
CONF.set_override('volume_dd_blocksize', '0M')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '1M')
- self.assertEquals(count, 1024)
+ self.assertEqual(bs, '1M')
+ self.assertEqual(count, 1024)
# Test negative 'volume_dd_blocksize'
CONF.set_override('volume_dd_blocksize', '-1M')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '1M')
- self.assertEquals(count, 1024)
+ self.assertEqual(bs, '1M')
+ self.assertEqual(count, 1024)
# Test non-digital 'volume_dd_blocksize'
CONF.set_override('volume_dd_blocksize', 'ABM')
bs, count = volume_utils._calculate_count(1024)
- self.assertEquals(bs, '1M')
- self.assertEquals(count, 1024)
+ self.assertEqual(bs, '1M')
+ self.assertEqual(count, 1024)
mox.VerifyAll()
- self.assertEquals(export_info['provider_location'], initiator_name)
+ self.assertEqual(export_info['provider_location'], initiator_name)
def test_initialize_connection(self):
mox = self._mox
self.addCleanup(self.config.close)
def test_config_found(self):
- self.assertEquals(self.config.name, self.loader.config_path)
+ self.assertEqual(self.config.name, self.loader.config_path)
def test_app_not_found(self):
self.assertRaises(
def test_app_found(self):
url_parser = self.loader.load_app("test_app")
- self.assertEquals("/tmp", url_parser.directory)
+ self.assertEqual("/tmp", url_parser.directory)
class TestWSGIServer(test.TestCase):
def test_no_app(self):
server = cinder.wsgi.Server("test_app", None)
- self.assertEquals("test_app", server.name)
+ self.assertEqual("test_app", server.name)
def test_start_random_port(self):
server = cinder.wsgi.Server("test_random_port", None, host="127.0.0.1")
server.start()
response = urllib2.urlopen('http://127.0.0.1:%d/' % server.port)
- self.assertEquals(greetings, response.read())
+ self.assertEqual(greetings, response.read())
server.stop()
server.start()
response = urllib2.urlopen('https://127.0.0.1:%d/' % server.port)
- self.assertEquals(greetings, response.read())
+ self.assertEqual(greetings, response.read())
server.stop()
server.start()
response = urllib2.urlopen('https://[::1]:%d/' % server.port)
- self.assertEquals(greetings, response.read())
+ self.assertEqual(greetings, response.read())
server.stop()
if hasattr(exception_type, 'headers'):
for (key, value) in exception_type.headers.iteritems():
self.assertIn(key, resp.headers)
- self.assertEquals(resp.headers[key], value)
+ self.assertEqual(resp.headers[key], value)
def test_quota_error_mapping(self):
self._do_test_exception_mapping(exception.QuotaError, 'too many used')
drv.do_setup('context')
mock.VerifyAll()
- self.assertEquals(nfsops, drv.nfs_ops)
+ self.assertEqual(nfsops, drv.nfs_ops)
def test_create_volume(self):
mock = mox.Mox()
size=1, display_name='name', display_description='desc'))
mock.VerifyAll()
- self.assertEquals(dict(provider_location='sr_uuid/vdi_uuid'), result)
+ self.assertEqual(dict(provider_location='sr_uuid/vdi_uuid'), result)
def test_delete_volume(self):
mock = mox.Mox()
result = drv.copy_volume_to_image(
context, "volume", "image_service", "image_meta")
- self.assertEquals('result', result)
+ self.assertEqual('result', result)
mock.VerifyAll()
result = drv.copy_volume_to_image(
context, "volume", "image_service", "image_meta")
- self.assertEquals('result', result)
+ self.assertEqual('result', result)
mock.VerifyAll()
mock.ReplayAll()
result = drv.copy_image_to_volume(
context, "volume", "image_service", "image_id")
- self.assertEquals('result', result)
+ self.assertEqual('result', result)
mock.VerifyAll()
def test_copy_image_to_volume_non_xenserver_case(self):
stats = drv.get_volume_stats()
- self.assertEquals('unknown', stats['free_capacity_gb'])
+ self.assertEqual('unknown', stats['free_capacity_gb'])
def test_reported_driver_type(self):
drv = get_configured_driver()
stats = drv.get_volume_stats()
- self.assertEquals('xensm', stats['storage_protocol'])
+ self.assertEqual('xensm', stats['storage_protocol'])
class ToolsTest(test.TestCase):
@mock.patch('cinder.volume.drivers.xenapi.tools._stripped_first_line_of')
def test_get_this_vm_uuid(self, mock_read_first_line):
mock_read_first_line.return_value = 'someuuid'
- self.assertEquals('someuuid', tools.get_this_vm_uuid())
+ self.assertEqual('someuuid', tools.get_this_vm_uuid())
mock_read_first_line.assert_called_once_with('/sys/hypervisor/uuid')
def test_stripped_first_line_of(self):