LOG.info(res_dict)
self.assertEqual(res.status_int, 202)
- self.assertTrue('id' in res_dict['backup'])
+ self.assertIn('id', res_dict['backup'])
db.volume_destroy(context.get_admin_context(), volume_id)
self.assertEqual(tree.get('id'), quota_set['id'])
body = make_body(root=False, tenant_id=None)
for node in tree:
- self.assertTrue(node.tag in body)
+ self.assertIn(node.tag, body)
self.assertEquals(str(body[node.tag]), node.text)
self.assertEqual(len(extra_specs), len(tree))
seen = set(extra_specs.keys())
for child in tree:
- self.assertTrue(child.tag in seen)
+ self.assertIn(child.tag, seen)
self.assertEqual(extra_specs[child.tag], child.text)
seen.remove(child.tag)
self.assertEqual(len(seen), 0)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volume']
- self.assertFalse('os-vol-host-attr:host' in vol)
+ self.assertNotIn('os-vol-host-attr:host', vol)
def test_list_detail_volumes_allowed(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-host-attr:host' in vol[0])
+ self.assertNotIn('os-vol-host-attr:host', vol[0])
def test_list_simple_volumes_no_host(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-host-attr:host' in vol[0])
+ self.assertNotIn('os-vol-host-attr:host', vol[0])
def test_get_volume_xml(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volume']
- self.assertFalse('os-vol-mig-status-attr:migstat' in vol)
- self.assertFalse('os-vol-mig-status-attr:name_id' in vol)
+ self.assertNotIn('os-vol-mig-status-attr:migstat', vol)
+ self.assertNotIn('os-vol-mig-status-attr:name_id', vol)
def test_list_detail_volumes_allowed(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-mig-status-attr:migstat' in vol[0])
- self.assertFalse('os-vol-mig-status-attr:name_id' in vol[0])
+ self.assertNotIn('os-vol-mig-status-attr:migstat', vol[0])
+ self.assertNotIn('os-vol-mig-status-attr:name_id', vol[0])
def test_list_simple_volumes_no_migration_status(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-mig-status-attr:migstat' in vol[0])
- self.assertFalse('os-vol-mig-status-attr:name_id' in vol[0])
+ self.assertNotIn('os-vol-mig-status-attr:migstat', vol[0])
+ self.assertNotIn('os-vol-mig-status-attr:name_id', vol[0])
def test_get_volume_xml(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volume']
- self.assertFalse('os-vol-tenant-attr:tenant_id' in vol)
+ self.assertNotIn('os-vol-tenant-attr:tenant_id', vol)
def test_list_detail_volumes_allowed(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-tenant-attr:tenant_id' in vol[0])
+ self.assertNotIn('os-vol-tenant-attr:tenant_id', vol[0])
def test_list_simple_volumes_no_tenant_id(self):
ctx = context.RequestContext('admin', 'fake', True)
req.environ['cinder.context'] = ctx
res = req.get_response(app())
vol = json.loads(res.body)['volumes']
- self.assertFalse('os-vol-tenant-attr:tenant_id' in vol[0])
+ self.assertNotIn('os-vol-tenant-attr:tenant_id', vol[0])
def test_get_volume_xml(self):
ctx = context.RequestContext('admin', 'fake', True)
LOG.info(res_dict)
self.assertEqual(res.status_int, 202)
- self.assertTrue('id' in res_dict['transfer'])
- self.assertTrue('auth_key' in res_dict['transfer'])
- self.assertTrue('created_at' in res_dict['transfer'])
- self.assertTrue('name' in res_dict['transfer'])
- self.assertTrue('volume_id' in res_dict['transfer'])
+ self.assertIn('id', res_dict['transfer'])
+ self.assertIn('auth_key', res_dict['transfer'])
+ self.assertIn('created_at', res_dict['transfer'])
+ self.assertIn('name', res_dict['transfer'])
+ self.assertIn('volume_id', res_dict['transfer'])
db.volume_destroy(context.get_admin_context(), volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
# check response
- self.assertTrue('encryption' in res_dict)
+ self.assertIn('encryption', res_dict)
self.assertEqual(cipher, res_dict['encryption']['cipher'])
self.assertEqual(control_location,
res_dict['encryption']['control_location'])
resp = req.get_response(raiser)
self.assertEqual(resp.content_type, "application/xml")
self.assertEqual(resp.status_int, 404)
- self.assertTrue('whut?' in resp.body)
+ self.assertIn('whut?', resp.body)
def test_raise_403(self):
"""Ensure the ability to raise :class:`Fault` in WSGI-ified methods."""
resp = req.get_response(raiser)
self.assertEqual(resp.content_type, "application/xml")
self.assertEqual(resp.status_int, 403)
- self.assertTrue('resizeNotAllowed' not in resp.body)
- self.assertTrue('forbidden' in resp.body)
+ self.assertNotIn('resizeNotAllowed', resp.body)
+ self.assertIn('forbidden', resp.body)
def test_raise_localized_explanation(self):
params = ('blah', )
resp = req.get_response(raiser)
self.assertEqual(resp.content_type, "application/xml")
self.assertEqual(resp.status_int, 404)
- self.assertTrue(("Mensaje traducido") in resp.body)
+ self.assertIn(("Mensaje traducido"), resp.body)
self.stubs.UnsetAll()
def test_fault_has_status_int(self):
fault = wsgi.Fault(webob.exc.HTTPBadRequest(explanation='scram'))
response = request.get_response(fault)
- self.assertTrue(common.XML_NS_V1 in response.body)
+ self.assertIn(common.XML_NS_V1, response.body)
self.assertEqual(response.content_type, "application/xml")
self.assertEqual(response.status_int, 400)
robj = wsgi.ResponseObject({})
robj['Header'] = 'foo'
del robj['hEADER']
- self.assertFalse('header' in robj.headers)
+ self.assertNotIn('header', robj.headers)
def test_header_isolation(self):
robj = wsgi.ResponseObject({})
# Verify that the child was added
self.assertEqual(len(elem), 1)
self.assertEqual(elem[0], child)
- self.assertEqual('child' in elem, True)
+ self.assertIn('child', elem)
self.assertEqual(elem['child'], child)
# Ensure that multiple children of the same name are rejected
self.assertEqual(len(elem), 3)
for idx in range(len(elem)):
self.assertEqual(children[idx], elem[idx])
- self.assertEqual(children[idx].tag in elem, True)
+ self.assertIn(children[idx].tag, elem)
self.assertEqual(elem[children[idx].tag], children[idx])
# Ensure that multiple children of the same name are rejected
children.insert(1, child)
for idx in range(len(elem)):
self.assertEqual(children[idx], elem[idx])
- self.assertEqual(children[idx].tag in elem, True)
+ self.assertIn(children[idx].tag, elem)
self.assertEqual(elem[children[idx].tag], children[idx])
# Ensure that multiple children of the same name are rejected
self.assertEqual(len(elem), 2)
self.assertEqual(elem[0], children[0])
self.assertEqual(elem[1], children[2])
- self.assertEqual('child2' in elem, False)
+ self.assertNotIn('child2', elem)
# Ensure the child cannot be retrieved by name
def get_key(elem, key):
response = request.get_response(self.app)
self.assertEqual(response.status_int, 413)
- self.assertTrue('Retry-After' in response.headers)
+ self.assertIn('Retry-After', response.headers)
retry_after = int(response.headers['Retry-After'])
self.assertAlmostEqual(retry_after, 60, 1)
req = fakes.HTTPRequest.blank('/v1/snapshots')
resp_dict = self.controller.create(req, body)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['display_name'],
snapshot['display_name'])
self.assertEqual(resp_dict['snapshot']['display_description'],
req = fakes.HTTPRequest.blank('/v1/snapshots')
resp_dict = self.controller.create(req, body)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['display_name'],
snapshot['display_name'])
self.assertEqual(resp_dict['snapshot']['display_description'],
req = fakes.HTTPRequest.blank('/v1/snapshots/%s' % UUID)
resp_dict = self.controller.show(req, UUID)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['id'], UUID)
def test_snapshot_show_invalid_id(self):
req = fakes.HTTPRequest.blank('/v1/snapshots/detail')
resp_dict = self.controller.detail(req)
- self.assertTrue('snapshots' in resp_dict)
+ self.assertIn('snapshots', resp_dict)
resp_snapshots = resp_dict['snapshots']
self.assertEqual(len(resp_snapshots), 1)
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def test_list_snapshots_with_limit_and_offset(self):
use_admin_context=is_admin)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(2, res['snapshots'][0]['id'])
req = fakes.HTTPRequest.blank('/v1/fake/snapshots?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
def test_all_tenants_non_admin_gets_all_tenants(self):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots?all_tenants=1')
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def test_non_admin_get_by_project(self):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots')
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
request = fakes.HTTPRequest.blank("/v1")
output = view_builder.show(request, raw_volume_type)
- self.assertTrue('volume_type' in output)
+ self.assertIn('volume_type', output)
expected_volume_type = dict(name='new_type',
extra_specs={},
id=42)
request = fakes.HTTPRequest.blank("/v1")
output = view_builder.index(request, raw_volume_types)
- self.assertTrue('volume_types' in output)
+ self.assertIn('volume_types', output)
for i in range(0, 10):
expected_volume_type = dict(name='new_type',
extra_specs={},
self.assertEqual('extra_specs', extra_specs.tag)
seen = set(vtype['extra_specs'].keys())
for child in extra_specs:
- self.assertTrue(child.tag in seen)
+ self.assertIn(child.tag, seen)
self.assertEqual(vtype['extra_specs'][child.tag], child.text)
seen.remove(child.tag)
self.assertEqual(len(seen), 0)
self.assertEqual(len(vtypes), len(tree))
for child in tree:
name = child.get('name')
- self.assertTrue(name in vtypes)
+ self.assertIn(name, vtypes)
self._verify_volume_type(vtypes[name], child)
def test_voltype_serializer(self):
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
def test_admin_list_volumes_all_tenants(self):
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(3, len(res['volumes']))
def test_all_tenants_non_admin_gets_all_tenants(self):
req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1')
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
def test_non_admin_get_by_project(self):
req = fakes.HTTPRequest.blank('/v1/fake/volumes')
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
self.assertEqual(str(vol[attr]), tree.get(attr))
for child in tree:
- self.assertTrue(child.tag in (NS + 'attachments', NS + 'metadata'))
+ self.assertIn(child.tag, (NS + 'attachments', NS + 'metadata'))
if child.tag == 'attachments':
self.assertEqual(1, len(child))
self.assertEqual('attachment', child[0].tag)
elif child.tag == 'metadata':
not_seen = set(vol['metadata'].keys())
for gr_child in child:
- self.assertTrue(gr_child.get("key") in not_seen)
+ self.assertIn(gr_child.get("key"), not_seen)
self.assertEqual(str(vol['metadata'][gr_child.get("key")]),
gr_child.text)
not_seen.remove(gr_child.get('key'))
response = request.get_response(self.app)
self.assertEqual(response.status_int, 413)
- self.assertTrue('Retry-After' in response.headers)
+ self.assertIn('Retry-After', response.headers)
retry_after = int(response.headers['Retry-After'])
self.assertAlmostEqual(retry_after, 60, 1)
req = fakes.HTTPRequest.blank('/v2/snapshots')
resp_dict = self.controller.create(req, body)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['name'],
snapshot_name)
self.assertEqual(resp_dict['snapshot']['description'],
req = fakes.HTTPRequest.blank('/v2/snapshots')
resp_dict = self.controller.create(req, body)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['name'],
snapshot_name)
self.assertEqual(resp_dict['snapshot']['description'],
req = fakes.HTTPRequest.blank('/v2/snapshots/%s' % UUID)
resp_dict = self.controller.show(req, UUID)
- self.assertTrue('snapshot' in resp_dict)
+ self.assertIn('snapshot', resp_dict)
self.assertEqual(resp_dict['snapshot']['id'], UUID)
def test_snapshot_show_invalid_id(self):
req = fakes.HTTPRequest.blank('/v2/snapshots/detail')
resp_dict = self.controller.detail(req)
- self.assertTrue('snapshots' in resp_dict)
+ self.assertIn('snapshots', resp_dict)
resp_snapshots = resp_dict['snapshots']
self.assertEqual(len(resp_snapshots), 1)
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def test_list_snapshots_with_limit_and_offset(self):
use_admin_context=is_admin)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(2, res['snapshots'][0]['id'])
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(3, len(res['snapshots']))
def test_all_tenants_non_admin_gets_all_tenants(self):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots?all_tenants=1')
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def test_non_admin_get_by_project(self):
req = fakes.HTTPRequest.blank('/v2/fake/snapshots')
res = self.controller.index(req)
- self.assertTrue('snapshots' in res)
+ self.assertIn('snapshots', res)
self.assertEqual(1, len(res['snapshots']))
def _create_snapshot_bad_body(self, body):
request = fakes.HTTPRequest.blank("/v2")
output = view_builder.show(request, raw_volume_type)
- self.assertTrue('volume_type' in output)
+ self.assertIn('volume_type', output)
expected_volume_type = dict(
name='new_type',
extra_specs={},
request = fakes.HTTPRequest.blank("/v2")
output = view_builder.index(request, raw_volume_types)
- self.assertTrue('volume_types' in output)
+ self.assertIn('volume_types', output)
for i in range(0, 10):
expected_volume_type = dict(
name='new_type',
self.assertEqual('extra_specs', extra_specs.tag)
seen = set(vtype['extra_specs'].keys())
for child in extra_specs:
- self.assertTrue(child.tag in seen)
+ self.assertIn(child.tag, seen)
self.assertEqual(vtype['extra_specs'][child.tag], child.text)
seen.remove(child.tag)
self.assertEqual(len(seen), 0)
self.assertEqual(len(vtypes), len(tree))
for child in tree:
name = child.get('name')
- self.assertTrue(name in vtypes)
+ self.assertIn(name, vtypes)
self._verify_volume_type(vtypes[name], child)
def test_voltype_serializer(self):
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
def test_admin_list_volumes_all_tenants(self):
req = fakes.HTTPRequest.blank('/v2/fake/volumes?all_tenants=1',
use_admin_context=True)
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(3, len(res['volumes']))
def test_all_tenants_non_admin_gets_all_tenants(self):
req = fakes.HTTPRequest.blank('/v2/fake/volumes?all_tenants=1')
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
def test_non_admin_get_by_project(self):
req = fakes.HTTPRequest.blank('/v2/fake/volumes')
res = self.controller.index(req)
- self.assertTrue('volumes' in res)
+ self.assertIn('volumes', res)
self.assertEqual(1, len(res['volumes']))
def _create_volume_bad_request(self, body):
self.assertEqual(str(vol[attr]), tree.get(attr))
for child in tree:
- self.assertTrue(child.tag in (NS + 'attachments', NS + 'metadata'))
+ self.assertIn(child.tag, (NS + 'attachments', NS + 'metadata'))
if child.tag == 'attachments':
self.assertEqual(1, len(child))
self.assertEqual('attachment', child[0].tag)
elif child.tag == 'metadata':
not_seen = set(vol['metadata'].keys())
for gr_child in child:
- self.assertTrue(gr_child.get("key") in not_seen)
+ self.assertIn(gr_child.get("key"), not_seen)
self.assertEqual(str(vol['metadata'][gr_child.get("key")]),
gr_child.text)
not_seen.remove(gr_child.get('key'))
def __init__(inst, version, *args, **kwargs):
self.assertEqual('1', version)
self.assertEqual('http://fake_host:9292', args[0])
- self.assertFalse('token' in kwargs)
+ self.assertNotIn('token', kwargs)
self.assertEqual(60, kwargs['timeout'])
self.stubs.Set(glance.glanceclient, 'Client', MyGlanceStubClient)
self.assertEqual("1", version)
self.assertEqual("http://fake_host:9292", args[0])
self.assertEqual(True, kwargs['token'])
- self.assertFalse('timeout' in kwargs)
+ self.assertNotIn('timeout', kwargs)
self.stubs.Set(glance.glanceclient, 'Client', MyGlanceStubClient)
client = glance._create_glance_client(self.context, 'fake_host:9292',
# It should also be in the all-volume list
volumes = self.api.get_volumes()
volume_names = [volume['id'] for volume in volumes]
- self.assertTrue(created_volume_id in volume_names)
+ self.assertIn(created_volume_id, volume_names)
# Wait (briefly) for creation. Delay is due to the 'message queue'
found_volume = self._poll_while(created_volume_id, ['creating'])
filter_properties=filter_properties)
# should not have retry info in the populated filter properties:
- self.assertFalse("retry" in filter_properties)
+ self.assertNotIn("retry", filter_properties)
def test_retry_attempt_one(self):
# Test retry logic on initial scheduling attempt.
super(ConfigTestCase, self).setUp()
def test_declare(self):
- self.assert_('answer' not in CONF)
+ self.assertNotIn('answer', CONF)
CONF.import_opt('answer', 'cinder.tests.declare_conf')
- self.assert_('answer' in CONF)
+ self.assertIn('answer', CONF)
self.assertEqual(CONF.answer, 42)
# Make sure we don't overwrite anything
self.assertEqual(CONF.answer, 256)
def test_runtime_and_unknown_conf(self):
- self.assert_('runtime_answer' not in CONF)
+ self.assertNotIn('runtime_answer', CONF)
import cinder.tests.runtime_conf
- self.assert_('runtime_answer' in CONF)
+ self.assertIn('runtime_answer', CONF)
self.assertEqual(CONF.runtime_answer, 54)
def test_long_vs_short_conf(self):
res_names = ['gigabytes', 'volumes']
for uuid in reservations:
reservation = db.reservation_get(self.ctxt, uuid)
- self.assertTrue(reservation.resource in res_names)
+ self.assertIn(reservation.resource, res_names)
res_names.remove(reservation.resource)
def test_quota_destroy_all_by_project(self):
vol = self.test_create_volume()
self.mox.StubOutWithMock(self.driver, '_update_vol_location')
conn = self.driver.initialize_connection(vol, connector)
- self.assertTrue('hitachi' in conn['data']['target_iqn'])
- self.assertTrue('3260' in conn['data']['target_portal'])
+ self.assertIn('hitachi', conn['data']['target_iqn'])
+ self.assertIn('3260', conn['data']['target_portal'])
vol['provider_location'] = conn['data']['provider_location']
return (vol, connector)
for metadata in self.metadatas_downgraded_from(6):
snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
- self.assertTrue('provider_location' not in snapshots.c)
+ self.assertNotIn('provider_location', snapshots.c)
def test_upgrade_007_adds_fk(self):
for metadata in self.metadatas_upgraded_to(7):
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
- self.assertTrue('bootable' not in volumes.c)
+ self.assertNotIn('bootable', volumes.c)
# Make sure we put all the columns back
for column in volumes_v10.c:
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
- self.assertTrue('attached_host' not in volumes.c)
+ self.assertNotIn('attached_host', volumes.c)
def test_migration_013(self):
"""Test that adding provider_geometry column works correctly."""
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
- self.assertTrue('provider_geometry' not in volumes.c)
+ self.assertNotIn('provider_geometry', volumes.c)
def test_migration_014(self):
"""Test that adding _name_id column works correctly."""
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
- self.assertTrue('_name_id' not in volumes.c)
+ self.assertNotIn('_name_id', volumes.c)
def test_migration_015(self):
"""Test removing migrations table works correctly."""
# encryption key UUID
volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
- self.assertTrue('encryption_key_id' in volumes.c)
+ self.assertIn('encryption_key_id', volumes.c)
self.assertTrue(isinstance(volumes.c.encryption_key_id.type,
sqlalchemy.types.VARCHAR))
snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
- self.assertTrue('encryption_key_id' in snapshots.c)
+ self.assertIn('encryption_key_id', snapshots.c)
self.assertTrue(isinstance(snapshots.c.encryption_key_id.type,
sqlalchemy.types.VARCHAR))
- self.assertTrue('volume_type_id' in snapshots.c)
+ self.assertIn('volume_type_id', snapshots.c)
self.assertTrue(isinstance(snapshots.c.volume_type_id.type,
sqlalchemy.types.VARCHAR))
metadata.bind = engine
volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
- self.assertTrue('encryption_key_id' not in volumes.c)
+ self.assertNotIn('encryption_key_id', volumes.c)
snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
- self.assertTrue('encryption_key_id' not in snapshots.c)
+ self.assertNotIn('encryption_key_id', snapshots.c)
self.assertFalse(engine.dialect.has_table(engine.connect(),
'encryption'))
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
- self.assertTrue('migration_status' not in volumes.c)
+ self.assertNotIn('migration_status', volumes.c)
def test_migration_020(self):
"""Test adding volume_admin_metadata table works correctly."""
type_ref['id'])
res = qos_specs.get_associations(self.ctxt, specs_id)
self.assertEquals(len(res[specs_id].keys()), 1)
- self.assertTrue('TypeName' in res[specs_id].keys())
- self.assertTrue(type_ref['id'] in res[specs_id].values())
+ self.assertIn('TypeName', res[specs_id].keys())
+ self.assertIn(type_ref['id'], res[specs_id].values())
self.stubs.Set(db, 'qos_specs_associate',
fake_db_associate)
admin_context = context.get_admin_context()
iscsi_target = db.volume_get_iscsi_target_num(admin_context,
volume_id)
- self.assert_(iscsi_target not in targets)
+ self.assertNotIn(iscsi_target, targets)
targets.append(iscsi_target)
total_slots = CONF.iscsi_num_targets
def fake_reschedule_or_error(self, context, *args, **kwargs):
self.assertFalse(context.is_admin)
- self.assertFalse('admin' in context.roles)
+ self.assertNotIn('admin', context.roles)
#compare context passed in with the context we saved
self.assertDictMatch(self.saved_ctxt.__dict__,
context.__dict__)
self.fake_snapshot = jsonutils.to_primitive(snapshot)
def test_serialized_volume_has_id(self):
- self.assertTrue('id' in self.fake_volume)
+ self.assertIn('id', self.fake_volume)
def _test_volume_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
search_opts={'extra_specs': {"key1": "val1"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
- self.assertTrue("type1" in vol_types.keys())
+ self.assertIn("type1", vol_types.keys())
self.assertEqual(vol_types['type1']['extra_specs'],
{"key1": "val1", "key2": "val2"})
search_opts={'extra_specs': {"key2": "val2"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
- self.assertTrue("type1" in vol_types.keys())
- self.assertTrue("type2" in vol_types.keys())
+ self.assertIn("type1", vol_types.keys())
+ self.assertIn("type2", vol_types.keys())
vol_types = volume_types.get_all_types(
self.ctxt,
search_opts={'extra_specs': {"key3": "val3"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
- self.assertTrue("type2" in vol_types.keys())
+ self.assertIn("type2", vol_types.keys())
def test_volume_type_search_by_extra_spec_multiple(self):
"""Ensure volume types get by extra spec returns correct type."""
"key3": "val3"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
- self.assertTrue("type1" in vol_types.keys())
- self.assertTrue("type3" in vol_types.keys())
+ self.assertIn("type1", vol_types.keys())
+ self.assertIn("type3", vol_types.keys())
self.assertEqual(vol_types['type1']['extra_specs'],
{"key1": "val1", "key2": "val2", "key3": "val3"})
self.assertEqual(vol_types['type3']['extra_specs'],
for attr in ('display_name', 'created_at', 'launched_at',
'status', 'audit_period_beginning',
'audit_period_ending'):
- self.assertTrue(attr in payload,
- msg="Key %s not in payload" % attr)
+ self.assertIn(attr, payload)
db.volume_destroy(context.get_admin_context(), volume['id'])
def test_get_host_from_queue_simple(self):
api = self._wsgi_app(fail)
resp = webob.Request.blank('/').get_response(api)
- self.assertTrue('{"computeFault' in resp.body, resp.body)
+ self.assertIn('{"computeFault', resp.body)
expected = ('ExceptionWithSafety: some explanation' if expose else
'The server has either erred or is incapable '
'of performing the requested operation.')
- self.assertTrue(expected in resp.body, resp.body)
+ self.assertIn(expected, resp.body)
self.assertEqual(resp.status_int, 500, resp.body)
def test_safe_exceptions_are_described_in_faults(self):
api = self._wsgi_app(fail)
resp = webob.Request.blank('/').get_response(api)
- self.assertTrue(msg in resp.body, resp.body)
+ self.assertIn(msg, resp.body)
self.assertEqual(resp.status_int, exception_type.code, resp.body)
if hasattr(exception_type, 'headers'):
for (key, value) in exception_type.headers.iteritems():
- self.assertTrue(key in resp.headers)
+ self.assertIn(key, resp.headers)
self.assertEquals(resp.headers[key], value)
def test_quota_error_mapping(self):
]
for metric in required_metrics:
- self.assertTrue(metric in stats)
+ self.assertIn(metric, stats)
def test_get_volume_stats_reports_unknown_cap(self):
drv = get_configured_driver()