Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / hub_test.py
1 from __future__ import with_statement
2 import sys
3
4 import tests
5 from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless
6 from tests.patcher_test import ProcessBase
7 import time
8 import eventlet
9 from eventlet import hubs
10 from eventlet.event import Event
11 from eventlet.semaphore import Semaphore
12 from eventlet.support import greenlets, six
13
14
15 DELAY = 0.001
16
17
18 def noop():
19     pass
20
21
22 class TestTimerCleanup(LimitedTestCase):
23     TEST_TIMEOUT = 2
24
25     @skip_with_pyevent
26     def test_cancel_immediate(self):
27         hub = hubs.get_hub()
28         stimers = hub.get_timers_count()
29         scanceled = hub.timers_canceled
30         for i in six.moves.range(2000):
31             t = hubs.get_hub().schedule_call_global(60, noop)
32             t.cancel()
33             self.assert_less_than_equal(hub.timers_canceled,
34                                         hub.get_timers_count() + 1)
35         # there should be fewer than 1000 new timers and canceled
36         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
37         self.assert_less_than_equal(hub.timers_canceled, 1000)
38
39     @skip_with_pyevent
40     def test_cancel_accumulated(self):
41         hub = hubs.get_hub()
42         stimers = hub.get_timers_count()
43         scanceled = hub.timers_canceled
44         for i in six.moves.range(2000):
45             t = hubs.get_hub().schedule_call_global(60, noop)
46             eventlet.sleep()
47             self.assert_less_than_equal(hub.timers_canceled,
48                                         hub.get_timers_count() + 1)
49             t.cancel()
50             self.assert_less_than_equal(hub.timers_canceled,
51                                         hub.get_timers_count() + 1, hub.timers)
52         # there should be fewer than 1000 new timers and canceled
53         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
54         self.assert_less_than_equal(hub.timers_canceled, 1000)
55
56     @skip_with_pyevent
57     def test_cancel_proportion(self):
58         # if fewer than half the pending timers are canceled, it should
59         # not clean them out
60         hub = hubs.get_hub()
61         uncanceled_timers = []
62         stimers = hub.get_timers_count()
63         scanceled = hub.timers_canceled
64         for i in six.moves.range(1000):
65             # 2/3rds of new timers are uncanceled
66             t = hubs.get_hub().schedule_call_global(60, noop)
67             t2 = hubs.get_hub().schedule_call_global(60, noop)
68             t3 = hubs.get_hub().schedule_call_global(60, noop)
69             eventlet.sleep()
70             self.assert_less_than_equal(hub.timers_canceled,
71                                         hub.get_timers_count() + 1)
72             t.cancel()
73             self.assert_less_than_equal(hub.timers_canceled,
74                                         hub.get_timers_count() + 1)
75             uncanceled_timers.append(t2)
76             uncanceled_timers.append(t3)
77         # 3000 new timers, plus a few extras
78         self.assert_less_than_equal(stimers + 3000,
79                                     stimers + hub.get_timers_count())
80         self.assertEqual(hub.timers_canceled, 1000)
81         for t in uncanceled_timers:
82             t.cancel()
83             self.assert_less_than_equal(hub.timers_canceled,
84                                         hub.get_timers_count())
85         eventlet.sleep()
86
87
88 class TestScheduleCall(LimitedTestCase):
89
90     def test_local(self):
91         lst = [1]
92         eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
93         eventlet.sleep(0)
94         eventlet.sleep(DELAY * 2)
95         assert lst == [1], lst
96
97     def test_global(self):
98         lst = [1]
99         eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
100         eventlet.sleep(0)
101         eventlet.sleep(DELAY * 2)
102         assert lst == [], lst
103
104     def test_ordering(self):
105         lst = []
106         hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
107         hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
108         hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
109         while len(lst) < 3:
110             eventlet.sleep(DELAY)
111         self.assertEqual(lst, [1, 2, 3])
112
113
114 class TestDebug(LimitedTestCase):
115
116     def test_debug_listeners(self):
117         hubs.get_hub().set_debug_listeners(True)
118         hubs.get_hub().set_debug_listeners(False)
119
120     def test_timer_exceptions(self):
121         hubs.get_hub().set_timer_exceptions(True)
122         hubs.get_hub().set_timer_exceptions(False)
123
124
125 class TestExceptionInMainloop(LimitedTestCase):
126
127     def test_sleep(self):
128         # even if there was an error in the mainloop, the hub should continue
129         # to work
130         start = time.time()
131         eventlet.sleep(DELAY)
132         delay = time.time() - start
133
134         assert delay >= DELAY * \
135             0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
136                 delay, DELAY)
137
138         def fail():
139             1 // 0
140
141         hubs.get_hub().schedule_call_global(0, fail)
142
143         start = time.time()
144         eventlet.sleep(DELAY)
145         delay = time.time() - start
146
147         assert delay >= DELAY * \
148             0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
149                 delay, DELAY)
150
151
152 class TestExceptionInGreenthread(LimitedTestCase):
153
154     @skip_unless(greenlets.preserves_excinfo)
155     def test_exceptionpreservation(self):
156         # events for controlling execution order
157         gt1event = Event()
158         gt2event = Event()
159
160         def test_gt1():
161             try:
162                 raise KeyError()
163             except KeyError:
164                 gt1event.send('exception')
165                 gt2event.wait()
166                 assert sys.exc_info()[0] is KeyError
167                 gt1event.send('test passed')
168
169         def test_gt2():
170             gt1event.wait()
171             gt1event.reset()
172             assert sys.exc_info()[0] is None
173             try:
174                 raise ValueError()
175             except ValueError:
176                 gt2event.send('exception')
177                 gt1event.wait()
178                 assert sys.exc_info()[0] is ValueError
179
180         g1 = eventlet.spawn(test_gt1)
181         g2 = eventlet.spawn(test_gt2)
182         try:
183             g1.wait()
184             g2.wait()
185         finally:
186             g1.kill()
187             g2.kill()
188
189     def test_exceptionleaks(self):
190         # tests expected behaviour with all versions of greenlet
191         def test_gt(sem):
192             try:
193                 raise KeyError()
194             except KeyError:
195                 sem.release()
196                 hubs.get_hub().switch()
197
198         # semaphores for controlling execution order
199         sem = Semaphore()
200         sem.acquire()
201         g = eventlet.spawn(test_gt, sem)
202         try:
203             sem.acquire()
204             assert sys.exc_info()[0] is None
205         finally:
206             g.kill()
207
208
209 class TestHubSelection(LimitedTestCase):
210
211     def test_explicit_hub(self):
212         oldhub = hubs.get_hub()
213         try:
214             hubs.use_hub(Foo)
215             assert isinstance(hubs.get_hub(), Foo), hubs.get_hub()
216         finally:
217             hubs._threadlocal.hub = oldhub
218
219
220 class TestHubBlockingDetector(LimitedTestCase):
221     TEST_TIMEOUT = 10
222
223     @skip_with_pyevent
224     def test_block_detect(self):
225         def look_im_blocking():
226             import time
227             time.sleep(2)
228         from eventlet import debug
229         debug.hub_blocking_detection(True)
230         gt = eventlet.spawn(look_im_blocking)
231         self.assertRaises(RuntimeError, gt.wait)
232         debug.hub_blocking_detection(False)
233
234     @skip_with_pyevent
235     @skip_if_no_itimer
236     def test_block_detect_with_itimer(self):
237         def look_im_blocking():
238             import time
239             time.sleep(0.5)
240
241         from eventlet import debug
242         debug.hub_blocking_detection(True, resolution=0.1)
243         gt = eventlet.spawn(look_im_blocking)
244         self.assertRaises(RuntimeError, gt.wait)
245         debug.hub_blocking_detection(False)
246
247
248 class TestSuspend(LimitedTestCase):
249     TEST_TIMEOUT = 3
250     longMessage = True
251     maxDiff = None
252
253     def test_suspend_doesnt_crash(self):
254         import os
255         import shutil
256         import signal
257         import subprocess
258         import sys
259         import tempfile
260         self.tempdir = tempfile.mkdtemp('test_suspend')
261         filename = os.path.join(self.tempdir, 'test_suspend.py')
262         fd = open(filename, "w")
263         fd.write("""import eventlet
264 eventlet.Timeout(0.5)
265 try:
266    eventlet.listen(("127.0.0.1", 0)).accept()
267 except eventlet.Timeout:
268    print("exited correctly")
269 """)
270         fd.close()
271         python_path = os.pathsep.join(sys.path + [self.tempdir])
272         new_env = os.environ.copy()
273         new_env['PYTHONPATH'] = python_path
274         p = subprocess.Popen([sys.executable,
275                               os.path.join(self.tempdir, filename)],
276                              stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
277         eventlet.sleep(0.4)  # wait for process to hit accept
278         os.kill(p.pid, signal.SIGSTOP)  # suspend and resume to generate EINTR
279         os.kill(p.pid, signal.SIGCONT)
280         output, _ = p.communicate()
281         lines = output.decode('utf-8', 'replace').splitlines()
282         assert "exited correctly" in lines[-1], output
283         shutil.rmtree(self.tempdir)
284
285
286 class TestBadFilenos(LimitedTestCase):
287
288     @skip_with_pyevent
289     def test_repeated_selects(self):
290         from eventlet.green import select
291         self.assertRaises(ValueError, select.select, [-1], [], [])
292         self.assertRaises(ValueError, select.select, [-1], [], [])
293
294
295 class TestFork(LimitedTestCase):
296
297     @skip_with_pyevent
298     def test_fork(self):
299         output = tests.run_python('tests/hub_test_fork.py')
300         lines = output.splitlines()
301         self.assertEqual(lines, ["accept blocked", "child died ok"], output)
302
303
304 class TestDeadRunLoop(LimitedTestCase):
305     TEST_TIMEOUT = 2
306
307     class CustomException(Exception):
308         pass
309
310     def test_kill(self):
311         """ Checks that killing a process after the hub runloop dies does
312         not immediately return to hub greenlet's parent and schedule a
313         redundant timer. """
314         hub = hubs.get_hub()
315
316         def dummyproc():
317             hub.switch()
318
319         g = eventlet.spawn(dummyproc)
320         eventlet.sleep(0)  # let dummyproc run
321         assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
322         self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
323                           KeyboardInterrupt())
324
325         # kill dummyproc, this schedules a timer to return execution to
326         # this greenlet before throwing an exception in dummyproc.
327         # it is from this timer that execution should be returned to this
328         # greenlet, and not by propogating of the terminating greenlet.
329         g.kill()
330         with eventlet.Timeout(0.5, self.CustomException()):
331             # we now switch to the hub, there should be no existing timers
332             # that switch back to this greenlet and so this hub.switch()
333             # call should block indefinately.
334             self.assertRaises(self.CustomException, hub.switch)
335
336     def test_parent(self):
337         """ Checks that a terminating greenthread whose parent
338         was a previous, now-defunct hub greenlet returns execution to
339         the hub runloop and not the hub greenlet's parent. """
340         hub = hubs.get_hub()
341
342         def dummyproc():
343             pass
344
345         g = eventlet.spawn(dummyproc)
346         assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
347         self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
348                           KeyboardInterrupt())
349
350         assert not g.dead  # check dummyproc hasn't completed
351         with eventlet.Timeout(0.5, self.CustomException()):
352             # we now switch to the hub which will allow
353             # completion of dummyproc.
354             # this should return execution back to the runloop and not
355             # this greenlet so that hub.switch() would block indefinately.
356             self.assertRaises(self.CustomException, hub.switch)
357         assert g.dead  # sanity check that dummyproc has completed
358
359
360 class Foo(object):
361     pass
362
363
364 class TestDefaultHub(ProcessBase):
365
366     def test_kqueue_unsupported(self):
367         # https://github.com/eventlet/eventlet/issues/38
368         # get_hub on windows broken by kqueue
369         module_source = r'''
370 from __future__ import print_function
371
372 # Simulate absence of kqueue even on platforms that support it.
373 import select
374 try:
375     del select.kqueue
376 except AttributeError:
377     pass
378
379 import __builtin__
380 original_import = __builtin__.__import__
381
382 def fail_import(name, *args, **kwargs):
383     if 'epoll' in name:
384         raise ImportError('disabled for test')
385     if 'kqueue' in name:
386         print('kqueue tried')
387     return original_import(name, *args, **kwargs)
388
389 __builtin__.__import__ = fail_import
390
391
392 import eventlet.hubs
393 eventlet.hubs.get_default_hub()
394 print('ok')
395 '''
396         self.write_to_tempfile('newmod', module_source)
397         output, _ = self.launch_subprocess('newmod.py')
398         self.assertEqual(output, 'kqueue tried\nok\n')
399
400
401 if __name__ == '__main__':
402     main()