"""
import array
+import binascii
import uuid
from cinder import exception
def _generate_hex_key(self, **kwargs):
key_length = kwargs.get('key_length', 256)
# hex digit => 4 bits
- hex_encoded = utils.generate_password(length=key_length / 4,
+ hex_encoded = utils.generate_password(length=key_length // 4,
symbolgroups='0123456789ABCDEF')
return hex_encoded
def _generate_key(self, **kwargs):
_hex = self._generate_hex_key(**kwargs)
- return key.SymmetricKey('AES',
- array.array('B', _hex.decode('hex')).tolist())
+ key_bytes = array.array('B', binascii.unhexlify(_hex)).tolist()
+ return key.SymmetricKey('AES', key_bytes)
def create_key(self, ctxt, **kwargs):
"""Creates a key.
for length in [64, 128, 256]:
key_id = self.key_mgr.create_key(self.ctxt, key_length=length)
key = self.key_mgr.get_key(self.ctxt, key_id)
- self.assertEqual(length / 8, len(key.get_encoded()))
+ self.assertEqual(length // 8, len(key.get_encoded()))
def test_create_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.create_key, None)
def test_store_key(self):
- secret_key = array.array('B', ('0' * 64).decode('hex')).tolist()
+ secret_key = array.array('B', b'\x00' * 32).tolist()
_key = keymgr_key.SymmetricKey('AES', secret_key)
key_id = self.key_mgr.store_key(self.ctxt, _key)
from cinder.volume.drivers import lvm
from cinder.volume import manager as vol_manager
from cinder.volume import rpcapi as volume_rpcapi
+import cinder.volume.targets.tgt
from cinder.volume import utils as volutils
from cinder.volume import volume_types
self.stubs.Set(db, 'service_get_all_by_topic',
stub_service_get_all_by_topic)
+ def sort_func(obj):
+ return obj['name']
+
volume_api = cinder.volume.api.API()
azs = volume_api.list_availability_zones()
- azs = list(azs).sort()
+ azs = sorted(azs, key=sort_func)
- expected = [
+ expected = sorted([
{'name': 'pung', 'available': False},
{'name': 'pong', 'available': True},
{'name': 'ping', 'available': True},
- ].sort()
+ ], key=sort_func)
self.assertEqual(expected, azs)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
+ def sort_func(obj):
+ return obj['name']
+
volume_api = cinder.volume.api.API()
# Update fails when status != available
mock_volume_get.return_value = vol
mock_get_connector_properties.return_value = properties
- f = mock_file_open.return_value = file('/dev/null')
+ f = mock_file_open.return_value = open('/dev/null', 'rb')
backup_service.backup(backup_obj, f, None)
self.volume.driver._attach_volume.return_value = attach_info, vol
mock_volume_get.return_value = vol
self.volume.driver._create_temp_cloned_volume.return_value = temp_vol
mock_get_connector_properties.return_value = properties
- f = mock_file_open.return_value = file('/dev/null')
+ f = mock_file_open.return_value = open('/dev/null', 'rb')
backup_service.backup(backup_obj, f, None)
self.volume.driver._attach_volume.return_value = attach_info, vol
mock_volume_get.return_value = vol
mock_connect_volume.return_value = {'type': 'local',
'path': '/dev/null'}
- f = mock_file_open.return_value = file('/dev/null')
+ f = mock_file_open.return_value = open('/dev/null', 'rb')
self.volume.driver._connect_device
backup_service.backup(backup_obj, f, None)
mock_volume_get.return_value = vol
mock_get_connector_properties.return_value = properties
- f = mock_file_open.return_value = file('/dev/null')
+ f = mock_file_open.return_value = open('/dev/null', 'rb')
backup_service.backup(backup_obj, f, None)
self.volume.driver._attach_volume.return_value = attach_info
self.volume.driver._delete_temp_snapshot = mock.MagicMock()
mock_get_connector_properties.return_value = properties
- f = mock_file_open.return_value = file('/dev/null')
+ f = mock_file_open.return_value = open('/dev/null', 'rb')
backup_service.backup(backup_obj, f, None)
self.volume.driver._attach_volume.return_value = attach_info
except Exception:
with excutils.save_and_reraise_exception():
try:
- if hasattr(snapshot, 'id'):
+ if snapshot.obj_attr_is_set('id'):
snapshot.destroy()
finally:
QUOTAS.rollback(context, reservations)
# Check image size is not larger than volume size.
image_size = utils.as_int(image_meta['size'], quiet=False)
- image_size_in_gb = (image_size + GB - 1) / GB
+ image_size_in_gb = (image_size + GB - 1) // GB
if image_size_in_gb > size:
msg = _('Size of specified image %(image_size)sGB'
' is larger than volume size %(volume_size)sGB.')
sorted_snapshots = []
for vol in volumes:
- found_snaps = filter(
- lambda snap: snap['id'] == vol['snapshot_id'], snapshots)
+ found_snaps = [snap for snap in snapshots
+ if snap['id'] == vol['snapshot_id']]
if not found_snaps:
LOG.error(_LE("Source snapshot cannot be found for target "
"volume %(volume_id)s."),
sorted_source_vols = []
for vol in volumes:
- found_source_vols = filter(
- lambda source_vol: source_vol['id'] == vol['source_volid'],
- source_vols)
+ found_source_vols = [source_vol for source_vol in source_vols
+ if source_vol['id'] == vol['source_volid']]
if not found_source_vols:
LOG.error(_LE("Source volumes cannot be found for target "
"volume %(volume_id)s."),
commands =
python -m testtools.run \
cinder.tests.unit.image.test_glance \
+ cinder.tests.unit.keymgr.test_mock_key_mgr \
cinder.tests.unit.targets.test_base_iscsi_driver \
cinder.tests.unit.targets.test_cxt_driver \
cinder.tests.unit.targets.test_iser_driver \
cinder.tests.unit.test_v6000_common \
cinder.tests.unit.test_vmware_vmdk \
cinder.tests.unit.test_vmware_volumeops \
+ cinder.tests.unit.test_volume \
cinder.tests.unit.test_volume_configuration \
cinder.tests.unit.test_volume_glance_metadata \
cinder.tests.unit.test_volume_rpcapi \