Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / test__proc.py
1 import sys
2 import unittest
3 import warnings
4 warnings.simplefilter('ignore', DeprecationWarning)
5 from eventlet import proc
6 warnings.simplefilter('default', DeprecationWarning)
7 from eventlet import coros
8 from eventlet import event as _event
9 from eventlet import Timeout, sleep, getcurrent, with_timeout
10 from tests import LimitedTestCase, skipped, silence_warnings
11
12 DELAY = 0.01
13
14
15 class ExpectedError(Exception):
16     pass
17
18
19 class TestLink_Signal(LimitedTestCase):
20
21     @silence_warnings
22     def test_send(self):
23         s = proc.Source()
24         q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
25         s.link_value(q1)
26         self.assertRaises(Timeout, s.wait, 0)
27         assert s.wait(0, None) is None
28         assert s.wait(0.001, None) is None
29         self.assertRaises(Timeout, s.wait, 0.001)
30         s.send(1)
31         assert not q1.ready()
32         assert s.wait() == 1
33         sleep(0)
34         assert q1.ready()
35         s.link_exception(q2)
36         s.link(q3)
37         assert not q2.ready()
38         sleep(0)
39         assert q3.ready()
40         assert s.wait() == 1
41
42     @silence_warnings
43     def test_send_exception(self):
44         s = proc.Source()
45         q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
46         s.link_exception(q1)
47         s.send_exception(OSError('hello'))
48         sleep(0)
49         assert q1.ready()
50         s.link_value(q2)
51         s.link(q3)
52         assert not q2.ready()
53         sleep(0)
54         assert q3.ready()
55         self.assertRaises(OSError, q1.wait)
56         self.assertRaises(OSError, q3.wait)
57         self.assertRaises(OSError, s.wait)
58
59
60 class TestProc(LimitedTestCase):
61
62     def test_proc(self):
63         p = proc.spawn(lambda: 100)
64         receiver = proc.spawn(sleep, 1)
65         p.link(receiver)
66         self.assertRaises(proc.LinkedCompleted, receiver.wait)
67         receiver2 = proc.spawn(sleep, 1)
68         p.link(receiver2)
69         self.assertRaises(proc.LinkedCompleted, receiver2.wait)
70
71     def test_event(self):
72         p = proc.spawn(lambda: 100)
73         event = _event.Event()
74         p.link(event)
75         self.assertEqual(event.wait(), 100)
76
77         for i in range(3):
78             event2 = _event.Event()
79             p.link(event2)
80             self.assertEqual(event2.wait(), 100)
81
82     def test_current(self):
83         p = proc.spawn(lambda: 100)
84         p.link()
85         self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
86
87
88 class TestCase(LimitedTestCase):
89
90     def link(self, p, listener=None):
91         getattr(p, self.link_method)(listener)
92
93     def tearDown(self):
94         LimitedTestCase.tearDown(self)
95         self.p.unlink()
96
97     def set_links(self, p, first_time, kill_exc_type):
98         event = _event.Event()
99         self.link(p, event)
100
101         proc_flag = []
102
103         def receiver():
104             sleep(DELAY)
105             proc_flag.append('finished')
106         receiver = proc.spawn(receiver)
107         self.link(p, receiver)
108
109         queue = coros.queue(1)
110         self.link(p, queue)
111
112         try:
113             self.link(p)
114         except kill_exc_type:
115             if first_time:
116                 raise
117         else:
118             assert first_time, 'not raising here only first time'
119
120         callback_flag = ['initial']
121         self.link(p, lambda *args: callback_flag.remove('initial'))
122
123         for _ in range(10):
124             self.link(p, _event.Event())
125             self.link(p, coros.queue(1))
126         return event, receiver, proc_flag, queue, callback_flag
127
128     def set_links_timeout(self, link):
129         # stuff that won't be touched
130         event = _event.Event()
131         link(event)
132
133         proc_finished_flag = []
134
135         def myproc():
136             sleep(10)
137             proc_finished_flag.append('finished')
138             return 555
139         myproc = proc.spawn(myproc)
140         link(myproc)
141
142         queue = coros.queue(0)
143         link(queue)
144         return event, myproc, proc_finished_flag, queue
145
146     def check_timed_out(self, event, myproc, proc_finished_flag, queue):
147         X = object()
148         assert with_timeout(DELAY, event.wait, timeout_value=X) is X
149         assert with_timeout(DELAY, queue.wait, timeout_value=X) is X
150         assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X
151         assert proc_finished_flag == [], proc_finished_flag
152
153
154 class TestReturn_link(TestCase):
155     link_method = 'link'
156
157     def test_return(self):
158         def return25():
159             return 25
160         p = self.p = proc.spawn(return25)
161         self._test_return(p, True, 25, proc.LinkedCompleted, lambda: sleep(0))
162         # repeating the same with dead process
163         for _ in range(3):
164             self._test_return(p, False, 25, proc.LinkedCompleted, lambda: sleep(0))
165
166     def _test_return(self, p, first_time, result, kill_exc_type, action):
167         event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
168
169         # stuff that will time out because there's no unhandled exception:
170         xxxxx = self.set_links_timeout(p.link_exception)
171
172         try:
173             sleep(DELAY * 2)
174         except kill_exc_type:
175             assert first_time, 'raising here only first time'
176         else:
177             assert not first_time, 'Should not raise LinkedKilled here after first time'
178
179         assert not p, p
180
181         self.assertEqual(event.wait(), result)
182         self.assertEqual(queue.wait(), result)
183         self.assertRaises(kill_exc_type, receiver.wait)
184         self.assertRaises(kill_exc_type, proc.waitall, [receiver])
185
186         sleep(DELAY)
187         assert not proc_flag, proc_flag
188         assert not callback_flag, callback_flag
189
190         self.check_timed_out(*xxxxx)
191
192
193 class TestReturn_link_value(TestReturn_link):
194     sync = False
195     link_method = 'link_value'
196
197
198 class TestRaise_link(TestCase):
199     link_method = 'link'
200
201     def _test_raise(self, p, first_time, kill_exc_type):
202         event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
203         xxxxx = self.set_links_timeout(p.link_value)
204
205         try:
206             sleep(DELAY)
207         except kill_exc_type:
208             assert first_time, 'raising here only first time'
209         else:
210             assert not first_time, 'Should not raise LinkedKilled here after first time'
211
212         assert not p, p
213
214         self.assertRaises(ExpectedError, event.wait)
215         self.assertRaises(ExpectedError, queue.wait)
216         self.assertRaises(kill_exc_type, receiver.wait)
217         self.assertRaises(kill_exc_type, proc.waitall, [receiver])
218         sleep(DELAY)
219         assert not proc_flag, proc_flag
220         assert not callback_flag, callback_flag
221
222         self.check_timed_out(*xxxxx)
223
224     @silence_warnings
225     def test_raise(self):
226         p = self.p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_raise')))
227         self._test_raise(p, True, proc.LinkedFailed)
228         # repeating the same with dead process
229         for _ in range(3):
230             self._test_raise(p, False, proc.LinkedFailed)
231
232     def _test_kill(self, p, first_time, kill_exc_type):
233         event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
234         xxxxx = self.set_links_timeout(p.link_value)
235
236         p.kill()
237         try:
238             sleep(DELAY)
239         except kill_exc_type:
240             assert first_time, 'raising here only first time'
241         else:
242             assert not first_time, 'Should not raise LinkedKilled here after first time'
243
244         assert not p, p
245
246         self.assertRaises(proc.ProcExit, event.wait)
247         self.assertRaises(proc.ProcExit, queue.wait)
248         self.assertRaises(kill_exc_type, proc.waitall, [receiver])
249         self.assertRaises(kill_exc_type, receiver.wait)
250
251         sleep(DELAY)
252         assert not proc_flag, proc_flag
253         assert not callback_flag, callback_flag
254
255         self.check_timed_out(*xxxxx)
256
257     @silence_warnings
258     def test_kill(self):
259         p = self.p = proc.spawn(sleep, DELAY)
260         self._test_kill(p, True, proc.LinkedKilled)
261         # repeating the same with dead process
262         for _ in range(3):
263             self._test_kill(p, False, proc.LinkedKilled)
264
265
266 class TestRaise_link_exception(TestRaise_link):
267     link_method = 'link_exception'
268
269
270 class TestStuff(LimitedTestCase):
271
272     def test_wait_noerrors(self):
273         x = proc.spawn(lambda: 1)
274         y = proc.spawn(lambda: 2)
275         z = proc.spawn(lambda: 3)
276         self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3])
277         e = _event.Event()
278         x.link(e)
279         self.assertEqual(e.wait(), 1)
280         x.unlink(e)
281         e = _event.Event()
282         x.link(e)
283         self.assertEqual(e.wait(), 1)
284         self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]])
285
286     # this test is timing-sensitive
287     @skipped
288     def test_wait_error(self):
289         def x():
290             sleep(DELAY)
291             return 1
292         x = proc.spawn(x)
293         z = proc.spawn(lambda: 3)
294         y = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_wait_error')))
295         y.link(x)
296         x.link(y)
297         y.link(z)
298         z.link(y)
299         self.assertRaises(ExpectedError, proc.waitall, [x, y, z])
300         self.assertRaises(proc.LinkedFailed, proc.waitall, [x])
301         self.assertEqual(proc.waitall([z]), [3])
302         self.assertRaises(ExpectedError, proc.waitall, [y])
303
304     def test_wait_all_exception_order(self):
305         # if there're several exceptions raised, the earliest one must be raised by wait
306         def first():
307             sleep(0.1)
308             raise ExpectedError('first')
309         a = proc.spawn(first)
310         b = proc.spawn(lambda: getcurrent().throw(ExpectedError('second')))
311         try:
312             proc.waitall([a, b])
313         except ExpectedError as ex:
314             assert 'second' in str(ex), repr(str(ex))
315         sleep(0.2)   # sleep to ensure that the other timer is raised
316
317     def test_multiple_listeners_error(self):
318         # if there was an error while calling a callback
319         # it should not prevent the other listeners from being called
320         # also, all of the errors should be logged, check the output
321         # manually that they are
322         p = proc.spawn(lambda: 5)
323         results = []
324
325         def listener1(*args):
326             results.append(10)
327             raise ExpectedError('listener1')
328
329         def listener2(*args):
330             results.append(20)
331             raise ExpectedError('listener2')
332
333         def listener3(*args):
334             raise ExpectedError('listener3')
335         p.link(listener1)
336         p.link(listener2)
337         p.link(listener3)
338         sleep(DELAY * 10)
339         assert results in [[10, 20], [20, 10]], results
340
341         p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
342         results = []
343         p.link(listener1)
344         p.link(listener2)
345         p.link(listener3)
346         sleep(DELAY * 10)
347         assert results in [[10, 20], [20, 10]], results
348
349     def _test_multiple_listeners_error_unlink(self, p):
350         # notification must not happen after unlink even
351         # though notification process has been already started
352         results = []
353
354         def listener1(*args):
355             p.unlink(listener2)
356             results.append(5)
357             raise ExpectedError('listener1')
358
359         def listener2(*args):
360             p.unlink(listener1)
361             results.append(5)
362             raise ExpectedError('listener2')
363
364         def listener3(*args):
365             raise ExpectedError('listener3')
366         p.link(listener1)
367         p.link(listener2)
368         p.link(listener3)
369         sleep(DELAY * 10)
370         assert results == [5], results
371
372     def test_multiple_listeners_error_unlink_Proc(self):
373         p = proc.spawn(lambda: 5)
374         self._test_multiple_listeners_error_unlink(p)
375
376     def test_multiple_listeners_error_unlink_Source(self):
377         p = proc.Source()
378         proc.spawn(p.send, 6)
379         self._test_multiple_listeners_error_unlink(p)
380
381     def test_killing_unlinked(self):
382         e = _event.Event()
383
384         def func():
385             try:
386                 raise ExpectedError('test_killing_unlinked')
387             except:
388                 e.send_exception(*sys.exc_info())
389         p = proc.spawn_link(func)
390         try:
391             try:
392                 e.wait()
393             except ExpectedError:
394                 pass
395         finally:
396             p.unlink()  # this disables LinkedCompleted that otherwise would be raised by the next line
397         sleep(DELAY)
398
399
400 if __name__ == '__main__':
401     unittest.main()