ced91364534fc7ee8424ff1a32b6f19369115dca
[packages/trusty/python-eventlet.git] / python-eventlet / tests / semaphore_test.py
1 import time
2 import unittest
3
4 import eventlet
5 from eventlet import semaphore
6 from tests import LimitedTestCase
7
8
class TestSemaphore(LimitedTestCase):
    """Exercise ``semaphore.CappedSemaphore`` and ``semaphore.Semaphore``.

    Covers acquire/release on a capped semaphore, non-blocking acquire,
    acquire with a timeout, and the rejected blocking=False + timeout combo.
    """

    def test_bounded(self):
        # Capped semaphore created with count 2 and limit 3. The order of the
        # calls below matters: spawned greenthreads only run when this one
        # yields (presumably inside a blocking acquire/release -- verify
        # against the eventlet implementation).
        capped = semaphore.CappedSemaphore(2, limit=3)
        for _ in range(2):
            self.assertEqual(capped.acquire(), True)
        releaser = eventlet.spawn(capped.release)
        self.assertEqual(capped.acquire(), True)
        # After three successful acquires the test expects a balance of -3.
        self.assertEqual(-3, capped.balance)
        for _ in range(3):
            capped.release()
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        # ...and a balance of 3 once the releases above have gone through.
        self.assertEqual(3, capped.balance)
        # Join both helper greenthreads so they finish inside the test.
        releaser.wait()
        acquirer.wait()

    def test_bounded_with_zero_limit(self):
        # Count 0 and limit 0: a spawned acquire paired with one release
        # must both complete.
        capped = semaphore.CappedSemaphore(0, 0)
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        acquirer.wait()

    def test_non_blocking(self):
        # A zero-count semaphore reports False for a non-blocking acquire.
        empty = semaphore.Semaphore(0)
        got_it = empty.acquire(blocking=False)
        self.assertEqual(got_it, False)

    def test_timeout(self):
        # Acquire with a 0.1s timeout on a zero-count semaphore returns False
        # and at least that much wall-clock time has passed.
        empty = semaphore.Semaphore(0)
        began = time.time()
        self.assertEqual(empty.acquire(timeout=0.1), False)
        elapsed = time.time() - began
        self.assertTrue(elapsed >= 0.1)

    def test_timeout_non_blocking(self):
        # Passing a timeout together with blocking=False must raise ValueError.
        sem = semaphore.Semaphore()
        self.assertRaises(
            ValueError,
            sem.acquire,
            blocking=False,
            timeout=1,
        )
46
47
def test_semaphore_contention():
    # Two greenthreads contend for one semaphore for half a second; afterwards
    # their iteration tallies must differ by less than 10% of the smaller one.
    lock = semaphore.Semaphore()
    tallies = [0, 0]

    def worker(slot):
        # `slot` is 1-based. Keep taking turns until both tallies reach 200.
        while min(tallies) < 200:
            with lock:
                tallies[slot - 1] += 1
                # Sleep while still holding the semaphore so the other
                # worker has to wait for it.
                eventlet.sleep(0.001)

    workers = [eventlet.spawn(worker, slot=n) for n in (1, 2)]
    eventlet.sleep(0.5)
    for gt in workers:
        gt.kill()

    spread = abs(tallies[0] - tallies[1])
    tolerance = int(min(tallies) * 0.1)
    assert spread < tolerance, tallies
65
66
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()