Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / hub_test.py
1 from __future__ import with_statement
2 import sys
3
4 import tests
5 from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless
6 from tests.patcher_test import ProcessBase
7 import time
8 import eventlet
9 from eventlet import hubs
10 from eventlet.event import Event
11 from eventlet.semaphore import Semaphore
12 from eventlet.support import greenlets, six
13
14
15 DELAY = 0.001
16
17
18 def noop():
19     pass
20
21
22 class TestTimerCleanup(LimitedTestCase):
23     TEST_TIMEOUT = 2
24
25     @skip_with_pyevent
26     def test_cancel_immediate(self):
27         hub = hubs.get_hub()
28         stimers = hub.get_timers_count()
29         scanceled = hub.timers_canceled
30         for i in six.moves.range(2000):
31             t = hubs.get_hub().schedule_call_global(60, noop)
32             t.cancel()
33             self.assert_less_than_equal(hub.timers_canceled,
34                                         hub.get_timers_count() + 1)
35         # there should be fewer than 1000 new timers and canceled
36         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
37         self.assert_less_than_equal(hub.timers_canceled, 1000)
38
39     @skip_with_pyevent
40     def test_cancel_accumulated(self):
41         hub = hubs.get_hub()
42         stimers = hub.get_timers_count()
43         scanceled = hub.timers_canceled
44         for i in six.moves.range(2000):
45             t = hubs.get_hub().schedule_call_global(60, noop)
46             eventlet.sleep()
47             self.assert_less_than_equal(hub.timers_canceled,
48                                         hub.get_timers_count() + 1)
49             t.cancel()
50             self.assert_less_than_equal(hub.timers_canceled,
51                                         hub.get_timers_count() + 1, hub.timers)
52         # there should be fewer than 1000 new timers and canceled
53         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
54         self.assert_less_than_equal(hub.timers_canceled, 1000)
55
56     @skip_with_pyevent
57     def test_cancel_proportion(self):
58         # if fewer than half the pending timers are canceled, it should
59         # not clean them out
60         hub = hubs.get_hub()
61         uncanceled_timers = []
62         stimers = hub.get_timers_count()
63         scanceled = hub.timers_canceled
64         for i in six.moves.range(1000):
65             # 2/3rds of new timers are uncanceled
66             t = hubs.get_hub().schedule_call_global(60, noop)
67             t2 = hubs.get_hub().schedule_call_global(60, noop)
68             t3 = hubs.get_hub().schedule_call_global(60, noop)
69             eventlet.sleep()
70             self.assert_less_than_equal(hub.timers_canceled,
71                                         hub.get_timers_count() + 1)
72             t.cancel()
73             self.assert_less_than_equal(hub.timers_canceled,
74                                         hub.get_timers_count() + 1)
75             uncanceled_timers.append(t2)
76             uncanceled_timers.append(t3)
77         # 3000 new timers, plus a few extras
78         self.assert_less_than_equal(stimers + 3000,
79                                     stimers + hub.get_timers_count())
80         self.assertEqual(hub.timers_canceled, 1000)
81         for t in uncanceled_timers:
82             t.cancel()
83             self.assert_less_than_equal(hub.timers_canceled,
84                                         hub.get_timers_count())
85         eventlet.sleep()
86
87
88 class TestScheduleCall(LimitedTestCase):
89
90     def test_local(self):
91         lst = [1]
92         eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
93         eventlet.sleep(0)
94         eventlet.sleep(DELAY * 2)
95         assert lst == [1], lst
96
97     def test_global(self):
98         lst = [1]
99         eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
100         eventlet.sleep(0)
101         eventlet.sleep(DELAY * 2)
102         assert lst == [], lst
103
104     def test_ordering(self):
105         lst = []
106         hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
107         hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
108         hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
109         while len(lst) < 3:
110             eventlet.sleep(DELAY)
111         self.assertEqual(lst, [1, 2, 3])
112
113
114 class TestDebug(LimitedTestCase):
115
116     def test_debug_listeners(self):
117         hubs.get_hub().set_debug_listeners(True)
118         hubs.get_hub().set_debug_listeners(False)
119
120     def test_timer_exceptions(self):
121         hubs.get_hub().set_timer_exceptions(True)
122         hubs.get_hub().set_timer_exceptions(False)
123
124
125 class TestExceptionInMainloop(LimitedTestCase):
126
127     def test_sleep(self):
128         # even if there was an error in the mainloop, the hub should continue
129         # to work
130         start = time.time()
131         eventlet.sleep(DELAY)
132         delay = time.time() - start
133
134         assert delay >= DELAY * \
135             0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
136                 delay, DELAY)
137
138         def fail():
139             1 // 0
140
141         hubs.get_hub().schedule_call_global(0, fail)
142
143         start = time.time()
144         eventlet.sleep(DELAY)
145         delay = time.time() - start
146
147         assert delay >= DELAY * \
148             0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
149                 delay, DELAY)
150
151
152 class TestExceptionInGreenthread(LimitedTestCase):
153
154     @skip_unless(greenlets.preserves_excinfo)
155     def test_exceptionpreservation(self):
156         # events for controlling execution order
157         gt1event = Event()
158         gt2event = Event()
159
160         def test_gt1():
161             try:
162                 raise KeyError()
163             except KeyError:
164                 gt1event.send('exception')
165                 gt2event.wait()
166                 assert sys.exc_info()[0] is KeyError
167                 gt1event.send('test passed')
168
169         def test_gt2():
170             gt1event.wait()
171             gt1event.reset()
172             assert sys.exc_info()[0] is None
173             try:
174                 raise ValueError()
175             except ValueError:
176                 gt2event.send('exception')
177                 gt1event.wait()
178                 assert sys.exc_info()[0] is ValueError
179
180         g1 = eventlet.spawn(test_gt1)
181         g2 = eventlet.spawn(test_gt2)
182         try:
183             g1.wait()
184             g2.wait()
185         finally:
186             g1.kill()
187             g2.kill()
188
189     def test_exceptionleaks(self):
190         # tests expected behaviour with all versions of greenlet
191         def test_gt(sem):
192             try:
193                 raise KeyError()
194             except KeyError:
195                 sem.release()
196                 hubs.get_hub().switch()
197
198         # semaphores for controlling execution order
199         sem = Semaphore()
200         sem.acquire()
201         g = eventlet.spawn(test_gt, sem)
202         try:
203             sem.acquire()
204             assert sys.exc_info()[0] is None
205         finally:
206             g.kill()
207
208
209 class TestHubSelection(LimitedTestCase):
210
211     def test_explicit_hub(self):
212         if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
213             # doesn't work with twisted
214             return
215         oldhub = hubs.get_hub()
216         try:
217             hubs.use_hub(Foo)
218             assert isinstance(hubs.get_hub(), Foo), hubs.get_hub()
219         finally:
220             hubs._threadlocal.hub = oldhub
221
222
223 class TestHubBlockingDetector(LimitedTestCase):
224     TEST_TIMEOUT = 10
225
226     @skip_with_pyevent
227     def test_block_detect(self):
228         def look_im_blocking():
229             import time
230             time.sleep(2)
231         from eventlet import debug
232         debug.hub_blocking_detection(True)
233         gt = eventlet.spawn(look_im_blocking)
234         self.assertRaises(RuntimeError, gt.wait)
235         debug.hub_blocking_detection(False)
236
237     @skip_with_pyevent
238     @skip_if_no_itimer
239     def test_block_detect_with_itimer(self):
240         def look_im_blocking():
241             import time
242             time.sleep(0.5)
243
244         from eventlet import debug
245         debug.hub_blocking_detection(True, resolution=0.1)
246         gt = eventlet.spawn(look_im_blocking)
247         self.assertRaises(RuntimeError, gt.wait)
248         debug.hub_blocking_detection(False)
249
250
251 class TestSuspend(LimitedTestCase):
252     TEST_TIMEOUT = 3
253     longMessage = True
254     maxDiff = None
255
256     def test_suspend_doesnt_crash(self):
257         import os
258         import shutil
259         import signal
260         import subprocess
261         import sys
262         import tempfile
263         self.tempdir = tempfile.mkdtemp('test_suspend')
264         filename = os.path.join(self.tempdir, 'test_suspend.py')
265         fd = open(filename, "w")
266         fd.write("""import eventlet
267 eventlet.Timeout(0.5)
268 try:
269    eventlet.listen(("127.0.0.1", 0)).accept()
270 except eventlet.Timeout:
271    print("exited correctly")
272 """)
273         fd.close()
274         python_path = os.pathsep.join(sys.path + [self.tempdir])
275         new_env = os.environ.copy()
276         new_env['PYTHONPATH'] = python_path
277         p = subprocess.Popen([sys.executable,
278                               os.path.join(self.tempdir, filename)],
279                              stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
280         eventlet.sleep(0.4)  # wait for process to hit accept
281         os.kill(p.pid, signal.SIGSTOP)  # suspend and resume to generate EINTR
282         os.kill(p.pid, signal.SIGCONT)
283         output, _ = p.communicate()
284         lines = output.decode('utf-8', 'replace').splitlines()
285         assert "exited correctly" in lines[-1], output
286         shutil.rmtree(self.tempdir)
287
288
289 class TestBadFilenos(LimitedTestCase):
290
291     @skip_with_pyevent
292     def test_repeated_selects(self):
293         from eventlet.green import select
294         self.assertRaises(ValueError, select.select, [-1], [], [])
295         self.assertRaises(ValueError, select.select, [-1], [], [])
296
297
298 class TestFork(LimitedTestCase):
299
300     @skip_with_pyevent
301     def test_fork(self):
302         output = tests.run_python('tests/hub_test_fork.py')
303         lines = output.splitlines()
304         self.assertEqual(lines, ["accept blocked", "child died ok"], output)
305
306
307 class TestDeadRunLoop(LimitedTestCase):
308     TEST_TIMEOUT = 2
309
310     class CustomException(Exception):
311         pass
312
313     def test_kill(self):
314         """ Checks that killing a process after the hub runloop dies does
315         not immediately return to hub greenlet's parent and schedule a
316         redundant timer. """
317         hub = hubs.get_hub()
318
319         def dummyproc():
320             hub.switch()
321
322         g = eventlet.spawn(dummyproc)
323         eventlet.sleep(0)  # let dummyproc run
324         assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
325         self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
326                           KeyboardInterrupt())
327
328         # kill dummyproc, this schedules a timer to return execution to
329         # this greenlet before throwing an exception in dummyproc.
330         # it is from this timer that execution should be returned to this
331         # greenlet, and not by propogating of the terminating greenlet.
332         g.kill()
333         with eventlet.Timeout(0.5, self.CustomException()):
334             # we now switch to the hub, there should be no existing timers
335             # that switch back to this greenlet and so this hub.switch()
336             # call should block indefinately.
337             self.assertRaises(self.CustomException, hub.switch)
338
339     def test_parent(self):
340         """ Checks that a terminating greenthread whose parent
341         was a previous, now-defunct hub greenlet returns execution to
342         the hub runloop and not the hub greenlet's parent. """
343         hub = hubs.get_hub()
344
345         def dummyproc():
346             pass
347
348         g = eventlet.spawn(dummyproc)
349         assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
350         self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
351                           KeyboardInterrupt())
352
353         assert not g.dead  # check dummyproc hasn't completed
354         with eventlet.Timeout(0.5, self.CustomException()):
355             # we now switch to the hub which will allow
356             # completion of dummyproc.
357             # this should return execution back to the runloop and not
358             # this greenlet so that hub.switch() would block indefinately.
359             self.assertRaises(self.CustomException, hub.switch)
360         assert g.dead  # sanity check that dummyproc has completed
361
362
363 class Foo(object):
364     pass
365
366
367 class TestDefaultHub(ProcessBase):
368
369     def test_kqueue_unsupported(self):
370         # https://github.com/eventlet/eventlet/issues/38
371         # get_hub on windows broken by kqueue
372         module_source = r'''
373 from __future__ import print_function
374
375 # Simulate absence of kqueue even on platforms that support it.
376 import select
377 try:
378     del select.kqueue
379 except AttributeError:
380     pass
381
382 import __builtin__
383 original_import = __builtin__.__import__
384
385 def fail_import(name, *args, **kwargs):
386     if 'epoll' in name:
387         raise ImportError('disabled for test')
388     if 'kqueue' in name:
389         print('kqueue tried')
390     return original_import(name, *args, **kwargs)
391
392 __builtin__.__import__ = fail_import
393
394
395 import eventlet.hubs
396 eventlet.hubs.get_default_hub()
397 print('ok')
398 '''
399         self.write_to_tempfile('newmod', module_source)
400         output, _ = self.launch_subprocess('newmod.py')
401         self.assertEqual(output, 'kqueue tried\nok\n')
402
403
404 if __name__ == '__main__':
405     main()