if t.dump()['wwn'] == target_iqn:
target = t
break
- if target == None:
+ if target is None:
raise RtstoolError(_('Could not find target %s') % target_iqn)
tpg = target.tpgs.next() # get the first one
query = _generate_paginate_query(context, session, marker, limit,
sort_key, sort_dir, filters)
# No volumes would match, return empty list
- if query == None:
+ if query is None:
return []
return query.all()
query = _generate_paginate_query(context, session, marker, limit,
sort_key, sort_dir, filters)
# No volumes would match, return empty list
- if query == None:
+ if query is None:
return []
return query.all()
# 'no_migration_targets' is unique, must be either NULL or
# not start with 'target:'
if ('no_migration_targets' in filters and
- filters['no_migration_targets'] == True):
+ filters['no_migration_targets'] is True):
filters.pop('no_migration_targets')
try:
column_attr = getattr(models.Volume, 'migration_status')
- conditions = [column_attr == None,
+ conditions = [column_attr == None, # noqa
column_attr.op('NOT LIKE')('target:%')]
query = query.filter(or_(*conditions))
except AttributeError:
"""Return snapshots that were active during window."""
query = model_query(context, models.Snapshot, read_deleted="yes")
- query = query.filter(or_(models.Snapshot.deleted_at == None,
+ query = query.filter(or_(models.Snapshot.deleted_at == None, # noqa
models.Snapshot.deleted_at > begin))
query = query.options(joinedload(models.Snapshot.volume))
if end:
project_id=None):
"""Return volumes that were active during window."""
query = model_query(context, models.Volume, read_deleted="yes")
- query = query.filter(or_(models.Volume.deleted_at == None,
+ query = query.filter(or_(models.Volume.deleted_at == None, # noqa
models.Volume.deleted_at > begin))
if end:
query = query.filter(models.Volume.created_at < end)
@classmethod
def blank(cls, *args, **kwargs):
- if args != None:
+ if args is not None:
if args[0].find('v1') == 0:
kwargs['base_url'] = 'http://localhost/v1'
else:
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
for i in xrange(CONF.osapi_max_limit)]
- if limit == None or limit >= len(vols):
+ if limit is None or limit >= len(vols):
return vols
return vols[:limit]
self.stubs.Set(db, 'volume_get_all', stub_volume_get_all)
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
for i in xrange(100)]
- if limit == None or limit >= len(vols):
+ if limit is None or limit >= len(vols):
return vols
return vols[:limit]
self.stubs.Set(db, 'volume_get_all', stub_volume_get_all2)
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
for i in xrange(CONF.osapi_max_limit + 100)]
- if limit == None or limit >= len(vols):
+ if limit is None or limit >= len(vols):
return vols
return vols[:limit]
self.stubs.Set(db, 'volume_get_all', stub_volume_get_all3)
def test___eq__(self):
self.assertTrue(self.key == self.key)
- self.assertFalse(self.key == None)
+ self.assertFalse(self.key is None)
self.assertFalse(None == self.key)
def test___ne__(self):
self.assertFalse(self.key != self.key)
- self.assertTrue(self.key != None)
+ self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)
self.driver.delete_volume(vol)
# should not be deletable twice
prov_loc = self.backend.getVolumebyProvider(vol['provider_location'])
- self.assertTrue(prov_loc == None)
+ self.assertTrue(prov_loc is None)
def test_extend_volume(self):
vol = self._create_volume()
lun = svol['provider_location']
m_id_to_vol.return_value = svol
self.driver.delete_snapshot(svol)
- self.assertTrue(self.backend.getVolumebyProvider(lun) == None)
+ self.assertTrue(self.backend.getVolumebyProvider(lun) is None)
def test_create_volume_from_snapshot(self):
svol = self._create_volume()
def execute(*cmd, **kwargs):
"""Convenience wrapper around oslo's execute() method."""
- if 'run_as_root' in kwargs and not 'root_helper' in kwargs:
+ if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
kwargs['root_helper'] = get_root_helper()
return processutils.execute(*cmd, **kwargs)
def get_all(self, context, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None, viewable_admin_meta=False):
check_policy(context, 'get_all')
- if filters == None:
+ if filters is None:
filters = {}
try:
def _get_storage_type_conffile(self, filename=None):
"""Get the storage type from the config file."""
- if filename == None:
+ if filename is None:
filename = self.configuration.cinder_emc_config_file
file = open(filename, 'r')
out = self._eql_execute('volume', 'select', volume['name'],
'access', 'show')
connection_id = self._parse_connection(connector, out)
- if connection_id != None:
+ if connection_id is not None:
self._eql_execute('volume', 'select', volume['name'],
'access', 'delete', connection_id)
except Exception:
vols = self._get_filer_volumes()
for vol in vols:
volume = vol.get_child_content('name')
- if self.volume_list and not volume in self.volume_list:
+ if self.volume_list and volume not in self.volume_list:
continue
state = vol.get_child_content('state')
inconsistent = vol.get_child_content('is-inconsistent')
# F841,H302,H305,H307,H405
-ignore = E251,E265,E711,E712,E713,F402,F841,H104,H302,H305,H307,H402,H405,H803,H904
+ignore = E251,E265,F402,F841,H104,H302,H305,H307,H402,H405,H803,H904
builtins = _
exclude = .git,.venv,.tox,dist,tools,doc,common,*egg,build