self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
+def test_semaphore_contention():
+ g_mutex = semaphore.Semaphore()
+ counts = [0, 0]
+
+ def worker(no):
+ while min(counts) < 200:
+ with g_mutex:
+ counts[no - 1] += 1
+ eventlet.sleep(0.001)
+
+ t1 = eventlet.spawn(worker, no=1)
+ t2 = eventlet.spawn(worker, no=2)
+ eventlet.sleep(0.5)
+ t1.kill()
+ t2.kill()
+
+ assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
+
+
if __name__ == '__main__':
- unittest.main()
+ unittest.main()
\ No newline at end of file