def _sync_volumes(context, project_id, session):
- return dict(zip(('volumes', 'gigabytes'),
- db.volume_data_get_for_project(context,
- project_id,
- session=session)))
+ (volumes, gigs) = db.volume_data_get_for_project(context,
+ project_id,
+ session=session)
+ return {'volumes': volumes}
def _sync_snapshots(context, project_id, session):
- return dict(zip(('snapshots', 'gigabytes'),
- db.snapshot_data_get_for_project(context,
- project_id,
- session=session)))
+ (snapshots, gigs) = db.snapshot_data_get_for_project(context,
+ project_id,
+ session=session)
+ return {'snapshots': snapshots}
+
+
+def _sync_gigabytes(context, project_id, session):
+ (_junk, vol_gigs) = db.volume_data_get_for_project(context,
+ project_id,
+ session=session)
+ if FLAGS.no_snapshot_gb_quota:
+ return {'gigabytes': vol_gigs}
+
+ (_junk, snap_gigs) = db.snapshot_data_get_for_project(context,
+ project_id,
+ session=session)
+ return {'gigabytes': vol_gigs + snap_gigs}
QUOTAS = QuotaEngine()
resources = [
ReservableResource('volumes', _sync_volumes, 'quota_volumes'),
ReservableResource('snapshots', _sync_snapshots, 'quota_snapshots'),
- ReservableResource('gigabytes', _sync_volumes, 'quota_gigabytes'),
- ReservableResource('gigabytes', _sync_snapshots, 'quota_gigabytes'), ]
+ ReservableResource('gigabytes', _sync_gigabytes, 'quota_gigabytes'), ]
QUOTAS.register_resources(resources)
vol['status'] = 'available'
return db.volume_create(self.context, vol)
+ def _create_snapshot(self, volume):
+ snapshot = {}
+ snapshot['user_id'] = self.user_id
+ snapshot['project_id'] = self.project_id
+ snapshot['volume_id'] = volume['id']
+ snapshot['volume_size'] = volume['size']
+ snapshot['status'] = 'available'
+ return db.snapshot_create(self.context, snapshot)
+
def test_too_many_volumes(self):
volume_ids = []
for i in range(FLAGS.quota_volumes):
for volume_id in volume_ids:
db.volume_destroy(self.context, volume_id)
+ def test_too_many_combined_gigabytes(self):
+ vol_ref = self._create_volume(size=10)
+ snap_ref = self._create_snapshot(vol_ref)
+ self.assertRaises(exception.QuotaError,
+ volume.API().create_snapshot,
+ self.context, vol_ref, '', '')
+ usages = db.quota_usage_get_all_by_project(self.context,
+ self.project_id)
+ self.assertEqual(usages['gigabytes']['in_use'], 20)
+ db.snapshot_destroy(self.context, snap_ref['id'])
+ db.volume_destroy(self.context, vol_ref['id'])
+
+ def test_no_snapshot_gb_quota_flag(self):
+ self.flags(quota_volumes=2,
+ quota_snapshots=2,
+ quota_gigabytes=20,
+ no_snapshot_gb_quota=True)
+ vol_ref = self._create_volume(size=10)
+ snap_ref = self._create_snapshot(vol_ref)
+ snap_ref2 = volume.API().create_snapshot(self.context,
+ vol_ref, '', '')
+
+ # Make sure no reservation was created for snapshot gigabytes.
+ reservations = db.reservation_get_all_by_project(self.context,
+ self.project_id)
+ self.assertEqual(reservations.get('gigabytes'), None)
+
+ # Make sure the snapshot volume_size isn't included in usage.
+ vol_type = db.volume_type_create(self.context,
+ dict(name=FLAGS.default_volume_type))
+ vol_ref2 = volume.API().create(self.context, 10, '', '')
+ usages = db.quota_usage_get_all_by_project(self.context,
+ self.project_id)
+ self.assertEqual(usages['gigabytes']['in_use'], 20)
+
+ db.snapshot_destroy(self.context, snap_ref['id'])
+ db.snapshot_destroy(self.context, snap_ref2['id'])
+ db.volume_destroy(self.context, vol_ref['id'])
+ db.volume_destroy(self.context, vol_ref2['id'])
+ db.volume_type_destroy(self.context, vol_type['id'])
+
class FakeContext(object):
def __init__(self, project_id, quota_class):