volume_id = volume['id']
self.volume.init_host()
volume = db.volume_get(context.get_admin_context(), volume_id)
- self.assertEqual(volume['status'], "error")
+ self.assertEqual("error", volume['status'])
self.volume.delete_volume(self.context, volume_id)
def test_init_host_resumes_deletes(self):
host=volutils.append_host(CONF.host, 'pool2'))
self.volume.init_host()
stats = self.volume.stats
- self.assertEqual(stats['allocated_capacity_gb'], 2020)
+ self.assertEqual(2020, stats['allocated_capacity_gb'])
self.assertEqual(
- stats['pools']['pool0']['allocated_capacity_gb'], 384)
+ 384, stats['pools']['pool0']['allocated_capacity_gb'])
self.assertEqual(
- stats['pools']['pool1']['allocated_capacity_gb'], 512)
+ 512, stats['pools']['pool1']['allocated_capacity_gb'])
self.assertEqual(
- stats['pools']['pool2']['allocated_capacity_gb'], 1024)
+ 1024, stats['pools']['pool2']['allocated_capacity_gb'])
# NOTE(jdg): On the create we have host='xyz', BUT
# here we do a db.volume_get, and now the host has
# to be volume_backend_name or None
vol0 = db.volume_get(context.get_admin_context(), vol0['id'])
- self.assertEqual(vol0['host'],
- volutils.append_host(CONF.host, 'LVM'))
+ self.assertEqual(volutils.append_host(CONF.host, 'LVM'),
+ vol0['host'])
self.volume.delete_volume(self.context, vol0['id'])
self.volume.delete_volume(self.context, vol1['id'])
self.volume.delete_volume(self.context, vol2['id'])
volume_id = volume['id']
self.assertIsNone(volume['encryption_key_id'])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.assertRaises(exception.DriverNotInitialized,
self.volume.create_volume,
self.context, volume_id)
volume = db.volume_get(context.get_admin_context(), volume_id)
- self.assertEqual(volume.status, "error")
+ self.assertEqual("error", volume.status)
db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_driver_not_initialized_rescheduling(self):
{'volume_properties': self.volume_params})
# NOTE(dulek): Volume should be rescheduled as we passed request_spec,
# assert that it wasn't counted in allocated_capacity tracking.
- self.assertEqual(self.volume.stats['pools'], {})
+ self.assertEqual({}, self.volume.stats['pools'])
db.volume_destroy(context.get_admin_context(), volume_id)
{'volume_properties': params})
# NOTE(dulek): Volume should be rescheduled as we passed request_spec,
# assert that it wasn't counted in allocated_capacity tracking.
- self.assertEqual(self.volume.stats['pools'], {})
+ self.assertEqual({}, self.volume.stats['pools'])
db.volume_destroy(context.get_admin_context(), volume_id)
volume_id = volume['id']
self.assertIsNone(volume['encryption_key_id'])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.assertRaises(exception.DriverNotInitialized,
self.volume.delete_volume,
self.context, volume_id)
volume = db.volume_get(context.get_admin_context(), volume_id)
- self.assertEqual(volume.status, "error_deleting")
+ self.assertEqual("error_deleting", volume.status)
db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_delete_volume(self):
**self.volume_params)
volume_id = volume['id']
self.assertIsNone(volume['encryption_key_id'])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.volume.create_volume(self.context, volume_id)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[0]
- self.assertEqual(msg['event_type'], 'volume.create.start')
+ self.assertEqual('volume.create.start', msg['event_type'])
expected = {
'status': 'creating',
'host': socket.gethostname(),
'replication_extended_status': None,
'replication_driver_data': None,
}
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[1]
- self.assertEqual(msg['event_type'], 'volume.create.end')
+ self.assertEqual('volume.create.end', msg['event_type'])
expected['status'] = 'available'
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
self.volume.delete_volume(self.context, volume_id)
vol = db.volume_get(context.get_admin_context(read_deleted='yes'),
volume_id)
- self.assertEqual(vol['status'], 'deleted')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 4)
+ self.assertEqual('deleted', vol['status'])
+ self.assertEqual(4, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[2]
- self.assertEqual(msg['event_type'], 'volume.delete.start')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('volume.delete.start', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[3]
- self.assertEqual(msg['event_type'], 'volume.delete.end')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('volume.delete.end', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
self.volume.create_volume(self.context, volume_id)
result_meta = {
volume.volume_metadata[0].key: volume.volume_metadata[0].value}
- self.assertEqual(result_meta, test_meta)
+ self.assertEqual(test_meta, result_meta)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
1,
'name',
'description')
- self.assertEqual(volume['availability_zone'], 'az2')
+ self.assertEqual('az2', volume['availability_zone'])
self.override_config('default_availability_zone', 'default-az')
volume = volume_api.create(self.context,
1,
'name',
'description')
- self.assertEqual(volume['availability_zone'], 'default-az')
+ self.assertEqual('default-az', volume['availability_zone'])
def test_create_volume_with_volume_type(self):
"""Test volume creation with default volume type."""
1,
'name',
'description')
- self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
self.assertIsNone(volume['encryption_key_id'])
# Create volume with specific volume type
'name',
'description',
volume_type=db_vol_type)
- self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
def test_create_volume_with_encrypted_volume_type(self):
self.stubs.Set(keymgr, "API", fake_keymgr.fake_api)
'name',
'description',
volume_type=db_vol_type)
- self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
self.assertIsNotNone(volume['encryption_key_id'])
def test_create_volume_with_provider_id(self):
volume_type=db_vol_type)
self.assertIsNotNone(volume.get('encryption_key_id', None))
- self.assertEqual(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
self.assertIsNotNone(volume['encryption_key_id'])
volume['host'] = 'fake_host'
manager.publish_service_capabilities(self.context)
self.assertTrue(mock_loads.called)
volume_stats = manager.last_capabilities
- self.assertEqual(volume_stats['key1'],
- fake_capabilities['key1'])
- self.assertEqual(volume_stats['key2'],
- fake_capabilities['key2'])
+ self.assertEqual(fake_capabilities['key1'],
+ volume_stats['key1'])
+ self.assertEqual(fake_capabilities['key2'],
+ volume_stats['key2'])
def test_extra_capabilities_fail(self):
with mock.patch.object(jsonutils, 'loads') as mock_loads:
# NOTE(flaper87): The volume status should be error.
snapshot = db.snapshot_get(context.get_admin_context(), snapshot_id)
- self.assertEqual(snapshot.status, "error")
+ self.assertEqual("error", snapshot.status)
# NOTE(flaper87): Set initialized to True,
# lets cleanup the mess
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
- self.assertEqual(len(self.called), 1)
+ self.assertEqual(1, len(self.called))
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
snapshot_id=snap_id)
- self.assertEqual(len(self.called), 2)
+ self.assertEqual(2, len(self.called))
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(snap_id,
db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)
# locked
self.volume.delete_volume(self.context, dst_vol_id)
- self.assertEqual(len(self.called), 4)
+ self.assertEqual(4, len(self.called))
# locked
self.volume.delete_snapshot(self.context, snapshot_obj)
- self.assertEqual(len(self.called), 6)
+ self.assertEqual(6, len(self.called))
# locked
self.volume.delete_volume(self.context, src_vol_id)
- self.assertEqual(len(self.called), 8)
+ self.assertEqual(8, len(self.called))
- self.assertEqual(self.called,
- ['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
+ self.assertEqual(['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
- 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+ 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))],
+ self.called)
def test_create_volume_from_volume_check_locks(self):
# mock the synchroniser so we can record events
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
- self.assertEqual(len(self.called), 1)
+ self.assertEqual(1, len(self.called))
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
source_volid=src_vol_id)
- self.assertEqual(len(self.called), 2)
+ self.assertEqual(2, len(self.called))
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(src_vol_id,
db.volume_get(admin_ctxt, dst_vol_id).source_volid)
# locked
self.volume.delete_volume(self.context, dst_vol_id)
- self.assertEqual(len(self.called), 4)
+ self.assertEqual(4, len(self.called))
# locked
self.volume.delete_volume(self.context, src_vol_id)
- self.assertEqual(len(self.called), 6)
+ self.assertEqual(6, len(self.called))
- self.assertEqual(self.called,
- ['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
+ self.assertEqual(['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
- 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
+ 'unlock-%s' % ('%s-delete_volume' % (src_vol_id))],
+ self.called)
def test_create_volume_from_volume_delete_lock_taken(self):
# create source volume
name='fake_name',
description='fake_desc',
snapshot=snapshot)
- self.assertEqual(volume_dst['availability_zone'], 'az2')
+ self.assertEqual('az2', volume_dst['availability_zone'])
self.assertRaises(exception.InvalidInput,
volume_api.create,
instance_uuid, None,
mountpoint, 'ro')
vol = db.volume_get(context.get_admin_context(), volume_id)
- self.assertEqual(vol['status'], "in-use")
+ self.assertEqual("in-use", vol['status'])
self.assertEqual('attached', attachment['attach_status'])
self.assertEqual(mountpoint, attachment['mountpoint'])
self.assertEqual(instance_uuid, attachment['instance_uuid'])
self.assertIsNone(attachment['attached_host'])
admin_metadata = vol['volume_admin_metadata']
- self.assertEqual(len(admin_metadata), 2)
+ self.assertEqual(2, len(admin_metadata))
expected = dict(readonly='True', attached_mode='ro')
ret = {}
for item in admin_metadata:
self.context,
availability_zone=CONF.storage_availability_zone,
**self.volume_params)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.volume.create_volume(self.context, volume['id'])
msg = fake_notifier.NOTIFICATIONS[0]
- self.assertEqual(msg['event_type'], 'volume.create.start')
- self.assertEqual(msg['payload']['status'], 'creating')
- self.assertEqual(msg['priority'], 'INFO')
+ self.assertEqual('volume.create.start', msg['event_type'])
+ self.assertEqual('creating', msg['payload']['status'])
+ self.assertEqual('INFO', msg['priority'])
msg = fake_notifier.NOTIFICATIONS[1]
- self.assertEqual(msg['event_type'], 'volume.create.end')
- self.assertEqual(msg['payload']['status'], 'available')
- self.assertEqual(msg['priority'], 'INFO')
+ self.assertEqual('volume.create.end', msg['event_type'])
+ self.assertEqual('available', msg['payload']['status'])
+ self.assertEqual('INFO', msg['priority'])
if len(fake_notifier.NOTIFICATIONS) > 2:
# Cause an assert to print the unexpected item
self.assertFalse(fake_notifier.NOTIFICATIONS[2])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
snapshot_id = self._create_snapshot(volume['id'],
size=volume['size'])['id']
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
msg = fake_notifier.NOTIFICATIONS[2]
- self.assertEqual(msg['event_type'], 'snapshot.create.start')
+ self.assertEqual('snapshot.create.start', msg['event_type'])
expected = {
'created_at': 'DONTCARE',
'deleted': '',
'volume_size': 1,
'availability_zone': 'nova'
}
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[3]
- self.assertEqual(msg['event_type'], 'snapshot.create.end')
+ self.assertEqual('snapshot.create.end', msg['event_type'])
expected['status'] = 'available'
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
if len(fake_notifier.NOTIFICATIONS) > 4:
# Cause an assert to print the unexpected item
self.assertFalse(fake_notifier.NOTIFICATIONS[4])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 4)
+ self.assertEqual(4, len(fake_notifier.NOTIFICATIONS))
self.volume.delete_snapshot(self.context, snapshot_obj)
msg = fake_notifier.NOTIFICATIONS[4]
- self.assertEqual(msg['event_type'], 'snapshot.delete.start')
+ self.assertEqual('snapshot.delete.start', msg['event_type'])
expected['status'] = 'available'
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[5]
- self.assertEqual(msg['event_type'], 'snapshot.delete.end')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('snapshot.delete.end', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
if len(fake_notifier.NOTIFICATIONS) > 6:
# Cause an assert to print the unexpected item
self.assertFalse(fake_notifier.NOTIFICATIONS[6])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 6)
+ self.assertEqual(6, len(fake_notifier.NOTIFICATIONS))
snap = db.snapshot_get(context.get_admin_context(read_deleted='yes'),
snapshot_id)
- self.assertEqual(snap['status'], 'deleted')
+ self.assertEqual('deleted', snap['status'])
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
result_meta = {
result_dict['snapshot_metadata'][0].key:
result_dict['snapshot_metadata'][0].value}
- self.assertEqual(result_meta, test_meta)
+ self.assertEqual(test_meta, result_meta)
self.volume.delete_snapshot(self.context, snapshot_obj)
self.assertRaises(exception.NotFound,
db.snapshot_get,
# status is deleting
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['status'], 'deleting')
+ self.assertEqual('deleting', volume['status'])
# clean up
self.volume.delete_volume(self.context, volume['id'])
# create bootable volume from image
volume = self._create_volume_from_image()
volume_id = volume['id']
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['bootable'], True)
+ self.assertEqual('available', volume['status'])
+ self.assertTrue(volume['bootable'])
# get volume's volume_glance_metadata
ctxt = context.get_admin_context()
# create bootable volume from image
volume = self._create_volume_from_image()
volume_id = volume['id']
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['bootable'], True)
+ self.assertEqual('available', volume['status'])
+ self.assertTrue(volume['bootable'])
# get volume's volume_glance_metadata
ctxt = context.get_admin_context()
state and is bootable.
"""
volume = self._create_volume_from_image()
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['bootable'], True)
+ self.assertEqual('available', volume['status'])
+ self.assertTrue(volume['bootable'])
self.volume.delete_volume(self.context, volume['id'])
def test_create_volume_from_image_not_cloned_status_available(self):
state and is bootable.
"""
volume = self._create_volume_from_image(fakeout_clone_image=True)
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['bootable'], True)
+ self.assertEqual('available', volume['status'])
+ self.assertTrue(volume['bootable'])
self.volume.delete_volume(self.context, volume['id'])
def test_create_volume_from_image_exception(self):
None,
FAKE_UUID)
volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], "error")
- self.assertEqual(volume['bootable'], False)
+ self.assertEqual("error", volume['status'])
+ self.assertFalse(volume['bootable'])
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
volume = volume_api.create(self.context, 2, 'name', 'description',
image_id=1)
volume_id = volume['id']
- self.assertEqual(volume['status'], 'creating')
+ self.assertEqual('creating', volume['status'])
finally:
# cleanup
size,
'name',
'description')
- self.assertEqual(volume['size'], int(size))
+ self.assertEqual(int(size), volume['size'])
def test_create_volume_int_size(self):
"""Test volume creation with int size."""
volume_api = cinder.volume.api.API()
volume_api.begin_detaching(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
- self.assertEqual(volume['status'], "detaching")
+ self.assertEqual("detaching", volume['status'])
volume_api.roll_detaching(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
- self.assertEqual(volume['status'], "in-use")
+ self.assertEqual("in-use", volume['status'])
def test_volume_api_update(self):
# create a raw vol
volume_api.update(self.context, volume, update_dict)
# read changes from db
vol = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(vol['display_name'], 'test update name')
+ self.assertEqual('test update name', vol['display_name'])
def test_volume_api_update_snapshot(self):
# create raw snapshot
volume_api.update_snapshot(self.context, snapshot_obj, update_dict)
# read changes from db
snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
- self.assertEqual(snap['display_name'], 'test update name')
+ self.assertEqual('test update name', snap['display_name'])
@mock.patch.object(QUOTAS, 'reserve')
def test_extend_volume(self, reserve):
reserve.return_value = ["RESERVATION"]
volume_api.extend(self.context, volume, 3)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['status'], 'extending')
+ self.assertEqual('extending', volume['status'])
# Test the quota exceeded
volume['status'] = 'available'
fake_reservations)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume.status, 'error_extending')
+ self.assertEqual('error_extending', volume.status)
# NOTE(flaper87): Set initialized to True,
# lets cleanup the mess.
self.volume.extend_volume(self.context, volume['id'], '4',
fake_reservations)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['size'], 2)
- self.assertEqual(volume['status'], 'error_extending')
+ self.assertEqual(2, volume['size'])
+ self.assertEqual('error_extending', volume['status'])
# Test driver success
with mock.patch.object(self.volume.driver,
self.volume.extend_volume(self.context, volume['id'], '4',
fake_reservations)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['size'], 4)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual(4, volume['size'])
+ self.assertEqual('available', volume['status'])
# clean up
self.volume.delete_volume(self.context, volume['id'])
volumes_in_use = usage.in_use
except exception.QuotaUsageNotFound:
volumes_in_use = 0
- self.assertEqual(volumes_in_use, 100)
+ self.assertEqual(100, volumes_in_use)
volume['status'] = 'available'
volume['host'] = 'fakehost'
volume['volume_type_id'] = vol_type.get('id')
except exception.QuotaUsageNotFound:
volumes_reserved = 0
- self.assertEqual(volumes_reserved, 100)
+ self.assertEqual(100, volumes_reserved)
@mock.patch(
'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
name='fake_name',
description='fake_desc',
source_volume=volume_src)
- self.assertEqual(volume_dst['availability_zone'], 'az2')
+ self.assertEqual('az2', volume_dst['availability_zone'])
self.assertRaises(exception.InvalidInput,
volume_api.create,
for meta_src in src_glancemeta:
for meta_dst in dst_glancemeta:
if meta_dst.key == meta_src.key:
- self.assertEqual(meta_dst.value, meta_src.value)
+ self.assertEqual(meta_src.value, meta_dst.value)
self.volume.delete_volume(self.context, volume_src['id'])
self.volume.delete_volume(self.context, volume_dst['id'])
self.context,
volume_dst['id'], None, None, None, None, None,
volume_src['id'])
- self.assertEqual(volume_src['status'], 'creating')
+ self.assertEqual('creating', volume_src['status'])
self.volume.delete_volume(self.context, volume_dst['id'])
self.volume.delete_volume(self.context, volume_src['id'])
# check volume properties
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['host'], 'newhost')
+ self.assertEqual('newhost', volume['host'])
self.assertIsNone(volume['migration_status'])
def test_migrate_volume_error(self):
host_obj, True)
volume = db.volume_get(context.get_admin_context(),
fake_volume['id'])
- self.assertEqual(volume['host'], 'newhost')
+ self.assertEqual('newhost', volume['host'])
self.assertIsNone(volume['migration_status'])
self.assertFalse(mock_migrate_volume.called)
self.assertFalse(delete_volume.called)
vol = tests_utils.attach_volume(self.context, old_volume['id'],
instance_uuid, attached_host,
'/dev/vda')
- self.assertEqual(vol['status'], 'in-use')
+ self.assertEqual('in-use', vol['status'])
attachment_id = vol['volume_attachment'][0]['id']
target_status = 'target:%s' % old_volume['id']
new_volume = tests_utils.create_volume(self.context, size=0,
attachment = db.volume_attachment_get_by_instance_uuid(
self.context, old_volume['id'], instance_uuid)
self.assertIsNotNone(attachment)
- self.assertEqual(attachment['attached_host'], attached_host)
- self.assertEqual(attachment['instance_uuid'], instance_uuid)
+ self.assertEqual(attached_host, attachment['attached_host'])
+ self.assertEqual(instance_uuid, attachment['instance_uuid'])
else:
self.assertFalse(mock_detach_volume.called)
self.context, volume, new_vol_type['id'])
volume = db.volume_get(elevated, volume.id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
migrate_exc=False, exc=None, diff_equal=False,
# check properties
if driver or diff_equal:
- self.assertEqual(volume['volume_type_id'], vol_type['id'])
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['host'], CONF.host)
- self.assertEqual(volumes_in_use, 1)
+ self.assertEqual(vol_type['id'], volume['volume_type_id'])
+ self.assertEqual('available', volume['status'])
+ self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(1, volumes_in_use)
elif not exc:
- self.assertEqual(volume['volume_type_id'], old_vol_type['id'])
- self.assertEqual(volume['status'], 'retyping')
- self.assertEqual(volume['host'], CONF.host)
- self.assertEqual(volumes_in_use, 1)
+ self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
+ self.assertEqual('retyping', volume['status'])
+ self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(1, volumes_in_use)
else:
- self.assertEqual(volume['volume_type_id'], old_vol_type['id'])
- self.assertEqual(volume['status'], 'available')
- self.assertEqual(volume['host'], CONF.host)
- self.assertEqual(volumes_in_use, 0)
+ self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
+ self.assertEqual('available', volume['status'])
+ self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(0, volumes_in_use)
def test_retype_volume_driver_success(self):
self._retype_volume_exec(True)
host_obj, True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume.migration_status, 'error')
+ self.assertEqual('error', volume.migration_status)
# NOTE(flaper87): Set initialized to True,
# lets cleanup the mess.
volume_api.update_readonly_flag(self.context, volume, False)
volume = db.volume_get(context.get_admin_context(), volume['id'])
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
admin_metadata = volume['volume_admin_metadata']
- self.assertEqual(len(admin_metadata), 1)
- self.assertEqual(admin_metadata[0]['key'], 'readonly')
- self.assertEqual(admin_metadata[0]['value'], 'False')
+ self.assertEqual(1, len(admin_metadata))
+ self.assertEqual('readonly', admin_metadata[0]['key'])
+ self.assertEqual('False', admin_metadata[0]['value'])
# clean up
self.volume.delete_volume(self.context, volume['id'])
availability_zone=CONF.storage_availability_zone,
volume_type='type1,type2')
group_id = group['id']
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.volume.create_consistencygroup(self.context, group_id)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[0]
- self.assertEqual(msg['event_type'], 'consistencygroup.create.start')
+ self.assertEqual('consistencygroup.create.start', msg['event_type'])
expected = {
'status': 'available',
'name': 'test_cg',
'user_id': 'fake',
'consistencygroup_id': group_id
}
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[1]
self.assertEqual(msg['event_type'], 'consistencygroup.create.end')
expected['status'] = 'available'
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
self.assertEqual(
group_id,
db.consistencygroup_get(context.get_admin_context(),
cg = db.consistencygroup_get(
context.get_admin_context(read_deleted='yes'),
group_id)
- self.assertEqual(cg['status'], 'deleted')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 4)
+ self.assertEqual('deleted', cg['status'])
+ self.assertEqual(4, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[2]
- self.assertEqual(msg['event_type'], 'consistencygroup.delete.start')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('consistencygroup.delete.start', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[3]
- self.assertEqual(msg['event_type'], 'consistencygroup.delete.end')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('consistencygroup.delete.end', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
self.assertRaises(exception.NotFound,
db.consistencygroup_get,
self.context,
if len(fake_notifier.NOTIFICATIONS) > 10:
self.assertFalse(fake_notifier.NOTIFICATIONS[10])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 10)
+ self.assertEqual(10, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[6]
- self.assertEqual(msg['event_type'], 'consistencygroup.delete.start')
+ self.assertEqual('consistencygroup.delete.start', msg['event_type'])
expected['status'] = 'available'
self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[8]
- self.assertEqual(msg['event_type'], 'consistencygroup.delete.end')
+ self.assertEqual('consistencygroup.delete.end', msg['event_type'])
self.assertDictMatch(expected, msg['payload'])
cg2 = db.consistencygroup_get(
if len(fake_notifier.NOTIFICATIONS) > 2:
self.assertFalse(fake_notifier.NOTIFICATIONS[2])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)
+ self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
cgsnapshot_returns = self._create_cgsnapshot(group_id, volume_id)
cgsnapshot_id = cgsnapshot_returns[0]['id']
self.assertFalse(fake_notifier.NOTIFICATIONS[6])
msg = fake_notifier.NOTIFICATIONS[2]
- self.assertEqual(msg['event_type'], 'cgsnapshot.create.start')
+ self.assertEqual('cgsnapshot.create.start', msg['event_type'])
expected = {
'created_at': 'DONTCARE',
'name': None,
'user_id': 'fake',
'consistencygroup_id': group_id
}
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[3]
- self.assertEqual(msg['event_type'], 'snapshot.create.start')
+ self.assertEqual('snapshot.create.start', msg['event_type'])
msg = fake_notifier.NOTIFICATIONS[4]
- self.assertEqual(msg['event_type'], 'cgsnapshot.create.end')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('cgsnapshot.create.end', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[5]
- self.assertEqual(msg['event_type'], 'snapshot.create.end')
+ self.assertEqual('snapshot.create.end', msg['event_type'])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 6)
+ self.assertEqual(6, len(fake_notifier.NOTIFICATIONS))
self.volume.delete_cgsnapshot(self.context, cgsnapshot_id)
self.assertFalse(fake_notifier.NOTIFICATIONS[10])
msg = fake_notifier.NOTIFICATIONS[6]
- self.assertEqual(msg['event_type'], 'cgsnapshot.delete.start')
+ self.assertEqual('cgsnapshot.delete.start', msg['event_type'])
expected['status'] = 'available'
- self.assertDictMatch(msg['payload'], expected)
+ self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[8]
- self.assertEqual(msg['event_type'], 'cgsnapshot.delete.end')
- self.assertDictMatch(msg['payload'], expected)
+ self.assertEqual('cgsnapshot.delete.end', msg['event_type'])
+ self.assertDictMatch(expected, msg['payload'])
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 10)
+ self.assertEqual(10, len(fake_notifier.NOTIFICATIONS))
cgsnap = db.cgsnapshot_get(
context.get_admin_context(read_deleted='yes'),
cgsnapshot_id)
- self.assertEqual(cgsnap['status'], 'deleted')
+ self.assertEqual('deleted', cgsnap['status'])
self.assertRaises(exception.NotFound,
db.cgsnapshot_get,
self.context,
cg = db.consistencygroup_get(
context.get_admin_context(read_deleted='yes'),
group_id)
- self.assertEqual(cg['status'], 'deleted')
+ self.assertEqual('deleted', cg['status'])
self.assertRaises(exception.NotFound,
db.consistencygroup_get,
self.context,
cg = db.consistencygroup_get(self.context,
group_id)
# Group is not deleted
- self.assertEqual(cg['status'], 'available')
+ self.assertEqual('available', cg['status'])
def test_secure_file_operations_enabled(self):
"""Test secure file operations setting for base driver.
self.image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
def test_copy_volume_to_image_status_use(self):
self.image_meta['id'] = 'a440c04b-79fa-479c-bed1-0b816eaec379'
self.image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
def test_copy_volume_to_image_driver_not_initialized(self):
# creating volume testdata
self.image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume.status, 'available')
+ self.assertEqual('available', volume.status)
def test_copy_volume_to_image_driver_exception(self):
self.image_meta['id'] = self.image_id
self.volume_id,
self.image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
# image shouldn't be deleted if it is not in queued state
image_service.show(self.context, self.image_id)
self.volume_id,
queued_image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
# queued image should be deleted
self.assertRaises(exception.ImageNotFound,
image_service.show,
self.volume_id,
saving_image_meta)
volume = db.volume_get(self.context, self.volume_id)
- self.assertEqual(volume['status'], 'available')
+ self.assertEqual('available', volume['status'])
# image in saving state should be deleted
self.assertRaises(exception.ImageNotFound,
image_service.show,
datetime.datetime(1, 3, 1, 1, 1, 1),
datetime.datetime(1, 4, 1, 1, 1, 1),
project_id='p1')
- self.assertEqual(len(volumes), 3)
- self.assertEqual(volumes[0].id, u'2')
- self.assertEqual(volumes[1].id, u'3')
- self.assertEqual(volumes[2].id, u'4')
+ self.assertEqual(3, len(volumes))
+ self.assertEqual(u'2', volumes[0].id)
+ self.assertEqual(u'3', volumes[1].id)
+ self.assertEqual(u'4', volumes[2].id)
def test_snapshot_get_active_by_window(self):
# Find all all snapshots valid within a timeframe window.
datetime.datetime(1, 3, 1, 1, 1, 1),
datetime.datetime(1, 4, 1, 1, 1, 1),
project_id='p1')
- self.assertEqual(len(snapshots), 3)
- self.assertEqual(snapshots[0].id, u'2')
- self.assertEqual(snapshots[0].volume.id, u'1')
- self.assertEqual(snapshots[1].id, u'3')
- self.assertEqual(snapshots[1].volume.id, u'1')
- self.assertEqual(snapshots[2].id, u'4')
- self.assertEqual(snapshots[2].volume.id, u'1')
+ self.assertEqual(3, len(snapshots))
+ self.assertEqual(u'2', snapshots[0].id)
+ self.assertEqual(u'1', snapshots[0].volume.id)
+ self.assertEqual(u'3', snapshots[1].id)
+ self.assertEqual(u'1', snapshots[1].volume.id)
+ self.assertEqual(u'4', snapshots[2].id)
+ self.assertEqual(u'1', snapshots[2].volume.id)
class DriverTestCase(test.TestCase):
vol = {'name': 'test', 'id': 1, 'size': 0}
def _rename_volume(old_name, new_name):
- self.assertEqual(old_name, ref['source-name'])
- self.assertEqual(new_name, vol['name'])
+ self.assertEqual(ref['source-name'], old_name)
+ self.assertEqual(vol['name'], new_name)
self.stubs.Set(self.volume.driver.vg, 'rename_volume',
_rename_volume)
size = self.volume.driver.manage_existing_get_size(vol, ref)
- self.assertEqual(size, 2)
+ self.assertEqual(2, size)
model_update = self.volume.driver.manage_existing(vol, ref)
self.assertIsNone(model_update)
cinder.volume.targets.tgt.TgtAdm(configuration=self.configuration)
iscsi_driver._do_iscsi_discovery = lambda v: "0.0.0.0:0000,0 iqn:iqn 0"
result = iscsi_driver._get_iscsi_properties(volume)
- self.assertEqual(result["target_portal"], "0.0.0.0:0000")
- self.assertEqual(result["target_iqn"], "iqn:iqn")
- self.assertEqual(result["target_lun"], 0)
+ self.assertEqual("0.0.0.0:0000", result["target_portal"])
+ self.assertEqual("iqn:iqn", result["target_iqn"])
+ self.assertEqual(0, result["target_lun"])
def test_get_iscsi_properties_multiple_portals(self):
volume = {"provider_location": '1.1.1.1:3260;2.2.2.2:3261,1 iqn:iqn 0',
iscsi_driver = \
cinder.volume.targets.tgt.TgtAdm(configuration=self.configuration)
result = iscsi_driver._get_iscsi_properties(volume)
- self.assertEqual(result["target_portal"], "1.1.1.1:3260")
- self.assertEqual(result["target_iqn"], "iqn:iqn")
- self.assertEqual(result["target_lun"], 0)
+ self.assertEqual("1.1.1.1:3260", result["target_portal"])
+ self.assertEqual("iqn:iqn", result["target_iqn"])
+ self.assertEqual(0, result["target_lun"])
self.assertEqual(["1.1.1.1:3260", "2.2.2.2:3261"],
result["target_portals"])
self.assertEqual(["iqn:iqn", "iqn:iqn"], result["target_iqns"])
stats = self.volume.driver._stats
self.assertEqual(
- stats['pools'][0]['total_capacity_gb'], float('5.52'))
+ float('5.52'), stats['pools'][0]['total_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['free_capacity_gb'], float('0.52'))
+ float('0.52'), stats['pools'][0]['free_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['provisioned_capacity_gb'], float('5.0'))
+ float('5.0'), stats['pools'][0]['provisioned_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['total_volumes'], int('1'))
+ int('1'), stats['pools'][0]['total_volumes'])
def test_validate_connector(self):
iscsi_driver =\
stats = self.volume.driver.get_volume_stats(refresh=True)
self.assertEqual(
- stats['pools'][0]['total_capacity_gb'], float('5.52'))
+ float('5.52'), stats['pools'][0]['total_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['free_capacity_gb'], float('0.52'))
+ float('0.52'), stats['pools'][0]['free_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['provisioned_capacity_gb'], float('5.0'))
- self.assertEqual(stats['storage_protocol'], 'iSER')
+ float('5.0'), stats['pools'][0]['provisioned_capacity_gb'])
+ self.assertEqual('iSER', stats['storage_protocol'])
@test.testtools.skip("SKIP until ISER driver is removed or fixed")
def test_get_volume_stats2(self):
stats = iser_driver.get_volume_stats(refresh=True)
self.assertEqual(
- stats['pools'][0]['total_capacity_gb'], 0)
+ 0, stats['pools'][0]['total_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['free_capacity_gb'], 0)
+ 0, stats['pools'][0]['free_capacity_gb'])
self.assertEqual(
- stats['pools'][0]['provisioned_capacity_gb'], float('5.0'))
- self.assertEqual(stats['storage_protocol'], 'iSER')
+ float('5.0'), stats['pools'][0]['provisioned_capacity_gb'])
+ self.assertEqual('iSER', stats['storage_protocol'])
class FibreChannelTestCase(DriverTestCase):