4 warnings.simplefilter('ignore', DeprecationWarning)
5 from eventlet import proc
6 warnings.simplefilter('default', DeprecationWarning)
7 from eventlet import coros
8 from eventlet import event as _event
9 from eventlet import Timeout, sleep, getcurrent, with_timeout
10 from tests import LimitedTestCase, skipped, silence_warnings
15 class ExpectedError(Exception):
19 class TestLink_Signal(LimitedTestCase):
24 q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
26 self.assertRaises(Timeout, s.wait, 0)
27 assert s.wait(0, None) is None
28 assert s.wait(0.001, None) is None
29 self.assertRaises(Timeout, s.wait, 0.001)
43 def test_send_exception(self):
45 q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
47 s.send_exception(OSError('hello'))
55 self.assertRaises(OSError, q1.wait)
56 self.assertRaises(OSError, q3.wait)
57 self.assertRaises(OSError, s.wait)
60 class TestProc(LimitedTestCase):
63 p = proc.spawn(lambda: 100)
64 receiver = proc.spawn(sleep, 1)
66 self.assertRaises(proc.LinkedCompleted, receiver.wait)
67 receiver2 = proc.spawn(sleep, 1)
69 self.assertRaises(proc.LinkedCompleted, receiver2.wait)
72 p = proc.spawn(lambda: 100)
73 event = _event.Event()
75 self.assertEqual(event.wait(), 100)
78 event2 = _event.Event()
80 self.assertEqual(event2.wait(), 100)
82 def test_current(self):
83 p = proc.spawn(lambda: 100)
85 self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
88 class TestCase(LimitedTestCase):
90 def link(self, p, listener=None):
# Link `listener` to `p` using whichever linking method this test class selects:
# the method is looked up on `p` by the name stored in `self.link_method`
# and called with `listener` (None when the caller passes nothing).
91 getattr(p, self.link_method)(listener)
94 LimitedTestCase.tearDown(self)
97 def set_links(self, p, first_time, kill_exc_type):
98 event = _event.Event()
105 proc_flag.append('finished')
106 receiver = proc.spawn(receiver)
107 self.link(p, receiver)
109 queue = coros.queue(1)
114 except kill_exc_type:
118 assert first_time, 'not raising here only first time'
120 callback_flag = ['initial']
121 self.link(p, lambda *args: callback_flag.remove('initial'))
124 self.link(p, _event.Event())
125 self.link(p, coros.queue(1))
126 return event, receiver, proc_flag, queue, callback_flag
128 def set_links_timeout(self, link):
129 # stuff that won't be touched
130 event = _event.Event()
133 proc_finished_flag = []
137 proc_finished_flag.append('finished')
139 myproc = proc.spawn(myproc)
142 queue = coros.queue(0)
144 return event, myproc, proc_finished_flag, queue
146 def check_timed_out(self, event, myproc, proc_finished_flag, queue):
148 assert with_timeout(DELAY, event.wait, timeout_value=X) is X
149 assert with_timeout(DELAY, queue.wait, timeout_value=X) is X
150 assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X
151 assert proc_finished_flag == [], proc_finished_flag
154 class TestReturn_link(TestCase):
157 def test_return(self):
160 p = self.p = proc.spawn(return25)
161 self._test_return(p, True, 25, proc.LinkedCompleted, lambda: sleep(0))
162 # repeating the same with dead process
164 self._test_return(p, False, 25, proc.LinkedCompleted, lambda: sleep(0))
166 def _test_return(self, p, first_time, result, kill_exc_type, action):
167 event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
169 # stuff that will time out because there's no unhandled exception:
170 xxxxx = self.set_links_timeout(p.link_exception)
174 except kill_exc_type:
175 assert first_time, 'raising here only first time'
177 assert not first_time, 'Should not raise LinkedKilled here after first time'
181 self.assertEqual(event.wait(), result)
182 self.assertEqual(queue.wait(), result)
183 self.assertRaises(kill_exc_type, receiver.wait)
184 self.assertRaises(kill_exc_type, proc.waitall, [receiver])
187 assert not proc_flag, proc_flag
188 assert not callback_flag, callback_flag
190 self.check_timed_out(*xxxxx)
193 class TestReturn_link_value(TestReturn_link):
195 link_method = 'link_value'
198 class TestRaise_link(TestCase):
201 def _test_raise(self, p, first_time, kill_exc_type):
202 event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
203 xxxxx = self.set_links_timeout(p.link_value)
207 except kill_exc_type:
208 assert first_time, 'raising here only first time'
210 assert not first_time, 'Should not raise LinkedKilled here after first time'
214 self.assertRaises(ExpectedError, event.wait)
215 self.assertRaises(ExpectedError, queue.wait)
216 self.assertRaises(kill_exc_type, receiver.wait)
217 self.assertRaises(kill_exc_type, proc.waitall, [receiver])
219 assert not proc_flag, proc_flag
220 assert not callback_flag, callback_flag
222 self.check_timed_out(*xxxxx)
225 def test_raise(self):
226 p = self.p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_raise')))
227 self._test_raise(p, True, proc.LinkedFailed)
228 # repeating the same with dead process
230 self._test_raise(p, False, proc.LinkedFailed)
232 def _test_kill(self, p, first_time, kill_exc_type):
233 event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
234 xxxxx = self.set_links_timeout(p.link_value)
239 except kill_exc_type:
240 assert first_time, 'raising here only first time'
242 assert not first_time, 'Should not raise LinkedKilled here after first time'
246 self.assertRaises(proc.ProcExit, event.wait)
247 self.assertRaises(proc.ProcExit, queue.wait)
248 self.assertRaises(kill_exc_type, proc.waitall, [receiver])
249 self.assertRaises(kill_exc_type, receiver.wait)
252 assert not proc_flag, proc_flag
253 assert not callback_flag, callback_flag
255 self.check_timed_out(*xxxxx)
259 p = self.p = proc.spawn(sleep, DELAY)
260 self._test_kill(p, True, proc.LinkedKilled)
261 # repeating the same with dead process
263 self._test_kill(p, False, proc.LinkedKilled)
266 class TestRaise_link_exception(TestRaise_link):
# Subclass of TestRaise_link that sets link_method to 'link_exception',
# the attribute name TestCase.link() resolves on the process when linking.
267 link_method = 'link_exception'
270 class TestStuff(LimitedTestCase):
272 def test_wait_noerrors(self):
273 x = proc.spawn(lambda: 1)
274 y = proc.spawn(lambda: 2)
275 z = proc.spawn(lambda: 3)
276 self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3])
279 self.assertEqual(e.wait(), 1)
283 self.assertEqual(e.wait(), 1)
284 self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]])
286 # this test is timing-sensitive
288 def test_wait_error(self):
293 z = proc.spawn(lambda: 3)
294 y = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_wait_error')))
299 self.assertRaises(ExpectedError, proc.waitall, [x, y, z])
300 self.assertRaises(proc.LinkedFailed, proc.waitall, [x])
301 self.assertEqual(proc.waitall([z]), [3])
302 self.assertRaises(ExpectedError, proc.waitall, [y])
304 def test_wait_all_exception_order(self):
305 # if several exceptions are raised, wait must raise the earliest one
308 raise ExpectedError('first')
309 a = proc.spawn(first)
310 b = proc.spawn(lambda: getcurrent().throw(ExpectedError('second')))
313 except ExpectedError as ex:
314 assert 'second' in str(ex), repr(str(ex))
315 sleep(0.2) # sleep to ensure that the other timer is raised
317 def test_multiple_listeners_error(self):
318 # if an error occurs while calling a callback,
319 # it must not prevent the other listeners from being called;
320 # also, all of the errors should be logged -- check the output
321 # manually to confirm that they are
322 p = proc.spawn(lambda: 5)
325 def listener1(*args):
327 raise ExpectedError('listener1')
329 def listener2(*args):
331 raise ExpectedError('listener2')
333 def listener3(*args):
334 raise ExpectedError('listener3')
339 assert results in [[10, 20], [20, 10]], results
341 p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
347 assert results in [[10, 20], [20, 10]], results
349 def _test_multiple_listeners_error_unlink(self, p):
350 # notification must not happen after unlink, even
351 # though the notification process has already been started
354 def listener1(*args):
357 raise ExpectedError('listener1')
359 def listener2(*args):
362 raise ExpectedError('listener2')
364 def listener3(*args):
365 raise ExpectedError('listener3')
370 assert results == [5], results
372 def test_multiple_listeners_error_unlink_Proc(self):
# Run the shared _test_multiple_listeners_error_unlink scenario against
# a process created with proc.spawn whose function returns 5.
373 p = proc.spawn(lambda: 5)
374 self._test_multiple_listeners_error_unlink(p)
376 def test_multiple_listeners_error_unlink_Source(self):
378 proc.spawn(p.send, 6)
379 self._test_multiple_listeners_error_unlink(p)
381 def test_killing_unlinked(self):
386 raise ExpectedError('test_killing_unlinked')
388 e.send_exception(*sys.exc_info())
389 p = proc.spawn_link(func)
393 except ExpectedError:
396 p.unlink() # this disables LinkedCompleted that otherwise would be raised by the next line
400 if __name__ == '__main__':