Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / timeout_test_with_statement.py
1 """Tests with-statement behavior of Timeout class."""
2
3 import gc
4 import sys
5 import time
6 import weakref
7
8 from eventlet import sleep
9 from eventlet.timeout import Timeout
10 from tests import LimitedTestCase
11
12
13 DELAY = 0.01
14
15
16 class Error(Exception):
17     pass
18
19
20 class Test(LimitedTestCase):
21     def test_cancellation(self):
22         # Nothing happens if with-block finishes before the timeout expires
23         t = Timeout(DELAY * 2)
24         sleep(0)  # make it pending
25         assert t.pending, repr(t)
26         with t:
27             assert t.pending, repr(t)
28             sleep(DELAY)
29         # check if timer was actually cancelled
30         assert not t.pending, repr(t)
31         sleep(DELAY * 2)
32
33     def test_raising_self(self):
34         # An exception will be raised if it's not
35         try:
36             with Timeout(DELAY) as t:
37                 sleep(DELAY * 2)
38         except Timeout as ex:
39             assert ex is t, (ex, t)
40         else:
41             raise AssertionError('must raise Timeout')
42
43     def test_raising_self_true(self):
44         # specifying True as the exception raises self as well
45         try:
46             with Timeout(DELAY, True) as t:
47                 sleep(DELAY * 2)
48         except Timeout as ex:
49             assert ex is t, (ex, t)
50         else:
51             raise AssertionError('must raise Timeout')
52
53     def test_raising_custom_exception(self):
54         # You can customize the exception raised:
55         try:
56             with Timeout(DELAY, IOError("Operation takes way too long")):
57                 sleep(DELAY * 2)
58         except IOError as ex:
59             assert str(ex) == "Operation takes way too long", repr(ex)
60
61     def test_raising_exception_class(self):
62         # Providing classes instead of values should be possible too:
63         try:
64             with Timeout(DELAY, ValueError):
65                 sleep(DELAY * 2)
66         except ValueError:
67             pass
68
69     def test_raising_exc_tuple(self):
70         try:
71             1 // 0
72         except:
73             try:
74                 with Timeout(DELAY, sys.exc_info()[0]):
75                     sleep(DELAY * 2)
76                     raise AssertionError('should not get there')
77                 raise AssertionError('should not get there')
78             except ZeroDivisionError:
79                 pass
80         else:
81             raise AssertionError('should not get there')
82
83     def test_cancel_timer_inside_block(self):
84         # It's possible to cancel the timer inside the block:
85         with Timeout(DELAY) as timer:
86             timer.cancel()
87             sleep(DELAY * 2)
88
89     def test_silent_block(self):
90         # To silence the exception before exiting the block, pass
91         # False as second parameter.
92         XDELAY = 0.1
93         start = time.time()
94         with Timeout(XDELAY, False):
95             sleep(XDELAY * 2)
96         delta = (time.time() - start)
97         assert delta < XDELAY * 2, delta
98
99     def test_dummy_timer(self):
100         # passing None as seconds disables the timer
101         with Timeout(None):
102             sleep(DELAY)
103         sleep(DELAY)
104
105     def test_ref(self):
106         err = Error()
107         err_ref = weakref.ref(err)
108         with Timeout(DELAY * 2, err):
109             sleep(DELAY)
110         del err
111         gc.collect()
112         assert not err_ref(), repr(err_ref())
113
114     def test_nested_timeout(self):
115         with Timeout(DELAY, False):
116             with Timeout(DELAY * 2, False):
117                 sleep(DELAY * 3)
118             raise AssertionError('should not get there')
119
120         with Timeout(DELAY) as t1:
121             with Timeout(DELAY * 2) as t2:
122                 try:
123                     sleep(DELAY * 3)
124                 except Timeout as ex:
125                     assert ex is t1, (ex, t1)
126                 assert not t1.pending, t1
127                 assert t2.pending, t2
128             assert not t2.pending, t2
129
130         with Timeout(DELAY * 2) as t1:
131             with Timeout(DELAY) as t2:
132                 try:
133                     sleep(DELAY * 3)
134                 except Timeout as ex:
135                     assert ex is t2, (ex, t2)
136                 assert t1.pending, t1
137                 assert not t2.pending, t2
138         assert not t1.pending, t1