5 from eventlet import semaphore
6 from tests import LimitedTestCase
9 class TestSemaphore(LimitedTestCase):
11 def test_bounded(self):
12 sem = semaphore.CappedSemaphore(2, limit=3)
13 self.assertEqual(sem.acquire(), True)
14 self.assertEqual(sem.acquire(), True)
15 gt1 = eventlet.spawn(sem.release)
16 self.assertEqual(sem.acquire(), True)
17 self.assertEqual(-3, sem.balance)
21 gt2 = eventlet.spawn(sem.acquire)
23 self.assertEqual(3, sem.balance)
27 def test_bounded_with_zero_limit(self):
28 sem = semaphore.CappedSemaphore(0, 0)
29 gt = eventlet.spawn(sem.acquire)
33 def test_non_blocking(self):
34 sem = semaphore.Semaphore(0)
35 self.assertEqual(sem.acquire(blocking=False), False)
37 def test_timeout(self):
38 sem = semaphore.Semaphore(0)
40 self.assertEqual(sem.acquire(timeout=0.1), False)
41 self.assertTrue(time.time() - start >= 0.1)
43 def test_timeout_non_blocking(self):
44 sem = semaphore.Semaphore()
45 self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
48 def test_semaphore_contention():
49 g_mutex = semaphore.Semaphore()
53 while min(counts) < 200:
58 t1 = eventlet.spawn(worker, no=1)
59 t2 = eventlet.spawn(worker, no=2)
64 assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
67 if __name__ == '__main__':