res_dict = json.loads(res.body)
self.assertEqual(200, res.status_int)
- self.assertEqual(res_dict['cgsnapshots'][0]['id'],
- cgsnapshot_id1)
+ self.assertEqual(cgsnapshot_id1,
+ res_dict['cgsnapshots'][0]['id'])
self.assertEqual('test_cgsnapshot',
res_dict['cgsnapshots'][0]['name'])
- self.assertEqual(res_dict['cgsnapshots'][1]['id'],
- cgsnapshot_id2)
+ self.assertEqual(cgsnapshot_id2,
+ res_dict['cgsnapshots'][1]['id'])
self.assertEqual('test_cgsnapshot',
res_dict['cgsnapshots'][1]['name'])
- self.assertEqual(res_dict['cgsnapshots'][2]['id'],
- cgsnapshot_id3)
+ self.assertEqual(cgsnapshot_id3,
+ res_dict['cgsnapshots'][2]['id'])
self.assertEqual('test_cgsnapshot',
res_dict['cgsnapshots'][2]['name'])
dom = minidom.parseString(res.body)
cgsnapshot_list = dom.getElementsByTagName('cgsnapshot')
- self.assertEqual(cgsnapshot_list.item(0).getAttribute('id'),
- cgsnapshot_id1)
- self.assertEqual(cgsnapshot_list.item(1).getAttribute('id'),
- cgsnapshot_id2)
- self.assertEqual(cgsnapshot_list.item(2).getAttribute('id'),
- cgsnapshot_id3)
+ self.assertEqual(cgsnapshot_id1,
+ cgsnapshot_list.item(0).getAttribute('id'))
+ self.assertEqual(cgsnapshot_id2,
+ cgsnapshot_list.item(1).getAttribute('id'))
+ self.assertEqual(cgsnapshot_id3,
+ cgsnapshot_list.item(2).getAttribute('id'))
db.cgsnapshot_destroy(context.get_admin_context(),
cgsnapshot_id3)
res_dict['cgsnapshots'][0]['description'])
self.assertEqual('test_cgsnapshot',
res_dict['cgsnapshots'][0]['name'])
- self.assertEqual(res_dict['cgsnapshots'][0]['id'],
- cgsnapshot_id1)
+ self.assertEqual(cgsnapshot_id1,
+ res_dict['cgsnapshots'][0]['id'])
self.assertEqual('creating',
res_dict['cgsnapshots'][0]['status'])
res_dict['cgsnapshots'][1]['description'])
self.assertEqual('test_cgsnapshot',
res_dict['cgsnapshots'][1]['name'])
- self.assertEqual(res_dict['cgsnapshots'][1]['id'],
- cgsnapshot_id2)
+ self.assertEqual(cgsnapshot_id2,
+ res_dict['cgsnapshots'][1]['id'])
self.assertEqual('creating',
res_dict['cgsnapshots'][1]['status'])
self.assertEqual('this is a test cgsnapshot',
res_dict['cgsnapshots'][2]['description'])
- self.assertEqual(res_dict['cgsnapshots'][2]['name'],
- 'test_cgsnapshot')
- self.assertEqual(res_dict['cgsnapshots'][2]['id'],
- cgsnapshot_id3)
+ self.assertEqual('test_cgsnapshot',
+ res_dict['cgsnapshots'][2]['name'])
+ self.assertEqual(cgsnapshot_id3,
+ res_dict['cgsnapshots'][2]['id'])
self.assertEqual('creating',
res_dict['cgsnapshots'][2]['status'])
db.cgsnapshot_destroy(context.get_admin_context(),
cgsnapshot_detail.item(0).getAttribute('description'))
self.assertEqual('test_cgsnapshot',
cgsnapshot_detail.item(0).getAttribute('name'))
- self.assertEqual(cgsnapshot_detail.item(0).getAttribute('id'),
- cgsnapshot_id1)
+ self.assertEqual(cgsnapshot_id1,
+ cgsnapshot_detail.item(0).getAttribute('id'))
self.assertEqual('creating',
cgsnapshot_detail.item(0).getAttribute('status'))
- self.assertEqual(cgsnapshot_detail.item(1).getAttribute('description'),
- 'this is a test cgsnapshot')
+ self.assertEqual('this is a test cgsnapshot',
+ cgsnapshot_detail.item(1).getAttribute('description'))
self.assertEqual('test_cgsnapshot',
cgsnapshot_detail.item(1).getAttribute('name'))
- self.assertEqual(cgsnapshot_detail.item(1).getAttribute('id'),
- cgsnapshot_id2)
+ self.assertEqual(cgsnapshot_id2,
+ cgsnapshot_detail.item(1).getAttribute('id'))
self.assertEqual('creating',
cgsnapshot_detail.item(1).getAttribute('status'))
- self.assertEqual(cgsnapshot_detail.item(2).getAttribute('description'),
- 'this is a test cgsnapshot')
+ self.assertEqual('this is a test cgsnapshot',
+ cgsnapshot_detail.item(2).getAttribute('description'))
self.assertEqual('test_cgsnapshot',
cgsnapshot_detail.item(2).getAttribute('name'))
- self.assertEqual(cgsnapshot_detail.item(2).getAttribute('id'),
- cgsnapshot_id3)
+ self.assertEqual(cgsnapshot_id3,
+ cgsnapshot_detail.item(2).getAttribute('id'))
self.assertEqual('creating',
cgsnapshot_detail.item(2).getAttribute('status'))
def test_get_volume(self):
res = self._make_request('/v2/fake/volumes/%s' % self.UUID)
self.assertEqual(200, res.status_int)
- self.assertEqual(self._get_image_metadata(res.body),
- fake_image_metadata)
+ self.assertEqual(fake_image_metadata,
+ self._get_image_metadata(res.body))
def test_list_detail_volumes(self):
res = self._make_request('/v2/fake/volumes/detail')
self.assertEqual(200, res.status_int)
- self.assertEqual(self._get_image_metadata_list(res.body)[0],
- fake_image_metadata)
+ self.assertEqual(fake_image_metadata,
+ self._get_image_metadata_list(res.body)[0])
def test_create_image_metadata(self):
self.stubs.Set(volume.API, 'get_volume_image_metadata',
def test_promote_bad_id(self):
(req, res) = self._get_resp('promote', 'fake')
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 404, msg)
+ self.assertEqual(404, res.status_int, msg)
def test_promote_bad_id_xml(self):
(req, res) = self._get_resp('promote', 'fake', xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 404, msg)
+ self.assertEqual(404, res.status_int, msg)
def test_promote_volume_not_replicated(self):
volume = tests_utils.create_volume(
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
def test_promote_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status(self,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status_xml(self,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status(self,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status_xml(self,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
def test_reenable_bad_id(self):
(req, res) = self._get_resp('reenable', 'fake')
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 404, msg)
+ self.assertEqual(404, res.status_int, msg)
def test_reenable_bad_id_xml(self):
(req, res) = self._get_resp('reenable', 'fake', xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 404, msg)
+ self.assertEqual(404, res.status_int, msg)
def test_reenable_volume_not_replicated(self):
volume = tests_utils.create_volume(
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
def test_reenable_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status(self,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status_xml(self,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 400, msg)
+ self.assertEqual(400, res.status_int, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
- self.assertEqual(res.status_int, 202, msg)
+ self.assertEqual(202, res.status_int, msg)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volume']
- self.assertEqual(vol['os-vol-tenant-attr:tenant_id'], PROJECT_ID)
+ self.assertEqual(PROJECT_ID, vol['os-vol-tenant-attr:tenant_id'])
def test_get_volume_unallowed(self):
ctx = context.RequestContext('non-admin', 'fake', False)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertEqual(vol[0]['os-vol-tenant-attr:tenant_id'], PROJECT_ID)
+ self.assertEqual(PROJECT_ID, vol[0]['os-vol-tenant-attr:tenant_id'])
def test_list_detail_volumes_unallowed(self):
ctx = context.RequestContext('non-admin', 'fake', False)
vol = etree.XML(res.body)
tenant_key = ('{http://docs.openstack.org/volume/ext/'
'volume_tenant_attribute/api/v2}tenant_id')
- self.assertEqual(vol.get(tenant_key), PROJECT_ID)
+ self.assertEqual(PROJECT_ID, vol.get(tenant_key))
def test_list_volumes_detail_xml(self):
ctx = context.RequestContext('admin', 'fake', True)
vol = list(etree.XML(res.body))[0]
tenant_key = ('{http://docs.openstack.org/volume/ext/'
'volume_tenant_attribute/api/v2}tenant_id')
- self.assertEqual(vol.get(tenant_key), PROJECT_ID)
+ self.assertEqual(PROJECT_ID, vol.get(tenant_key))
# volume_update is (context, id, new_data)
self.assertEqual(1, mock_db.call_count)
self.assertEqual(3, len(mock_db.call_args[0]), mock_db.call_args)
- self.assertEqual(mock_db.call_args[0][1], detached_vol_id)
+ self.assertEqual(detached_vol_id, mock_db.call_args[0][1])
# delete_volume is (context, status, unmanageOnly)
self.assertEqual(1, mock_rpcapi.call_count)
self.assertEqual([], common.limited(self.small, req))
self.assertEqual([], common.limited(self.medium, req))
self.assertEqual(
- common.limited(self.large, req), self.large[1001:2001])
+ self.large[1001:2001], common.limited(self.large, req))
def test_limiter_offset_blank(self):
"""Test offset key works with a blank offset."""
request = webob.Request.blank(url)
response = request.get_response(app)
output = jsonutils.loads(response.body)
- self.assertEqual(output['extension']['alias'],
- ext['alias'])
+ self.assertEqual(ext['alias'], output['extension']['alias'])
def test_get_extension_json(self):
app = router.APIRouter()
self.assertEqual(200, response.status_int)
root = etree.XML(response.body)
- self.assertEqual(root.tag.split('extensions')[0], NS)
+ self.assertEqual(NS, root.tag.split('extensions')[0])
# Make sure we have all the extensions, extras extensions being OK.
exts = root.findall('{0}extension'.format(NS))
xml = response.body
root = etree.XML(xml)
- self.assertEqual(root.tag.split('extension')[0], NS)
+ self.assertEqual(NS, root.tag.split('extension')[0])
self.assertEqual('FOXNSOX', root.get('alias'))
self.assertEqual('Fox In Socks', root.get('name'))
self.assertEqual(
data = response.raw
LOG.warning("data: %s", data)
root = etree.parse(data).getroot()
- self.assertEqual(root.nsmap.get(None), common.XML_NS_V2)
+ self.assertEqual(common.XML_NS_V2, root.nsmap.get(None))
returned_uuid = self.key_mgr.create_key(self.ctxt)
self.mock_barbican.orders.get.assert_called_once_with(order_ref_url)
- self.assertEqual(returned_uuid, self.key_id)
+ self.assertEqual(self.key_id, returned_uuid)
def test_create_null_context(self):
self.key_mgr._barbican_client = None
self.get.assert_called_once_with(self.secret_ref)
encoded = array.array('B', binascii.unhexlify(self.hex)).tolist()
- self.assertEqual(key.get_encoded(), encoded)
+ self.assertEqual(encoded, key.get_encoded())
def test_get_null_context(self):
self.key_mgr._barbican_client = None
'base64',
'AES', 32, 'CBC',
None)
- self.assertEqual(returned_uuid, self.key_id)
+ self.assertEqual(self.key_id, returned_uuid)
def test_store_key_plaintext(self):
# Create the plaintext key
b2 = self._create_backup_db_entry(project_id='project1')
backups = db.backup_get_all_by_project(self.ctxt, 'project1')
self.assertEqual(1, len(backups))
- self.assertEqual(backups[0].id, b2.id)
+ self.assertEqual(b2.id, backups[0].id)
def test_backup_get_all_by_project_with_deleted(self):
"""Test deleted backups.
backups = db.backup_get_all_by_project(self.ctxt, 'fake')
self.assertEqual(1, len(backups))
- self.assertEqual(backups[0].id, backup_keep.id)
+ self.assertEqual(backup_keep.id, backups[0].id)
ctxt_read_deleted = context.get_admin_context('yes')
backups = db.backup_get_all_by_project(ctxt_read_deleted, 'fake')
backups = db.backup_get_all_by_host(self.ctxt, 'testhost')
self.assertEqual(1, len(backups))
- self.assertEqual(backups[0].id, backup_keep.id)
+ self.assertEqual(backup_keep.id, backups[0].id)
ctxt_read_deleted = context.get_admin_context('yes')
backups = db.backup_get_all_by_host(ctxt_read_deleted, 'testhost')