try:
meta_ref = _snapshot_metadata_get_item(context, snapshot_id,
meta_key, session)
- except exception.SnapshotMetadataNotFound as e:
+ except exception.SnapshotMetadataNotFound:
meta_ref = models.SnapshotMetadata()
item.update({"key": meta_key, "snapshot_id": snapshot_id})
try:
spec_ref = _volume_type_extra_specs_get_item(
context, volume_type_id, key, session)
- except exception.VolumeTypeExtraSpecsNotFound as e:
+ except exception.VolumeTypeExtraSpecsNotFound:
spec_ref = models.VolumeTypeExtraSpecs()
spec_ref.update({"key": key, "value": value,
"volume_type_id": volume_type_id,
try:
spec_ref = _qos_specs_get_item(
context, qos_specs_id, key, session)
- except exception.QoSSpecsKeyNotFound as e:
+ except exception.QoSSpecsKeyNotFound:
spec_ref = models.QualityOfServiceSpecs()
id = None
if spec_ref.get('id', None):
volume_id = transfer_ref['volume_id']
volume_ref = _volume_get(context, volume_id, session=session)
if volume_ref['status'] != 'awaiting-transfer':
- volume_status = volume_ref['status']
msg = _('Transfer %(transfer_id)s: Volume id %(volume_id)s in '
'unexpected state %(status)s, expected '
'awaiting-transfer') % {'transfer_id': transfer_id,
# Just for the ForeignKey and column creation to succeed, these are not the
# actual definitions of tables .
#
- volumes = Table('volumes',
- meta,
- Column('id', Integer(),
- primary_key=True, nullable=False),
- mysql_engine='InnoDB')
- snapshots = Table('snapshots',
- meta,
- Column('id', Integer(),
- primary_key=True, nullable=False),
- mysql_engine='InnoDB')
+ Table('volumes',
+ meta,
+ Column('id', Integer(), primary_key=True, nullable=False),
+ mysql_engine='InnoDB')
+ Table('snapshots',
+ meta,
+ Column('id', Integer(), primary_key=True, nullable=False),
+ mysql_engine='InnoDB')
# Create new table
volume_glance_metadata = Table(
'volume_glance_metadata',
meta = MetaData()
meta.bind = migrate_engine
- snapshots = Table('snapshots', meta, autoload=True)
+ Table('snapshots', meta, autoload=True)
# New table
snapshot_metadata = Table(
meta = MetaData()
meta.bind = migrate_engine
- volumes = Table('volumes', meta, autoload=True)
+ Table('volumes', meta, autoload=True)
# New table
transfers = Table(
meta = MetaData()
meta.bind = migrate_engine
- _volumes = Table('volumes', meta, autoload=True)
+ Table('volumes', meta, autoload=True)
# New table
volume_admin_metadata = Table(
volume_ref = db.volume_get(context, volume_id)
try:
- tgt_host = self.driver.host_passes_filters(context,
- volume_ref['host'],
- request_spec,
- filter_properties)
+ self.driver.host_passes_filters(context,
+ volume_ref['host'],
+ request_spec,
+ filter_properties)
except exception.NoValidHost as ex:
_manage_existing_set_error(self, context, ex, request_spec)
except Exception as ex:
self.addCleanup(rpc.clear_extra_exmods)
self.addCleanup(rpc.cleanup)
- fs = '%(levelname)s [%(name)s] %(message)s'
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 15
class AdminActionsTest(test.TestCase):
-
def setUp(self):
super(AdminActionsTest, self).setUp()
expected_status = 400
expected_id = None
ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
- expected_status, expected_id)
- volume = self._migrate_volume_comp_exec(ctx, volume2, volume1, False,
- expected_status, expected_id)
+ self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
+ expected_status, expected_id)
+ self._migrate_volume_comp_exec(ctx, volume2, volume1, False,
+ expected_status, expected_id)
def test_migrate_volume_comp_bad_mig_status(self):
admin_ctx = context.get_admin_context()
expected_status = 400
expected_id = None
ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
- expected_status, expected_id)
+ self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
+ expected_status, expected_id)
def test_migrate_volume_comp_no_action(self):
admin_ctx = context.get_admin_context()
def extension1(req):
called.append('pre1')
- resp_obj = yield
+ yield
called.append('post1')
def extension2(req):
called.append('pre2')
- resp_obj = yield
+ yield
called.append('post2')
extensions = [extension1, extension2]
called = []
def extension1(req):
- resp_obj = yield
+ yield
called.append(1)
def extension2(req):
- resp_obj = yield
+ yield
called.append(2)
ext1 = extension1(None)
called = []
def extension1(req):
- resp_obj = yield
+ yield
called.append(1)
def extension2(req):
- resp_obj = yield
+ yield
called.append(2)
yield 'foo'
self.assertTrue(self.controller.is_valid_body(body, 'foo'))
def test_is_valid_body_none(self):
- resource = wsgi.Resource(controller=None)
+ wsgi.Resource(controller=None)
self.assertFalse(self.controller.is_valid_body(None, 'foo'))
def test_is_valid_body_empty(self):
- resource = wsgi.Resource(controller=None)
+ wsgi.Resource(controller=None)
self.assertFalse(self.controller.is_valid_body({}, 'foo'))
def test_is_valid_body_no_entity(self):
- resource = wsgi.Resource(controller=None)
+ wsgi.Resource(controller=None)
body = {'bar': {}}
self.assertFalse(self.controller.is_valid_body(body, 'foo'))
def test_is_valid_body_malformed_entity(self):
- resource = wsgi.Resource(controller=None)
+ wsgi.Resource(controller=None)
body = {'foo': 'bar'}
self.assertFalse(self.controller.is_valid_body(body, 'foo'))
def test_invalid_methods(self):
"""Only POSTs should work."""
- requests = []
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
request = webob.Request.blank("/", method=method)
response = request.get_response(self.app)
vol.update(dict(volume_type=CONF.default_volume_type))
body.update(dict(volume=vol))
res_dict = self.controller.create(req, body)
- volume_id = res_dict['volume']['id']
+ self.assertIn('id', res_dict['volume'])
self.assertEqual(len(res_dict), 1)
self.assertEqual(res_dict['volume']['volume_type'],
db_vol_type['name'])
vol.update(dict(volume_type=db_vol_type['id']))
body.update(dict(volume=vol))
res_dict = self.controller.create(req, body)
- volume_id = res_dict['volume']['id']
+ self.assertIn('id', res_dict['volume'])
self.assertEqual(len(res_dict), 1)
self.assertEqual(res_dict['volume']['volume_type'],
db_vol_type['name'])
self.mox.StubOutWithMock(nova, 'novaclient')
def test_update_server_volume(self):
- volume_id = 'volume_id1'
nova.novaclient(self.ctx).AndReturn(self.novaclient)
self.mox.StubOutWithMock(self.novaclient.volumes,
'update_server_volume')
file_old = None
file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
- data = dict(a=1, b=2, c=3)
jdata = ""
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
backups = db.backup_get_all_by_project(self.ctxt, 'project1')
self.assertEqual(len(backups), 0)
- b1 = self._create_backup_db_entry()
+ self._create_backup_db_entry()
b2 = self._create_backup_db_entry(project_id='project1')
backups = db.backup_get_all_by_project(self.ctxt, 'project1')
self.assertEqual(len(backups), 1)
(backup_driver.__module__,
backup_driver.__class__.__name__,
'verify'))
- with mock.patch(_mock_backup_verify_class) as _mock_record_verify:
+ with mock.patch(_mock_backup_verify_class):
self.backup_mgr.import_record(self.ctxt,
imported_record,
export['backup_service'],
def test_backup_vol_length_0(self):
volume_id = str(uuid.uuid4())
self._create_volume_db_entry(volume_id, 0)
- volume = db.volume_get(self.ctxt, volume_id)
backup_id = str(uuid.uuid4())
self._create_backup_db_entry(backup_id, volume_id, 1)
def _cmd_to_dict(self, arg_list):
"""Convert command for kwargs (assumes a properly formed command)."""
- path = arg_list[-1]
- other = arg_list[-2]
ret = {'cmd': arg_list[0],
'type': arg_list[1],
'path': arg_list[-1]}
def test_service_get(self):
service1 = self._create_service({})
- service2 = self._create_service({'host': 'some_other_fake_host'})
real_service1 = db.service_get(self.ctxt, service1['id'])
self._assertEqualObjects(service1, real_service1)
def test_service_get_by_host_and_topic(self):
service1 = self._create_service({'host': 'host1', 'topic': 'topic1'})
- service2 = self._create_service({'host': 'host2', 'topic': 'topic2'})
real_service1 = db.service_get_by_host_and_topic(self.ctxt,
host='host1',
self._assertEqualsVolumeOrderResult([], filters=filters)
def test_volume_get_iscsi_target_num(self):
- target = db.iscsi_target_create_safe(self.ctxt, {'volume_id': 42,
- 'target_num': 43})
+ db.iscsi_target_create_safe(self.ctxt, {'volume_id': 42,
+ 'target_num': 43})
self.assertEqual(43, db.volume_get_iscsi_target_num(self.ctxt, 42))
def test_volume_get_iscsi_target_num_nonexistent(self):
db.volume_create(self.ctxt, {'id': 1,
'project_id': 'project1',
'size': 42})
- snapshot = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1,
- 'project_id': 'project1',
- 'volume_size': 42})
+ db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1,
+ 'project_id': 'project1',
+ 'volume_size': 42})
actual = db.snapshot_data_get_for_project(self.ctxt, 'project1')
self.assertEqual(actual, (1, 42))
def test_reservation_expire(self):
self.values['expire'] = datetime.datetime.utcnow() + \
datetime.timedelta(days=1)
- reservations = _quota_reserve(self.ctxt, 'project1')
+ _quota_reserve(self.ctxt, 'project1')
db.reservation_expire(self.ctxt)
expected = {'project_id': 'project1',
'nonexistent')
def test_quota_class_get_all_by_name(self):
- sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
- sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
+ db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
+ db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
self.assertEqual({'class_name': 'test_qc', 'test_resource': 42},
db.quota_class_get_all_by_name(self.ctxt, 'test_qc'))
self.assertEqual({'class_name': 'test2', 'res1': 43, 'res2': 44},
self.assertEqual(43, updated['hard_limit'])
def test_quota_class_destroy_all_by_name(self):
- sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
- sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
+ db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
+ db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
db.quota_class_destroy_all_by_name(self.ctxt, 'test2')
self.assertEqual({'class_name': 'test2'},
db.quota_class_get_all_by_name(self.ctxt, 'test2'))
'nonexitent_resource')
def test_quota_usage_get(self):
- reservations = _quota_reserve(self.ctxt, 'p1')
+ _quota_reserve(self.ctxt, 'p1')
quota_usage = db.quota_usage_get(self.ctxt, 'p1', 'gigabytes')
expected = {'resource': 'gigabytes', 'project_id': 'p1',
'in_use': 0, 'reserved': 2, 'total': 2}
self.assertEqual(value, quota_usage[key], key)
def test_quota_usage_get_all_by_project(self):
- reservations = _quota_reserve(self.ctxt, 'p1')
+ _quota_reserve(self.ctxt, 'p1')
expected = {'project_id': 'p1',
'volumes': {'in_use': 0, 'reserved': 1},
'gigabytes': {'in_use': 0, 'reserved': 2}}
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
-from cinder import units
from cinder.volume.drivers.emc.emc_smis_common import EMCSMISCommon
from cinder.volume.drivers.emc.emc_smis_fc import EMCSMISFCDriver
from cinder.volume.drivers.emc.emc_smis_iscsi import EMCSMISISCSIDriver
def test_map_unmap(self):
self.driver.create_volume(self.data.test_volume)
self.data.test_volume['EMCCurrentOwningStorageProcessor'] = 'SP_A'
- connection_info = self.driver.initialize_connection(
- self.data.test_volume,
- self.data.connector)
+ self.driver.initialize_connection(self.data.test_volume,
+ self.data.connector)
self.driver.terminate_connection(self.data.test_volume,
self.data.connector)
self.driver.delete_volume(self.data.test_volume)
vol_instance = self.driver.common._find_lun(self.data.test_volume)
- expected = [
- mock.call._get_ecom_connection(),
- mock.call.find_device_number(self.data.test_volume),
- mock.call._find_lun(self.data.test_volume),
- mock.call.self._find_controller_configuration_service(
- self.data.storage_system),
- mock.call._remove_members(conf_service, vol_instance),
- mock.call.get_target_wwns(
- self.data.storage_system,
- self.data.connector)]
output = {
'driver_volume_type': 'fibre_channel',
volume_with_vt['volume_type_id'] = 1
self.driver.create_volume(volume_with_vt)
- configservice = {'CreationClassName':
- 'Clar_StorageConfigurationService',
- 'SystemName': 'CLARiiON+APM00123456789'}
-
- pool = {'InstanceID': 'CLARiiON+APM00123456789+U+gold',
- 'CreationClassName': 'Clar_UnifiedStoragePool'}
-
- volumesize = int(volume_with_vt['size']) * units.GiB
-
- storage_type = {'storagetype:provisioning': 'thick',
- 'storagetype:pool': 'gold'}
-
- expected = [
- mock.call._get_storage_type(volume_with_vt),
- mock.call._find_pool('gold'),
- mock.call.get_provisioning(storage_type),
- mock.call.InvokeMethod('CreateOrModifyElementFromStoragePool',
- configservice, volume_with_vt['name'],
- pool,
- self.driver.common._getnum(2, '16'),
- self.driver.common._getnum(volumesize,
- '64'))]
-
def _cleanup(self):
bExists = os.path.exists(self.config_file_path)
if bExists:
mox.StubOutWithMock(drv, '_execute')
vol_filename = 'volume-%s' % self.VOLUME_UUID
- snap_filename = '%s.%s' % (vol_filename, self.SNAP_UUID)
hashed = drv._get_hash_str(self.TEST_EXPORT1)
vol_path = '%s/%s/%s' % (self.TEST_MNT_POINT_BASE,
drv._create_qcow2_snap_file(snap_ref, vol_filename, snap_path)
- qemu_img_info_output = ("""image: volume-%s
- file format: raw
- virtual size: 1.0G (1073741824 bytes)
- disk size: 152K
- """ % self.VOLUME_UUID, '')
-
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(info_dict)
self.VOLUME_UUID)
volume_filename = 'volume-%s' % self.VOLUME_UUID
- snap_path = '%s.%s' % (volume_path, self.SNAP_UUID)
snap_path_2 = '%s.%s' % (volume_path, self.SNAP_UUID_2)
snap_file = '%s.%s' % (volume_filename, self.SNAP_UUID)
snap_file_2 = '%s.%s' % (volume_filename, self.SNAP_UUID_2)
'volume': self._simple_volume(),
'id': self.SNAP_UUID_2}
- snap_path_2_chain = [{self.SNAP_UUID_2: snap_file_2},
- {self.SNAP_UUID: snap_file},
- {'active': snap_file_2}]
-
- snap_path_chain = [{self.SNAP_UUID: snap_file},
- {'active': snap_file}]
-
drv._read_info_file(info_path, empty_if_missing=True).\
AndReturn(info_file_dict)
hashed,
volume_file)
- info_path = '%s%s' % (volume_path, '.info')
snap_path = '%s.%s' % (volume_path, self.SNAP_UUID)
snap_file = 'volume-%s.%s' % (self.VOLUME_UUID, self.SNAP_UUID)
snap_path_2 = '%s.%s' % (volume_path, self.SNAP_UUID_2)
snap_file_2 = 'volume-%s.%s' % (self.VOLUME_UUID, self.SNAP_UUID_2)
- qemu_img_info_output_snap_2 = """image: volume-%s.%s
- file format: qcow2
- virtual size: 1.0G (1073741824 bytes)
- disk size: 173K
- backing file: %s
- """ % (self.VOLUME_UUID, self.SNAP_UUID_2,
- 'volume-%s.%s' % (self.VOLUME_UUID, self.SNAP_UUID_2))
-
qemu_img_info_output_snap_1 = """image: volume-%s.%s
file format: qcow2
virtual size: 1.0G (1073741824 bytes)
""" % (self.VOLUME_UUID, self.SNAP_UUID,
'volume-%s.%s' % (self.VOLUME_UUID, self.SNAP_UUID))
- qemu_img_info_output = """image: volume-%s
- file format: qcow2
- virtual size: 1.0G (1073741824 bytes)
- disk size: 175K
- """ % self.VOLUME_UUID
-
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_read_info_file')
mox.StubOutWithMock(drv, '_write_info_file')
info_path = '%s.info' % volume_path
snap_path = '%s.%s' % (volume_path, self.SNAP_UUID)
- snap_path_2 = '%s.%s' % (volume_path, self.SNAP_UUID_2)
snap_file = '%s.%s' % (volume_file, self.SNAP_UUID)
snap_file_2 = '%s.%s' % (volume_file, self.SNAP_UUID_2)
@patch('cinder.utils.execute')
def test_get_fs_from_path_ok(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('Filesystem 1K-blocks '
'Used Available Use%% Mounted on\n'
'%s 10485760 531968 9953792'
@patch('cinder.utils.execute')
def test_get_fs_from_path_fail_path(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('Filesystem 1K-blocks '
'Used Available Use% Mounted on\n'
'test 10485760 531968 '
@patch('cinder.utils.execute')
def test_get_fs_from_path_fail_raise(self, mock_exec):
- ctxt = self.context
mock_exec.side_effect = processutils.ProcessExecutionError(
stdout='test', stderr='test')
self.assertRaises(exception.VolumeBackendAPIException,
@patch('cinder.utils.execute')
def test_get_gpfs_cluster_id_ok(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('mmlsconfig::HEADER:version:reserved:'
'reserved:configParameter:value:nodeList:\n'
'mmlsconfig::0:1:::clusterId:%s::'
@patch('cinder.utils.execute')
def test_get_gpfs_cluster_id_fail_id(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('mmlsconfig::HEADER.:version:reserved:'
'reserved:configParameter:value:nodeList:\n'
'mmlsconfig::0:1:::clusterId:test::', '')
@patch('cinder.utils.execute')
def test_get_gpfs_cluster_id_fail_raise(self, mock_exec):
- ctxt = self.context
mock_exec.side_effect = processutils.ProcessExecutionError(
stdout='test', stderr='test')
self.assertRaises(exception.VolumeBackendAPIException,
@patch('cinder.utils.execute')
def test_verify_gpfs_pool_ok(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('Storage pools in file system at \'/gpfs0\':'
'\n'
'Name Id BlkSize Data '
@patch('cinder.utils.execute')
def test_verify_gpfs_pool_fail_pool(self, mock_exec):
- ctxt = self.context
mock_exec.return_value = ('Storage pools in file system at \'/gpfs0\':'
'\n'
'Name Id BlkSize Data '
@patch('cinder.utils.execute')
def test_verify_gpfs_pool_fail_raise(self, mock_exec):
- ctxt = self.context
mock_exec.side_effect = processutils.ProcessExecutionError(
stdout='test', stderr='test')
self.assertFalse(self.driver._verify_gpfs_pool('/dev/gpfs'))
@patch('cinder.volume.drivers.ibm.gpfs.GPFSDriver._is_gpfs_path')
def test_is_cloneable_ok(self, mock_is_gpfs_path):
- org_value_share_mode = self.driver.configuration.gpfs_images_share_mode
self.flags(volume_driver=self.driver_name,
gpfs_images_share_mode='test')
- org_value_dir = CONF.gpfs_images_dir
CONF.gpfs_images_dir = self.images_dir
mock_is_gpfs_path.return_value = None
self.assertEqual((True, None, os.path.join(CONF.gpfs_images_dir,
@patch('cinder.volume.drivers.ibm.gpfs.GPFSDriver._is_gpfs_path')
def test_is_cloneable_fail_config(self, mock_is_gpfs_path):
- org_value_share_mode = self.driver.configuration.gpfs_images_share_mode
self.flags(volume_driver=self.driver_name, gpfs_images_share_mode='')
- org_value_dir = CONF.gpfs_images_dir
CONF.gpfs_images_dir = ''
mock_is_gpfs_path.return_value = None
self.assertNotEqual((True, None, os.path.join(CONF.gpfs_images_dir,
@patch('cinder.volume.drivers.ibm.gpfs.GPFSDriver._is_gpfs_path')
def test_is_cloneable_fail_path(self, mock_is_gpfs_path):
- org_value_share_mode = self.driver.configuration.gpfs_images_share_mode
self.flags(volume_driver=self.driver_name,
gpfs_images_share_mode='test')
- org_value_dir = CONF.gpfs_images_dir
CONF.gpfs_images_dir = self.images_dir
mock_is_gpfs_path.side_effect = (
processutils.ProcessExecutionError(stdout='test', stderr='test'))
old_type_ref = volume_types.create(ctxt, 'old', key_specs_old)
new_type_ref = volume_types.create(ctxt, 'new', key_specs_new)
- old_type = volume_types.get_volume_type(ctxt, old_type_ref['id'])
+ volume_types.get_volume_type(ctxt, old_type_ref['id'])
new_type = volume_types.get_volume_type(ctxt, new_type_ref['id'])
diff, equal = volume_types.volume_types_diff(ctxt,
'serialNumber': 'different'},
}
- mock_client = self.setup_driver(mock_conf=conf)
+ self.setup_driver(mock_conf=conf)
volume = {'name': HP3PARBaseDriver.VOLUME_NAME,
'id': HP3PARBaseDriver.CLONE_ID,
lambda x: {'OpenStackCPG': {'domain': 'OpenStack'}}.get(x, {})
}
- mock_client = self.setup_driver(mock_conf=conf)
+ self.setup_driver(mock_conf=conf)
volume = {'name': HP3PARBaseDriver.VOLUME_NAME,
'id': HP3PARBaseDriver.CLONE_ID,
'getVolume.return_value': {}
}
- mock_client = self.setup_driver(mock_conf=conf)
+ self.setup_driver(mock_conf=conf)
volume = self.volume.copy()
volume['size'] = self.volume['size'] + 10
'growVolume.side_effect': extend_ex
}
- mock_client = self.setup_driver(mock_conf=conf)
+ self.setup_driver(mock_conf=conf)
grow_size = 3
old_size = self.volume['size']
new_size = old_size + grow_size
configuration = create_configuration()
configuration.netapp_storage_family = 'xyz_abc'
try:
- driver = common.NetAppDriver(configuration=configuration)
+ common.NetAppDriver(configuration=configuration)
raise AssertionError('Wrong storage family is getting accepted.')
except exception.InvalidInput:
pass
configuration.netapp_storage_family = 'ontap'
configuration.netapp_storage_protocol = 'ontap'
try:
- driver = common.NetAppDriver(configuration=configuration)
+ common.NetAppDriver(configuration=configuration)
raise AssertionError('Wrong storage protocol is getting accepted.')
except exception.InvalidInput:
pass
configuration.netapp_storage_family = 'test_family'
configuration.netapp_storage_protocol = 'iscsi'
try:
- driver = common.NetAppDriver(configuration=configuration)
+ common.NetAppDriver(configuration=configuration)
raise AssertionError('Non NetApp driver is getting instantiated.')
except exception.InvalidInput:
pass
extra_specs = {}
mock_volume_extra_specs.return_value = extra_specs
fake_share = 'localhost:myshare'
- fake_qos_policy = 'qos_policy_1'
with mock.patch.object(drv, '_ensure_shares_mounted'):
with mock.patch.object(drv, '_find_shares',
return_value=['localhost:myshare']):
'key2': 'value2',
'key3': 'value3',
'consumer': 'back-end'}
- id = self._create_qos_specs(one_time_value, input)
+ self._create_qos_specs(one_time_value, input)
specs = qos_specs.get_qos_specs_by_name(self.ctxt,
one_time_value)
self.assertEqual(specs['specs']['key1'], one_time_value)
with mock.patch.object(self.volume.driver, 'clone_image') as \
mock_clone_image:
mock_clone_image.side_effect = exception.CinderException
- with mock.patch.object(self.volume.driver, 'create_volume') as \
- mock_create:
+ with mock.patch.object(self.volume.driver, 'create_volume'):
with mock.patch.object(create_volume.CreateVolumeFromSpecTask,
'_copy_image_to_volume') as mock_copy:
self._create_volume_from_image('error', raw=True,
self._driver.volumeops = self._volumeops
backing = FakeMor('VirtualMachine', 'my_vm')
- task = FakeMor('Task', 'my_task')
+ FakeMor('Task', 'my_task')
m.StubOutWithMock(self._volumeops, 'get_backing')
m.StubOutWithMock(self._volumeops, 'delete_backing')
m.StubOutWithMock(self._driver, '_get_folder_ds_summary')
folder = FakeMor('Folder', 'my_fol')
summary = FakeDatastoreSummary(1, 1, datastore1)
- size = 1
self._driver._get_folder_ds_summary(volume, resource_pool,
[datastore1]).AndReturn((folder,
summary))
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
- admin_ctxt = context.get_admin_context()
orig_elevated = self.context.elevated
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
- admin_ctxt = context.get_admin_context()
orig_elevated = self.context.elevated
def test_snapshot_get_active_by_window(self):
# Find all all snapshots valid within a timeframe window.
- vol = db.volume_create(self.context, {'id': 1})
+ db.volume_create(self.context, {'id': 1})
for i in range(5):
self.db_attrs[i]['volume_id'] = 1
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
db.volume_create(ctxt, {'id': 2})
- vol_metadata = db.volume_glance_metadata_create(ctxt, 1, 'key1',
- 'value1')
- vol_metadata = db.volume_glance_metadata_create(ctxt, 2, 'key1',
- 'value1')
- vol_metadata = db.volume_glance_metadata_create(ctxt, 2,
- 'key2',
- 'value2')
- vol_metadata = db.volume_glance_metadata_create(ctxt, 2,
- 'key3',
- 123)
+ db.volume_glance_metadata_create(ctxt, 1, 'key1', 'value1')
+ db.volume_glance_metadata_create(ctxt, 2, 'key1', 'value1')
+ db.volume_glance_metadata_create(ctxt, 2, 'key2', 'value2')
+ db.volume_glance_metadata_create(ctxt, 2, 'key3', 123)
expected_metadata_1 = {'volume_id': '1',
'key': 'key1',
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
db.volume_glance_metadata_delete_by_volume(ctxt, 1)
- vol_metadata = db.volume_glance_metadata_create(ctxt, 1, 'key1',
- 'value1')
+ db.volume_glance_metadata_create(ctxt, 1, 'key1', 'value1')
db.volume_glance_metadata_delete_by_volume(ctxt, 1)
self.assertRaises(exception.GlanceMetadataNotFound,
db.volume_glance_metadata_get, ctxt, 1)
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
db.snapshot_create(ctxt, {'id': 100, 'volume_id': 1})
- vol_meta = db.volume_glance_metadata_create(ctxt, 1, 'key1',
- 'value1')
+ db.volume_glance_metadata_create(ctxt, 1, 'key1', 'value1')
db.volume_glance_metadata_copy_to_snapshot(ctxt, 100, 1)
expected_meta = {'snapshot_id': '100',
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
db.volume_create(ctxt, {'id': 100, 'source_volid': 1})
- vol_meta = db.volume_glance_metadata_create(ctxt, 1, 'key1',
- 'value1')
+ db.volume_glance_metadata_create(ctxt, 1, 'key1', 'value1')
db.volume_glance_metadata_copy_from_volume_to_volume(ctxt, 1, 100)
expected_meta = {'key': 'key1',
self.fake_args = None
self.fake_kwargs = None
- real_prepare = rpcapi.client.prepare
-
def _fake_prepare_method(*args, **kwds):
for kwd in kwds:
self.assertEqual(kwds[kwd], target[kwd])
def test_get_default_volume_type(self):
"""Ensures default volume type can be retrieved."""
- type_ref = volume_types.create(self.ctxt,
- conf_fixture.def_vol_type,
- {})
+ volume_types.create(self.ctxt, conf_fixture.def_vol_type, {})
default_vol_type = volume_types.get_default_volume_type()
self.assertEqual(default_vol_type.get('name'),
conf_fixture.def_vol_type)
"""Ensures proper exception raised if default volume type
is not in database.
"""
- session = db_api.get_session()
default_vol_type = volume_types.get_default_volume_type()
self.assertEqual(default_vol_type, {})
class ExceptionWithNoneCode(Exception):
code = None
- msg = 'Internal Server Error'
-
@webob.dec.wsgify
def fail(req):
raise ExceptionWithNoneCode()
ops.delete_volume('server', 'path', 'sr_uuid', 'vdi_uuid')
mock.ReplayAll()
- result = drv.delete_volume(dict(
- provider_location='sr_uuid/vdi_uuid'))
+ drv.delete_volume(dict(provider_location='sr_uuid/vdi_uuid'))
mock.VerifyAll()
def test_create_export_does_not_raise_exception(self):
connector3 = dict(initiator='test_iqn.3')
self.driver.create_volume(volume)
- props1 = self.driver.initialize_connection(volume, connector1)
- props2 = self.driver.initialize_connection(volume, connector2)
- props3 = self.driver.initialize_connection(volume, connector3)
+ self.driver.initialize_connection(volume, connector1)
+ self.driver.initialize_connection(volume, connector2)
+ self.driver.initialize_connection(volume, connector3)
self.driver.terminate_connection(volume, connector1)
self.driver.terminate_connection(volume, connector3)
def test_wrong_attach_params(self):
"""Test different wrong attach scenarios."""
volume1 = {'name': 'test_volume_01', 'size': 1, 'id': 101}
- volume2 = {'name': 'test_volume_02', 'size': 1, 'id': 102}
- volume3 = {'name': 'test_volume_03', 'size': 1, 'id': 103}
connector1 = dict(initiator='test_iqn.1')
- connector2 = dict(initiator='test_iqn.2')
- connector3 = dict(initiator='test_iqn.3')
self.assertRaises(exception.VolumeNotFound,
self.driver.initialize_connection,
self.driver.create_volume(volume1)
self.driver.create_volume(volume2)
- props1 = self.driver.initialize_connection(volume1, connector1)
- props2 = self.driver.initialize_connection(volume2, connector2)
+ self.driver.initialize_connection(volume1, connector1)
+ self.driver.initialize_connection(volume2, connector2)
self.assertRaises(exception.ZadaraServerNotFound,
self.driver.terminate_connection,
connector3 = dict(initiator='test_iqn.3')
self.driver.create_volume(volume1)
- props1 = self.driver.initialize_connection(volume1, connector1)
- props2 = self.driver.initialize_connection(volume1, connector2)
- props3 = self.driver.initialize_connection(volume1, connector3)
+ self.driver.initialize_connection(volume1, connector1)
+ self.driver.initialize_connection(volume1, connector2)
+ self.driver.initialize_connection(volume1, connector3)
self.flags(zadara_vpsa_auto_detach_on_delete=False)
self.assertRaises(exception.VolumeAttached,
def test_get_device_mapping_from_network(self, get_nameserver_info_mock):
initiator_list = ['10008c7cff523b01']
target_list = ['20240002ac000a50', '20240002ac000a40']
- with mock.patch.object(self.client, 'connect') as client_connect_mock:
+ with mock.patch.object(self.client, 'connect'):
get_nameserver_info_mock.return_value = (nsshow_data)
device_map = self.get_device_mapping_from_network(
initiator_list, target_list)
"""
try:
return minidom.parseString(xml_string, parser=ProtectedExpatParser())
- except sax.SAXParseException as se:
+ except sax.SAXParseException:
raise expat.ExpatError()
"""Creates a EMC(VMAX/VNX) volume."""
volpath = self.common.create_volume(volume)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
"""Creates a volume from a snapshot."""
volpath = self.common.create_volume_from_snapshot(volume, snapshot)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
"""Creates a cloned volume."""
volpath = self.common.create_cloned_volume(volume, src_vref)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
"""Creates a EMC(VMAX/VNX) volume."""
volpath = self.common.create_volume(volume)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
"""Creates a volume from a snapshot."""
volpath = self.common.create_volume_from_snapshot(volume, snapshot)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
"""Creates a cloned volume."""
volpath = self.common.create_cloned_volume(volume, src_vref)
- ctxt = context.get_admin_context()
model_update = {}
volume['provider_location'] = str(volpath)
model_update['provider_location'] = volume['provider_location']
if check:
raise exception.ParameterNotFound(param=element)
return None
- except ETree.ParseError as e:
+ except ETree.ParseError:
if check:
with excutils.save_and_reraise_exception():
LOG.error(_("XML exception reading parameter: %s") % element)
def extend_volume(self, volume, new_size):
"""Extend an existing volume."""
(arid, lun) = _loc_info(volume['provider_location'])['id_lu']
- out = self.bend.extend_vol(self.config['hus_cmd'],
- HDS_VERSION,
- self.config['mgmt_ip0'],
- self.config['mgmt_ip1'],
- self.config['username'],
- self.config['password'],
- arid, lun,
- '%s' % (new_size * 1024))
+ self.bend.extend_vol(self.config['hus_cmd'],
+ HDS_VERSION,
+ self.config['mgmt_ip0'],
+ self.config['mgmt_ip1'],
+ self.config['username'],
+ self.config['password'],
+ arid, lun,
+ '%s' % (new_size * 1024))
LOG.debug(_("LUN %(lun)s extended to %(size)s GB.")
% {'lun': lun,
'size': new_size})
(arid, lun) = info['id_lu']
if 'tgt' in info.keys(): # connected?
(_portal, iqn, loc, ctl, port) = info['tgt']
- _out = self.bend.del_iscsi_conn(self.config['hus_cmd'],
- HDS_VERSION,
- self.config['mgmt_ip0'],
- self.config['mgmt_ip1'],
- self.config['username'],
- self.config['password'],
- arid, lun, ctl, port, iqn,
- '')
+ self.bend.del_iscsi_conn(self.config['hus_cmd'],
+ HDS_VERSION,
+ self.config['mgmt_ip0'],
+ self.config['mgmt_ip1'],
+ self.config['username'],
+ self.config['password'],
+ arid, lun, ctl, port, iqn,
+ '')
name = self.hus_name
LOG.debug(_("delete lun %(lun)s on %(name)s")
% {'lun': lun,
'name': name})
- _out = self.bend.delete_lu(self.config['hus_cmd'],
- HDS_VERSION,
- self.config['mgmt_ip0'],
- self.config['mgmt_ip1'],
- self.config['username'],
- self.config['password'],
- arid, lun)
+ self.bend.delete_lu(self.config['hus_cmd'],
+ HDS_VERSION,
+ self.config['mgmt_ip0'],
+ self.config['mgmt_ip1'],
+ self.config['username'],
+ self.config['password'],
+ arid, lun)
def remove_export(self, context, volume):
"""Disconnect a volume from an attached instance."""
(arid, lun) = info['id_lu']
(_portal, iqn, loc, ctl, port) = info['tgt']
- _out = self.bend.del_iscsi_conn(self.config['hus_cmd'],
- HDS_VERSION,
- self.config['mgmt_ip0'],
- self.config['mgmt_ip1'],
- self.config['username'],
- self.config['password'],
- arid, lun, ctl, port, iqn,
- connector['initiator'])
+ self.bend.del_iscsi_conn(self.config['hus_cmd'],
+ HDS_VERSION,
+ self.config['mgmt_ip0'],
+ self.config['mgmt_ip1'],
+ self.config['username'],
+ self.config['password'],
+ arid, lun, ctl, port, iqn,
+ connector['initiator'])
self._update_vol_location(volume['id'], loc)
return {'provider_location': loc}
if loc is None: # to take care of spurious input
return # which could cause exception.
(arid, lun) = loc.split('.')
- _out = self.bend.delete_lu(self.config['hus_cmd'],
- HDS_VERSION,
- self.config['mgmt_ip0'],
- self.config['mgmt_ip1'],
- self.config['username'],
- self.config['password'],
- arid, lun)
+ self.bend.delete_lu(self.config['hus_cmd'],
+ HDS_VERSION,
+ self.config['mgmt_ip0'],
+ self.config['mgmt_ip1'],
+ self.config['username'],
+ self.config['password'],
+ arid, lun)
LOG.debug(_("LUN %s is deleted.") % lun)
return
:return: True of False
"""
items_list = get_xml_item(xml_root, item)
- value = []
if attrib_key:
for tmp_dict in items_list:
if tmp_dict['attrib'].get(attrib_key, None):
"""
- maxpool_id = None
- maxpool_size = 0.0
nameindex, sizeindex = ((1, 4) if luntype == 'Thin' else (5, 3))
pools_dev = sorted(pools_dev, key=lambda x: float(x[sizeindex]))
while len(pools_dev) > 0:
Error checking done by manage_existing_get_size is not repeated.
"""
lv_name = existing_ref['lv_name']
- lv = self.vg.get_volume(lv_name)
+ self.vg.get_volume(lv_name)
# Attempt to rename the LV to match the OpenStack internal name.
try:
if vol_size != snap_size:
try:
self.extend_volume(volume, vol_size)
- except Exception as e:
+ except Exception:
with excutils.save_and_reraise_exception():
LOG.error(
_("Resizing %s failed. Cleaning volume."),
try:
self._invoke_successfully(clone_clear, None)
break
- except Exception as e:
+ except Exception:
# Filer might be rebooting
time.sleep(5)
retry = retry - 1
def validate_cpg(self, cpg_name):
try:
- cpg = self.client.getCPG(cpg_name)
- except hpexceptions.HTTPNotFound as ex:
+ self.client.getCPG(cpg_name)
+ except hpexceptions.HTTPNotFound:
err = (_("CPG (%s) doesn't exist on array") % cpg_name)
LOG.error(err)
raise exception.InvalidInput(reason=err)
domain = self.common.get_domain(cpg)
try:
host = self.common._get_3par_host(hostname)
- except hpexceptions.HTTPNotFound as ex:
+ except hpexceptions.HTTPNotFound:
# get persona from the volume type extra specs
persona_id = self.common.get_persona_type(volume)
# host doesn't exist, we have to create it
cliq_args['volumeName'] = volume['name']
cliq_args['prompt'] = 'false' # Don't confirm
try:
- volume_info = self._cliq_get_volume_info(volume['name'])
+ self._cliq_get_volume_info(volume['name'])
except processutils.ProcessExecutionError:
LOG.error(_("Volume did not exist. It will not be deleted"))
return
cliq_args['snapshotName'] = snapshot['name']
cliq_args['prompt'] = 'false' # Don't confirm
try:
- volume_info = self._cliq_get_snapshot_info(snapshot['name'])
+ self._cliq_get_snapshot_info(snapshot['name'])
except processutils.ProcessExecutionError:
LOG.error(_("Snapshot did not exist. It will not be deleted"))
return
{'host': self.config.san_ip, 'err': ex})
LOG.error(msg)
raise exception.HPMSAConnectionError(reason=msg)
- except msa.HPMSAAuthenticationError as e:
+ except msa.HPMSAAuthenticationError:
msg = _("Failed to log on MSA Array (invalid login?)")
LOG.error(msg)
raise exception.HPMSAConnectionError(reason=msg)
max_size=max_size)
last_exception = None
try:
- total_attempts = attempts
with self.sshpool.item() as ssh:
while attempts > 0:
attempts -= 1
glance_host, glance_port, glance_use_ssl = glance_server
try:
- result = self.glance_plugin.upload_vhd(
+ self.glance_plugin.upload_vhd(
vdi_uuids, image_id, glance_host, glance_port, glance_use_ssl,
os.path.join(sr_base_path, sr_uuid), auth_token, dict())
finally:
snapshot, volume['display_name'], volume['name_description'])
def create_snapshot(self, snapshot):
- volume_id = snapshot['volume_id']
volume = snapshot['volume']
return self._copy_volume(
volume, snapshot['display_name'], snapshot['display_description'])
volume = self.db.volume_get(context, volume_id)
try:
utils.require_driver_initialized(self.driver)
- except exception.DriverNotInitialized as ex:
+ except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
LOG.exception(_("Error detaching volume %(volume)s, "
"due to uninitialized driver."),
commands = {posargs}
[flake8]
-ignore = E711,E712,F403,F841,H302,H303,H304,H803
+ignore = E711,E712,F403,H302,H303,H304,H803
builtins = _
exclude = .git,.venv,.tox,dist,tools,doc,common,*egg,build