LOG.debug('_read_metadata started, container name: %(container)s, '
'metadata filename: %(filename)s' %
{'container': container, 'filename': filename})
- (resp, body) = self.conn.get_object(container, filename)
+ (_resp, body) = self.conn.get_object(container, filename)
metadata = json.loads(body)
LOG.debug('_read_metadata finished (%s)' % metadata)
return metadata
'volume_id': volume_id,
})
try:
- (resp, body) = self.conn.get_object(container, object_name)
+ (_resp, body) = self.conn.get_object(container, object_name)
except socket.error as err:
raise exception.SwiftConnectionFailed(reason=err)
compression_algorithm = metadata_object[object_name]['compression']
"""Get the Fibre Channel HBA information."""
out = None
try:
- out, err = self._execute('systool', '-c', 'fc_host', '-v',
- run_as_root=True,
- root_helper=self._root_helper)
+ out, _err = self._execute('systool', '-c', 'fc_host', '-v',
+ run_as_root=True,
+ root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
# This handles the case where rootwrap is used
# and systool is not installed
self.echo_scsi_command(path, "1")
def get_device_info(self, device):
- (out, err) = self._execute('sg_scan', device, run_as_root=True,
- root_helper=self._root_helper)
+ (out, _err) = self._execute('sg_scan', device, run_as_root=True,
+ root_helper=self._root_helper)
dev_info = {'device': device, 'host': None,
'channel': None, 'id': None, 'lun': None}
if out:
devices = []
out = None
try:
- (out, err) = self._execute('multipath', '-l', device,
- run_as_root=True,
- root_helper=self._root_helper)
+ (out, _err) = self._execute('multipath', '-l', device,
+ run_as_root=True,
+ root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
LOG.warn(_("multipath call failed exit (%(code)s)")
% {'code': exc.exit_code})
self.volumes_dir = volumes_dir
def _get_target(self, iqn):
- (out, err) = self._execute('tgt-admin', '--show', run_as_root=True)
+ (out, _err) = self._execute('tgt-admin', '--show', run_as_root=True)
lines = out.split('\n')
for line in lines:
if iqn in line:
capture = False
target_info = []
- (out, err) = self._execute('tgt-admin', '--show', run_as_root=True)
+ (out, _err) = self._execute('tgt-admin', '--show', run_as_root=True)
lines = out.split('\n')
for line in lines:
raise
def _get_target(self, iqn):
- (out, err) = self._execute('cinder-rtstool',
- 'get-targets',
- run_as_root=True)
+ (out, _err) = self._execute('cinder-rtstool',
+ 'get-targets',
+ run_as_root=True)
lines = out.split('\n')
for line in lines:
if iqn in line:
def initialize_connection(self, volume, connector):
volume_iqn = volume['provider_location'].split(' ')[1]
- (auth_method, auth_user, auth_pass) = \
+ (_auth_method, auth_user, auth_pass) = \
volume['provider_auth'].split(' ', 3)
# Add initiator iqns to target ACL
"""
exists = False
- (out, err) = self._execute(
+ (out, _err) = self._execute(
'env', 'LC_ALL=C', 'vgs', '--noheadings', '-o', 'name',
self.vg_name, root_helper=self._root_helper, run_as_root=True)
self._execute(*cmd, root_helper=self._root_helper, run_as_root=True)
def _get_vg_uuid(self):
- (out, err) = self._execute('env', 'LC_ALL=C', 'vgs', '--noheadings',
- '-o uuid', self.vg_name)
+ (out, _err) = self._execute('env', 'LC_ALL=C', 'vgs', '--noheadings',
+ '-o uuid', self.vg_name)
if out is not None:
return out.split()
else:
"""
cmd = ['env', 'LC_ALL=C', 'vgs', '--version']
- (out, err) = putils.execute(*cmd,
- root_helper=root_helper,
- run_as_root=True)
+ (out, _err) = putils.execute(*cmd,
+ root_helper=root_helper,
+ run_as_root=True)
lines = out.split('\n')
for line in lines:
cmd.append(vg_name)
lvs_start = time.time()
- (out, err) = putils.execute(*cmd,
- root_helper=root_helper,
- run_as_root=True)
+ (out, _err) = putils.execute(*cmd,
+ root_helper=root_helper,
+ run_as_root=True)
total_time = time.time() - lvs_start
if total_time > 60:
LOG.warning(_('Took %s seconds to get logical volumes.'),
'--separator', ':',
'--nosuffix']
- (out, err) = putils.execute(*cmd,
- root_helper=root_helper,
- run_as_root=True)
+ (out, _err) = putils.execute(*cmd,
+ root_helper=root_helper,
+ run_as_root=True)
pvs = out.split()
if vg_name is not None:
cmd.append(vg_name)
start_vgs = time.time()
- (out, err) = putils.execute(*cmd,
- root_helper=root_helper,
- run_as_root=True)
+ (out, _err) = putils.execute(*cmd,
+ root_helper=root_helper,
+ run_as_root=True)
total_time = time.time() - start_vgs
if total_time > 60:
LOG.warning(_('Took %s seconds to get volume groups.'), total_time)
run_as_root=True)
def lv_has_snapshot(self, name):
- out, err = self._execute(
+ out, _err = self._execute(
'env', 'LC_ALL=C', 'lvdisplay', '--noheading',
'-C', '-o', 'Attr', '%s/%s' % (self.vg_name, name),
root_helper=self._root_helper, run_as_root=True)
self._get_hash_str(device_name))
def _read_mounts(self):
- (out, err) = self._execute('mount', check_exit_code=0)
+ (out, _err) = self._execute('mount', check_exit_code=0)
lines = out.split('\n')
mounts = {}
for line in lines:
try:
csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
csock.connect(('8.8.8.8', 80))
- (addr, port) = csock.getsockname()
+ (addr, _port) = csock.getsockname()
csock.close()
return addr
except socket.error:
def _sync_volumes(context, project_id, session, volume_type_id=None,
volume_type_name=None):
- (volumes, gigs) = _volume_data_get_for_project(
+ (volumes, _gigs) = _volume_data_get_for_project(
context, project_id, volume_type_id=volume_type_id, session=session)
key = 'volumes'
if volume_type_name:
def _sync_snapshots(context, project_id, session, volume_type_id=None,
volume_type_name=None):
- (snapshots, gigs) = _snapshot_data_get_for_project(
+ (snapshots, _gigs) = _snapshot_data_get_for_project(
context, project_id, volume_type_id=volume_type_id, session=session)
key = 'snapshots'
if volume_type_name:
def _sync_backups(context, project_id, session, volume_type_id=None,
volume_type_name=None):
- (backups, gigs) = _backup_data_get_for_project(
+ (backups, _gigs) = _backup_data_get_for_project(
context, project_id, volume_type_id=volume_type_id, session=session)
key = 'backups'
return {key: backups}
def consistencygroup_update(context, consistencygroup_id, values):
session = get_session()
with session.begin():
- result = model_query(context, models.ConsistencyGroup, project_only=True).\
+ result = model_query(context, models.ConsistencyGroup,
+ project_only=True).\
filter_by(id=consistencygroup_id).\
first()
def _reraise_translated_image_exception(image_id):
"""Transform the exception for the image but keep its traceback intact."""
- exc_type, exc_value, exc_trace = sys.exc_info()
+ _exc_type, exc_value, exc_trace = sys.exc_info()
new_exc = _translate_image_exception(image_id, exc_value)
raise new_exc, None, exc_trace
def _reraise_translated_exception():
"""Transform the exception but keep its traceback intact."""
- exc_type, exc_value, exc_trace = sys.exc_info()
+ _exc_type, exc_value, exc_trace = sys.exc_info()
new_exc = _translate_plain_exception(exc_value)
raise new_exc, None, exc_trace
cmd = ('env', 'LC_ALL=C', 'qemu-img', 'info', path)
if os.name == 'nt':
cmd = cmd[2:]
- out, err = utils.execute(*cmd, run_as_root=run_as_root)
+ out, _err = utils.execute(*cmd, run_as_root=run_as_root)
return imageutils.QemuImgInfo(out)
def get_vhd_size(vhd_path):
- out, err = utils.execute('vhd-util', 'query', '-n', vhd_path, '-v')
+ out, _err = utils.execute('vhd-util', 'query', '-n', vhd_path, '-v')
return int(out)
controller = Controller()
resource = wsgi.Resource(controller)
- method, extensions = resource.get_method(None, 'index', None, '')
+ method, _extensions = resource.get_method(None, 'index', None, '')
actual = resource.dispatch(method, None, {'pants': 'off'})
expected = 'off'
self.assertEqual(actual, expected)
controller = Controller()
resource = wsgi.Resource(controller)
- method, extensions = resource.get_method(None, 'action',
- 'application/json',
- '{"fooAction": true}')
+ method, _extensions = resource.get_method(None, 'action',
+ 'application/json',
+ '{"fooAction": true}')
self.assertEqual(controller._action_foo, method)
def test_get_method_action_xml(self):
controller = Controller()
resource = wsgi.Resource(controller)
- method, extensions = resource.get_method(None, 'action',
- 'application/xml',
- '<fooAction>true</fooAction>')
+ method, _extensions = resource.get_method(
+ None, 'action', 'application/xml', '<fooAction>true</fooAction>')
self.assertEqual(controller._action_foo, method)
def test_get_method_action_bad_body(self):
controller = Controller()
resource = wsgi.Resource(controller)
- method, extensions = resource.get_method(None, 'action',
- 'application/xml',
- '<fooAction>true</fooAction')
+ method, _extensions = resource.get_method(None, 'action',
+ 'application/xml',
+ '<fooAction>true</fooAction')
self.assertEqual(controller.action, method)
def test_get_action_args(self):
self.assertDictMatch(volume_info, expected_info)
def test_connect_volume_could_not_discover_path(self):
- aoe_device, aoe_path = self.connector._get_aoe_info(
+ _aoe_device, aoe_path = self.connector._get_aoe_info(
self.connection_properties)
number_of_calls = 4
self._mock_path_exists(aoe_path, [False] * (number_of_calls + 1))
self.mox.StubOutWithMock(self.connector, '_execute')
- for i in xrange(number_of_calls):
+ for _i in xrange(number_of_calls):
self.connector._execute('aoe-discover',
run_as_root=True,
root_helper='sudo',
def test_glance_client_image_id(self):
fixture = self._make_fixture(name='test image')
image_id = self.service.create(self.context, fixture)['id']
- (service, same_id) = glance.get_remote_image_service(self.context,
- image_id)
+ (_service, same_id) = glance.get_remote_image_service(self.context,
+ image_id)
self.assertEqual(same_id, image_id)
def test_glance_client_image_ref(self):
# Create a file with some data in it.
self.volume_file = tempfile.NamedTemporaryFile()
self.addCleanup(self.volume_file.close)
- for i in xrange(0, self.num_chunks):
+ for _i in xrange(0, self.num_chunks):
data = os.urandom(self.chunk_size)
self.checksum.update(data)
self.volume_file.write(data)
checksum = hashlib.sha256()
test_file.seek(0)
- for c in xrange(0, self.num_chunks):
+ for _c in xrange(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
checksum = hashlib.sha256()
test_file.seek(0)
- for c in xrange(0, self.num_chunks):
+ for _c in xrange(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
checksum = hashlib.sha256()
test_file.seek(0)
- for c in xrange(0, self.num_chunks):
+ for _c in xrange(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
self._create_volume_db_entry()
self.volume_file = tempfile.NamedTemporaryFile()
self.addCleanup(self.volume_file.close)
- for i in xrange(0, 128):
+ for _i in xrange(0, 128):
self.volume_file.write(os.urandom(1024))
def test_backup_swift_url(self):
mock.patch.object(self._driver, '_load_shares_config'),
mock.patch.object(self._driver, '_do_umount'),
mock.patch.object(glusterfs, 'LOG')
- ) as (mock_load_shares_config, mock_do_umount, mock_logger):
+ ) as (_mock_load_shares_config, mock_do_umount, mock_logger):
mock_do_umount.side_effect = Exception()
self._driver._unmount_shares()
with contextlib.nested(
mock.patch.object(self._driver, '_load_shares_config'),
mock.patch.object(self._driver, '_do_umount')
- ) as (mock_load_shares_config, mock_do_umount):
+ ) as (_mock_load_shares_config, mock_do_umount):
self._driver._unmount_shares()
self.assertTrue(mock_do_umount.called)
with contextlib.nested(
mock.patch.object(self._driver, '_load_shares_config'),
mock.patch.object(self._driver, '_do_umount')
- ) as (mock_load_shares_config, mock_do_umount):
+ ) as (_mock_load_shares_config, mock_do_umount):
self._driver._unmount_shares()
mock_do_umount.assert_any_call(True,
volume_types.get_volume_type(ctxt, old_type_ref['id'])
new_type = volume_types.get_volume_type(ctxt, new_type_ref['id'])
- diff, equal = volume_types.volume_types_diff(ctxt,
- old_type_ref['id'],
- new_type_ref['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt,
+ old_type_ref['id'],
+ new_type_ref['id'])
volume = {}
volume['name'] = 'test'
Walks all version scripts for each tested database, ensuring
that there are no errors in the version scripts for each engine
"""
- for key, engine in self.engines.items():
+ for _key, engine in self.engines.items():
self._walk_versions(engine, self.snake_walk)
def test_mysql_connect_fail(self):
def test_migration_005(self):
"""Test that adding source_volid column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
sqlalchemy.types.VARCHAR)
def _metadatas(self, upgrade_to, downgrade_to=None):
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_008(self):
"""Test that adding and removing the backups table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_009(self):
"""Test adding snapshot_metadata table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_010(self):
"""Test adding transfers table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_011(self):
"""Test adding transfers table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_012(self):
"""Test that adding attached_host column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_013(self):
"""Test that adding provider_geometry column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_014(self):
"""Test that adding _name_id column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_015(self):
"""Test removing migrations table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_016(self):
"""Test that dropping xen storage manager tables works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
"""Test that added encryption information works correctly."""
# upgrade schema
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_018(self):
"""Test that added qos_specs table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_019(self):
"""Test that adding migration_status column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_020(self):
"""Test adding volume_admin_metadata table works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_021(self):
"""Test adding default data for quota classes works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_022(self):
"""Test that adding disabled_reason column works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_023(self):
"""Test that adding reservations index works correctly."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_024(self):
"""Test adding replication columns to volume table."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_025(self):
"""Test adding table and columns for consistencygroups."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_migration_026(self):
"""Test adding default data for consistencygroups quota class."""
- for (key, engine) in self.engines.items():
+ for (_key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
def test_too_many_volumes(self):
volume_ids = []
- for i in range(CONF.quota_volumes):
+ for _i in range(CONF.quota_volumes):
vol_ref = self._create_volume()
volume_ids.append(vol_ref['id'])
self.assertRaises(exception.VolumeLimitExceeded,
old_type_ref = volume_types.create(ctxt, 'old', key_specs_old)
new_type_ref = volume_types.create(ctxt, 'new', key_specs_new)
- diff, equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
- new_type_ref['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
+ new_type_ref['id'])
volume = self._generate_vol_info(None, None)
old_type = volume_types.get_volume_type(ctxt, old_type_ref['id'])
old_type_ref = volume_types.create(ctxt, 'old', key_specs_old)
new_type_ref = volume_types.create(ctxt, 'new', key_specs_new)
- diff, equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
- new_type_ref['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
+ new_type_ref['id'])
volume = self._generate_vol_info(None, None)
old_type = volume_types.get_volume_type(ctxt, old_type_ref['id'])
old_type_ref = volume_types.create(ctxt, 'old', key_specs_old)
new_type_ref = volume_types.create(ctxt, 'new', key_specs_new)
- diff, equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
- new_type_ref['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt, old_type_ref['id'],
+ new_type_ref['id'])
volume = self._generate_vol_info(None, None)
old_type = volume_types.get_volume_type(ctxt, old_type_ref['id'])
disable_type = self._create_replication_volume_type(False)
enable_type = self._create_replication_volume_type(True)
- diff, equal = volume_types.volume_types_diff(ctxt,
- disable_type['id'],
- enable_type['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt,
+ disable_type['id'],
+ enable_type['id'])
volume = self._generate_vol_info(None, None)
volume['host'] = host
self.assertIsNone(model_update)
enable_type = self._create_replication_volume_type(True)
- diff, equal = volume_types.volume_types_diff(ctxt,
- None,
- enable_type['id'])
+ diff, _equal = volume_types.volume_types_diff(ctxt,
+ None,
+ enable_type['id'])
# Enable replica
self.driver.retype(ctxt, volume, enable_type, diff, host)
the vdisk_UID parameter and returns it.
Returns None if the specified vdisk does not exist.
"""
- vdisk_properties, err = self.sim._cmd_lsvdisk(obj=vdisk_name,
- delim='!')
+ vdisk_properties, _err = self.sim._cmd_lsvdisk(obj=vdisk_name,
+ delim='!')
# Iterate through each row until we find the vdisk_UID entry
for row in vdisk_properties.split('\n'):
# Create a volume as a way of getting a vdisk created, and find out the
# UID of that vdisk.
- volume, uid = self._create_volume_and_return_uid('manage_test')
+ _volume, uid = self._create_volume_and_return_uid('manage_test')
# Descriptor of the Cinder volume that we want to own the vdisk
# referenced by uid.
cg_name = self.url.split('/')[3]
snap_name = params['display_name']
- for (vol_name, params) in RUNTIME_VARS['volumes']:
+ for (_vol_name, params) in RUNTIME_VARS['volumes']:
if params['cg-name'] == cg_name:
snapshots = params['snapshots']
if snap_name in snapshots:
def _delete_snapshot(self):
snap = self.url.split('/')[3].split('.')[0]
- for (vol_name, params) in RUNTIME_VARS['volumes']:
+ for (_vol_name, params) in RUNTIME_VARS['volumes']:
if snap in params['snapshots']:
params['snapshots'].remove(snap)
return RUNTIME_VARS['good']
<pool-name>pool-00000001</pool-name>
</snapshot>"""
- for (vol_name, params) in RUNTIME_VARS['volumes']:
+ for (_vol_name, params) in RUNTIME_VARS['volumes']:
if params['cg-name'] == cg_name:
snapshots = params['snapshots']
resp = header
return used_devices
def _get_device_size(self, dev_path):
- out, err = self._execute('blockdev', '--getsz', dev_path,
- run_as_root=True)
+ out, _err = self._execute('blockdev', '--getsz', dev_path,
+ run_as_root=True)
size_in_m = int(out)
return size_in_m / 2048
info = _loc_info(prov_loc)
(arid, lun) = info['id_lu']
if 'tgt' in info.keys(): # connected?
- (_portal, iqn, loc, ctl, port) = info['tgt']
+ (_portal, iqn, _loc, ctl, port) = info['tgt']
self.bend.del_iscsi_conn(self.config['hus_cmd'],
HDS_VERSION,
self.config['mgmt_ip0'],
for lun in api_luns:
meta_dict = self._create_lun_meta(lun)
path = lun.get_child_content('path')
- (rest, splitter, name) = path.rpartition('/')
+ (_rest, _splitter, name) = path.rpartition('/')
handle = self._create_lun_handle(meta_dict)
size = lun.get_child_content('size')
discovered_lun = NetAppLun(handle, name,
msg_fmt = {'code': code, 'message': message}
exc_info = sys.exc_info()
LOG.warn(msg % msg_fmt)
- (igroup, lun_id) = self._find_mapped_lun_igroup(path, initiator)
+ (_igroup, lun_id) = self._find_mapped_lun_igroup(path, initiator)
if lun_id is not None:
return lun_id
else:
def _unmap_lun(self, path, initiator):
"""Unmaps a lun from given initiator."""
- (igroup_name, lun_id) = self._find_mapped_lun_igroup(path, initiator)
+ (igroup_name, _lun_id) = self._find_mapped_lun_igroup(path, initiator)
lun_unmap = NaElement.create_node_with_children(
'lun-unmap',
**{'path': path, 'initiator-group': igroup_name})
zbc = block_count
if z_calls == 0:
z_calls = 1
- for call in range(0, z_calls):
+ for _call in range(0, z_calls):
if zbc > z_limit:
block_count = z_limit
zbc -= z_limit
block_ranges = NaElement("block-ranges")
segments = int(math.ceil(block_count / float(bc_limit)))
bc = block_count
- for segment in range(0, segments):
+ for _segment in range(0, segments):
if bc > bc_limit:
block_count = bc_limit
bc -= bc_limit
"""Clone LUN with the given handle to the new name."""
metadata = self._get_lun_attr(name, 'metadata')
path = metadata['Path']
- (parent, splitter, name) = path.rpartition('/')
+ (parent, _splitter, name) = path.rpartition('/')
clone_path = '%s/%s' % (parent, new_name)
# zAPI can only handle 2^24 blocks per range
bc_limit = 2 ** 24 # 8GB
zbc = block_count
if z_calls == 0:
z_calls = 1
- for call in range(0, z_calls):
+ for _call in range(0, z_calls):
if zbc > z_limit:
block_count = z_limit
zbc -= z_limit
bc_limit = 2 ** 24 # 8GB
segments = int(math.ceil(block_count / float(bc_limit)))
bc = block_count
- for segment in range(0, segments):
+ for _segment in range(0, segments):
if bc > bc_limit:
block_count = bc_limit
bc -= bc_limit
self.configuration.thres_avl_size_perc_stop
for share in getattr(self, '_mounted_shares', []):
try:
- total_size, total_avl, total_alc =\
+ total_size, total_avl, _total_alc =\
self._get_capacity_info(share)
avl_percent = int((total_avl / total_size) * 100)
if avl_percent <= thres_size_perc_start:
def _check_share_can_hold_size(self, share, size):
"""Checks if volume can hold image with size."""
- tot_size, tot_available, tot_allocated = self._get_capacity_info(share)
+ _tot_size, tot_available, _tot_allocated = self._get_capacity_info(
+ share)
if tot_available < size:
msg = _("Container size smaller than required file size.")
raise exception.VolumeDriverException(msg)
def _clone_volume(self, volume_name, clone_name,
volume_id, share=None):
"""Clones mounted volume with NetApp filer."""
- (host_ip, export_path) = self._get_export_ip_path(volume_id, share)
+ (_host_ip, export_path) = self._get_export_ip_path(volume_id, share)
storage_path = self._get_actual_path_for_export(export_path)
target_path = '%s/%s' % (storage_path, clone_name)
(clone_id, vol_uuid) = self._start_clone('%s/%s' % (storage_path,
for nfs_share in self._mounted_shares:
if not self._is_share_eligible(nfs_share, volume_size_in_gib):
continue
- total_size, total_available, total_allocated = \
+ _total_size, _total_available, total_allocated = \
self._get_capacity_info(nfs_share)
if target_share is not None:
if target_share_reserved > total_allocated:
"""
parent_volume = self.rbd.Image(client.ioctx, volume_name)
try:
- pool, parent, snap = self._get_clone_info(parent_volume,
- volume_name)
+ _pool, parent, _snap = self._get_clone_info(parent_volume,
+ volume_name)
finally:
parent_volume.close()
try:
# First flatten source volume if required.
if flatten_parent:
- pool, parent, snap = self._get_clone_info(src_volume,
- src_name)
+ _pool, parent, snap = self._get_clone_info(src_volume,
+ src_name)
# Flatten source volume
LOG.debug("flattening source volume %s" % (src_name))
src_volume.flatten()
raise exception.VolumeIsBusy(volume_name=volume_name)
# Determine if this volume is itself a clone
- pool, parent, parent_snap = self._get_clone_info(rbd_image,
- volume_name,
- clone_snap)
+ _pool, parent, parent_snap = self._get_clone_info(rbd_image,
+ volume_name,
+ clone_snap)
finally:
rbd_image.close()
if image_location is None or not self._is_cloneable(
image_location, image_meta):
return ({}, False)
- prefix, pool, image, snapshot = self._parse_location(image_location)
+ _prefix, pool, image, snapshot = self._parse_location(image_location)
self._clone(volume, pool, image, snapshot)
self._resize(volume)
return {'provider_location': None}, True
type_id = volume.get('volume_type_id', None)
- hp3par_keys, qos, volume_type, vvs_name = self.get_type_info(
+ hp3par_keys, qos, _volume_type, vvs_name = self.get_type_info(
type_id)
name = volume.get('display_name', None)
" to %(new_cpg)s") %
{'volume_name': volume_name,
'old_cpg': old_cpg, 'new_cpg': new_cpg})
- response, body = self.client.modifyVolume(
+ _response, body = self.client.modifyVolume(
volume_name,
{'action': 6,
'tuneOperation': 1,
self.validate_persona(new_persona)
if host is not None:
- (host_type, host_id, host_cpg) = (
+ (host_type, host_id, _host_cpg) = (
host['capabilities']['location_info']).split(':')
if not (host_type == 'HP3PARDriver'):
LOG.info(_("Need to remove FC Zone, building initiator "
"target map"))
- target_wwns, init_targ_map, numPaths = \
+ target_wwns, init_targ_map, _numPaths = \
self._build_initiator_target_map(connector)
info['data'] = {'target_wwn': target_wwns,
init_targ_map[initiator] += fabric['target_port_wwn_list']
init_targ_map[initiator] = list(set(
init_targ_map[initiator]))
- for target in init_targ_map[initiator]:
+ for _target in init_targ_map[initiator]:
numPaths += 1
target_wwns = list(set(target_wwns))
else:
#NOTE(francois-charlier) Since 0.24 'collie cluster info -r'
# gives short output, but for compatibility reason we won't
# use it and just check if 'running' is in the output.
- (out, err) = self._execute('collie', 'cluster', 'info')
+ (out, _err) = self._execute('collie', 'cluster', 'info')
if 'status: running' not in out:
exception_message = (_("Sheepdog is not working: %s") % out)
raise exception.VolumeBackendAPIException(
def create_cloned_volume(self, volume, src_vref):
"""Create a clone of an existing volume."""
- (data, sfaccount, model) = self._do_clone_volume(
+ (_data, _sfaccount, model) = self._do_clone_volume(
src_vref['id'],
src_vref['project_id'],
volume)
restore at which time we'll rework this appropriately.
"""
- (data, sfaccount, model) = self._do_clone_volume(
+ (_data, _sfaccount, _model) = self._do_clone_volume(
snapshot['volume_id'],
snapshot['project_id'],
snapshot)
def create_volume_from_snapshot(self, volume, snapshot):
"""Create a volume from the specified snapshot."""
- (data, sfaccount, model) = self._do_clone_volume(
+ (_data, _sfaccount, model) = self._do_clone_volume(
snapshot['id'],
snapshot['project_id'],
volume)
param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
base_url = base_url + '?' + urllib.urlencode(param_list)
_urlparse = urlparse.urlparse(base_url)
- scheme, netloc, path, params, query, fragment = _urlparse
+ scheme, netloc, path, _params, query, _fragment = _urlparse
if scheme == 'http':
conn = httplib.HTTPConnection(netloc)
elif scheme == 'https':
# Prepare the http connection to the vmdk url
cookies = session.vim.client.options.transport.cookiejar
_urlparse = urlparse.urlparse(url)
- scheme, netloc, path, params, query, fragment = _urlparse
+ scheme, netloc, path, _params, query, _fragment = _urlparse
if scheme == 'http':
conn = httplib.HTTPConnection(netloc)
elif scheme == 'https':
if disk_conversion:
# Clone the temporary backing for disk type conversion.
- (host, rp, folder, summary) = self._select_ds_for_volume(
+ (host, _rp, _folder, summary) = self._select_ds_for_volume(
volume)
datastore = summary.datastore
LOG.debug("Cloning temporary backing: %s for disk type "
"""
try:
# find host in which to create the volume
- (host, rp, folder, summary) = self._select_ds_for_volume(volume)
+ (_host, rp, folder, summary) = self._select_ds_for_volume(volume)
except error_util.VimException as excep:
err_msg = (_("Exception in _select_ds_for_volume: "
"%s."), excep)
{'name': name,
'path': tmp_file_path})
- (host, rp, folder, summary) = self._select_ds_for_volume(volume)
+ (_host, rp, folder, summary) = self._select_ds_for_volume(volume)
LOG.debug("Selected datastore: %(ds)s for backing: %(name)s.",
{'ds': summary.name,
'name': name})
renamed = False
try:
# Find datastore for clone.
- (host, rp, folder, summary) = self._select_ds_for_volume(volume)
+ (_host, _rp, _folder, summary) = self._select_ds_for_volume(volume)
datastore = summary.datastore
disk_type = VMwareEsxVmdkDriver._get_disk_type(volume)
datastore = None
if not clone_type == volumeops.LINKED_CLONE_TYPE:
# Pick a datastore where to create the full clone under any host
- (host, rp, folder, summary) = self._select_ds_for_volume(volume)
+ (_host, _rp, _folder, summary) = self._select_ds_for_volume(volume)
datastore = summary.datastore
clone = self.volumeops.clone_backing(volume['name'], backing,
snapshot, clone_type, datastore)
def _get_vpsa_volume_name(self, name):
"""Return VPSA's name for the volume."""
- (vol_name, size) = self._get_vpsa_volume_name_and_size(name)
+ (vol_name, _size) = self._get_vpsa_volume_name_and_size(name)
return vol_name
def _get_volume_cg_name(self, name):
max-public-methods=100
min-public-methods=0
max-args=6
+
+[Variables]
+
+dummy-variables-rgx=_
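
The [Variables] section added above sets pylint's dummy-variables-rgx so that
any local name beginning with an underscore matches the pattern and is treated
as a deliberately unused placeholder. That is what lets the renames throughout
this change (_err, _resp, _key, and so on) pass without unused-variable
warnings. A minimal sketch of the convention follows; the helper is
illustrative only and not part of this change:

    def first_word(text):
        # Only the first element of the partition result is needed. The
        # underscore prefix on the other two marks them as intentional
        # dummies, so pylint (with dummy-variables-rgx=_) does not flag them.
        word, _sep, _rest = text.partition(' ')
        return word

For example, first_word('tgt-admin --show') returns 'tgt-admin' while the
separator and remainder are unpacked into names pylint now ignores.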