self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
+ @mock.patch('cinder.volume.flows.api.create_volume.get_flow')
+ def test_create_volume_from_snapshot_with_types(self, _get_flow):
+ """Test volume create from snapshot with types including mistmatch."""
+ volume_api = cinder.volume.api.API()
+
+ db.volume_type_create(context.get_admin_context(),
+ {'name': 'foo', 'extra_specs': {}})
+ db.volume_type_create(context.get_admin_context(),
+ {'name': 'biz', 'extra_specs': {}})
+
+ foo_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'foo')
+ biz_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'biz')
+
+ snapshot = {'id': 1234,
+ 'status': 'available',
+ 'volume_size': 10,
+ 'volume_type_id': biz_type['id']}
+
+ # Make sure the case of specifying a type that
+ # doesn't match the snapshots type fails
+ self.assertRaises(exception.InvalidInput,
+ volume_api.create,
+ self.context,
+ size=1,
+ name='fake_name',
+ description='fake_desc',
+ volume_type=foo_type,
+ snapshot=snapshot)
+
+ # Make sure that trying to specify a type
+ # when the snapshots type is None fails
+ snapshot['volume_type_id'] = None
+ self.assertRaises(exception.InvalidInput,
+ volume_api.create,
+ self.context,
+ size=1,
+ name='fake_name',
+ description='fake_desc',
+ volume_type=foo_type,
+ snapshot=snapshot)
+
+ snapshot['volume_type_id'] = foo_type['id']
+ volume_api.create(self.context, size=1, name='fake_name',
+ description='fake_desc', volume_type=foo_type,
+ snapshot=snapshot)
+
+ db.volume_type_destroy(context.get_admin_context(),
+ foo_type['id'])
+ db.volume_type_destroy(context.get_admin_context(),
+ biz_type['id'])
+
+ @mock.patch('cinder.volume.flows.api.create_volume.get_flow')
+ def test_create_volume_from_source_with_types(self, _get_flow):
+ """Test volume create from source with types including mistmatch."""
+ volume_api = cinder.volume.api.API()
+
+ db.volume_type_create(context.get_admin_context(),
+ {'name': 'foo', 'extra_specs': {}})
+ db.volume_type_create(context.get_admin_context(),
+ {'name': 'biz', 'extra_specs': {}})
+
+ foo_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'foo')
+ biz_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'biz')
+
+ source_vol = {'id': 1234,
+ 'status': 'available',
+ 'volume_size': 10,
+ 'volume_type': biz_type,
+ 'volume_type_id': biz_type['id']}
+
+ # Make sure the case of specifying a type that
+ # doesn't match the snapshots type fails
+ self.assertRaises(exception.InvalidInput,
+ volume_api.create,
+ self.context,
+ size=1,
+ name='fake_name',
+ description='fake_desc',
+ volume_type=foo_type,
+ source_volume=source_vol)
+
+ # Make sure that trying to specify a type
+ # when the source type is None fails
+ source_vol['volume_type_id'] = None
+ source_vol['volume_type'] = None
+ self.assertRaises(exception.InvalidInput,
+ volume_api.create,
+ self.context,
+ size=1,
+ name='fake_name',
+ description='fake_desc',
+ volume_type=foo_type,
+ source_volume=source_vol)
+
+ source_vol['volume_type_id'] = biz_type['id']
+ source_vol['volume_type'] = biz_type
+ volume_api.create(self.context, size=1, name='fake_name',
+ description='fake_desc', volume_type=biz_type,
+ source_volume=source_vol)
+
+ db.volume_type_destroy(context.get_admin_context(),
+ foo_type['id'])
+ db.volume_type_destroy(context.get_admin_context(),
+ biz_type['id'])
+
def test_create_snapshot_driver_not_initialized(self):
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
availability_zone=None, source_volume=None,
scheduler_hints=None, backup_source_volume=None):
+ if source_volume and volume_type:
+ if volume_type['id'] != source_volume['volume_type_id']:
+ msg = _("Invalid volume_type provided (requested type "
+ "must match source volume, or be omitted). "
+ "You should omit the argument.")
+ raise exception.InvalidInput(reason=msg)
+
+ if snapshot and volume_type:
+ if volume_type['id'] != snapshot['volume_type_id']:
+ msg = _("Invalid volume_type provided (requested type "
+ "must match source snapshot, or be omitted). "
+ "You should omit the argument.")
+ raise exception.InvalidInput(reason=msg)
+
def check_volume_az_zone(availability_zone):
try:
return self._valid_availability_zone(availability_zone)
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume_id)
- if volume['migration_status'] != None:
+ if volume['migration_status'] is not None:
# Volume is migrating, wait until done
msg = _("Volume cannot be deleted while migrating")
raise exception.InvalidVolume(reason=msg)
force=False, metadata=None):
check_policy(context, 'create_snapshot', volume)
- if volume['migration_status'] != None:
+ if volume['migration_status'] is not None:
# Volume is migrating, wait until done
msg = _("Snapshot cannot be created while volume is migrating")
raise exception.InvalidVolume(reason=msg)
raise exception.InvalidVolume(reason=msg)
# Make sure volume is not part of a migration
- if volume['migration_status'] != None:
+ if volume['migration_status'] is not None:
msg = _("Volume is already part of an active migration")
raise exception.InvalidVolume(reason=msg)