1 """Tests with-statement behavior of Timeout class."""
8 from eventlet import sleep
9 from eventlet.timeout import Timeout
10 from tests import LimitedTestCase
16 class Error(Exception):
20 class Test(LimitedTestCase):
21 def test_cancellation(self):
22 # Nothing happens if with-block finishes before the timeout expires
23 t = Timeout(DELAY * 2)
24 sleep(0) # make it pending
25 assert t.pending, repr(t)
27 assert t.pending, repr(t)
29 # check if timer was actually cancelled
30 assert not t.pending, repr(t)
33 def test_raising_self(self):
34 # An exception will be raised if it's not
36 with Timeout(DELAY) as t:
39 assert ex is t, (ex, t)
41 raise AssertionError('must raise Timeout')
43 def test_raising_self_true(self):
44 # specifying True as the exception raises self as well
46 with Timeout(DELAY, True) as t:
49 assert ex is t, (ex, t)
51 raise AssertionError('must raise Timeout')
53 def test_raising_custom_exception(self):
54 # You can customize the exception raised:
56 with Timeout(DELAY, IOError("Operation takes way too long")):
59 assert str(ex) == "Operation takes way too long", repr(ex)
61 def test_raising_exception_class(self):
62 # Providing classes instead of values should be possible too:
64 with Timeout(DELAY, ValueError):
69 def test_raising_exc_tuple(self):
74 with Timeout(DELAY, sys.exc_info()[0]):
76 raise AssertionError('should not get there')
77 raise AssertionError('should not get there')
78 except ZeroDivisionError:
81 raise AssertionError('should not get there')
83 def test_cancel_timer_inside_block(self):
84 # It's possible to cancel the timer inside the block:
85 with Timeout(DELAY) as timer:
89 def test_silent_block(self):
90 # To silence the exception before exiting the block, pass
91 # False as second parameter.
94 with Timeout(XDELAY, False):
96 delta = (time.time() - start)
97 assert delta < XDELAY * 2, delta
99 def test_dummy_timer(self):
100 # passing None as seconds disables the timer
107 err_ref = weakref.ref(err)
108 with Timeout(DELAY * 2, err):
112 assert not err_ref(), repr(err_ref())
114 def test_nested_timeout(self):
115 with Timeout(DELAY, False):
116 with Timeout(DELAY * 2, False):
118 raise AssertionError('should not get there')
120 with Timeout(DELAY) as t1:
121 with Timeout(DELAY * 2) as t2:
124 except Timeout as ex:
125 assert ex is t1, (ex, t1)
126 assert not t1.pending, t1
127 assert t2.pending, t2
128 assert not t2.pending, t2
130 with Timeout(DELAY * 2) as t1:
131 with Timeout(DELAY) as t2:
134 except Timeout as ex:
135 assert ex is t2, (ex, t2)
136 assert t1.pending, t1
137 assert not t2.pending, t2
138 assert not t1.pending, t1