Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / semaphore_test.py
1 import time
2 import unittest
3
4 import eventlet
5 from eventlet import semaphore
6 from tests import LimitedTestCase
7
8
class TestSemaphore(LimitedTestCase):
    """Checks of acquire/release on Semaphore and CappedSemaphore."""

    def test_bounded(self):
        # NOTE(review): this sequence interleaves with two spawned
        # greenthreads; keep the statement order exactly as written.
        capped = semaphore.CappedSemaphore(2, limit=3)
        first = capped.acquire()
        self.assertEqual(first, True)
        second = capped.acquire()
        self.assertEqual(second, True)
        releaser = eventlet.spawn(capped.release)
        third = capped.acquire()
        self.assertEqual(third, True)
        self.assertEqual(-3, capped.balance)
        for _ in range(3):
            capped.release()
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        self.assertEqual(3, capped.balance)
        # Let both helper greenthreads finish before the test returns.
        releaser.wait()
        acquirer.wait()

    def test_bounded_with_zero_limit(self):
        # Both constructor arguments are zero; an acquire is spawned first
        # and then a release is issued from the main greenthread.
        capped = semaphore.CappedSemaphore(0, 0)
        waiter = eventlet.spawn(capped.acquire)
        capped.release()
        waiter.wait()

    def test_non_blocking(self):
        # A semaphore created with 0 is expected to refuse a non-blocking
        # acquire by returning False.
        empty = semaphore.Semaphore(0)
        outcome = empty.acquire(blocking=False)
        self.assertEqual(outcome, False)

    def test_timeout(self):
        # acquire(timeout=0.1) must return False, and at least 0.1 seconds
        # of wall-clock time must have elapsed by then.
        empty = semaphore.Semaphore(0)
        began = time.time()
        outcome = empty.acquire(timeout=0.1)
        self.assertEqual(outcome, False)
        elapsed = time.time() - began
        self.assertTrue(elapsed >= 0.1)

    def test_timeout_non_blocking(self):
        # Passing a timeout together with blocking=False must raise
        # ValueError.
        default_sem = semaphore.Semaphore()
        conflicting = dict(blocking=False, timeout=1)
        self.assertRaises(ValueError, default_sem.acquire, **conflicting)
46
47
# When executed directly as a script, run this module's tests through
# unittest's command-line entry point.
if __name__ == '__main__':
    unittest.main()