self.assertEqual(0, len(reservations))
+ def _stub_allocated_get_all_by_project(self, allocated_quota=False):
+ def fake_qagabp(context, project_id):
+ self.assertEqual('test_project', project_id)
+ if allocated_quota:
+ return dict(project_id=project_id, volumes=3,
+ gigabytes = 2 * 1024)
+ return dict(project_id=project_id)
+
+ self.stubs.Set(sqa_api,
+ 'quota_allocated_get_all_by_project',
+ fake_qagabp)
+
+ def test_quota_reserve_with_allocated(self):
+ context = FakeContext('test_project', 'test_class')
+ # Allocated quota for volume will be updated for 3
+ self._stub_allocated_get_all_by_project(allocated_quota=True)
+ # Quota limited for volume updated for 10
+ quotas = dict(volumes=10,
+ gigabytes=10 * 1024, )
+ # Try reserve 7 volumes
+ deltas = dict(volumes=7,
+ gigabytes=2 * 1024, )
+ result = sqa_api.quota_reserve(context, self.resources, quotas,
+ deltas, self.expire, 5, 0)
+ # The reservation works
+ self.compare_reservation(
+ result,
+ [dict(resource='volumes',
+ usage_id=self.usages_created['volumes'],
+ project_id='test_project',
+ delta=7),
+ dict(resource='gigabytes',
+ usage_id=self.usages_created['gigabytes'],
+ delta=2 * 1024), ])
+ # But if we try reserve 8 volumes(more free quota that we have)
+ deltas = dict(volumes=8,
+ gigabytes=2 * 1024, )
+
+ self.assertRaises(exception.OverQuota,
+ sqa_api.quota_reserve,
+ context, self.resources, quotas,
+ deltas, self.expire, 0, 0)
+
def test_quota_reserve_create_usages(self):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5,
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, max_age)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
+ self._stub_allocated_get_all_by_project()
self.assertRaises(exception.OverQuota,
sqa_api.quota_reserve,
context, self.resources, quotas,
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
+ self._stub_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)