1 from __future__ import with_statement
5 from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless
6 from tests.patcher_test import ProcessBase
9 from eventlet import hubs
10 from eventlet.event import Event
11 from eventlet.semaphore import Semaphore
12 from eventlet.support import greenlets, six
22 class TestTimerCleanup(LimitedTestCase):
26 def test_cancel_immediate(self):
28 stimers = hub.get_timers_count()
29 scanceled = hub.timers_canceled
30 for i in six.moves.range(2000):
31 t = hubs.get_hub().schedule_call_global(60, noop)
33 self.assert_less_than_equal(hub.timers_canceled,
34 hub.get_timers_count() + 1)
35 # there should be fewer than 1000 new timers and canceled
36 self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
37 self.assert_less_than_equal(hub.timers_canceled, 1000)
40 def test_cancel_accumulated(self):
42 stimers = hub.get_timers_count()
43 scanceled = hub.timers_canceled
44 for i in six.moves.range(2000):
45 t = hubs.get_hub().schedule_call_global(60, noop)
47 self.assert_less_than_equal(hub.timers_canceled,
48 hub.get_timers_count() + 1)
50 self.assert_less_than_equal(hub.timers_canceled,
51 hub.get_timers_count() + 1, hub.timers)
52 # there should be fewer than 1000 new timers and canceled
53 self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
54 self.assert_less_than_equal(hub.timers_canceled, 1000)
57 def test_cancel_proportion(self):
58 # if fewer than half the pending timers are canceled, it should
61 uncanceled_timers = []
62 stimers = hub.get_timers_count()
63 scanceled = hub.timers_canceled
64 for i in six.moves.range(1000):
65 # 2/3rds of new timers are uncanceled
66 t = hubs.get_hub().schedule_call_global(60, noop)
67 t2 = hubs.get_hub().schedule_call_global(60, noop)
68 t3 = hubs.get_hub().schedule_call_global(60, noop)
70 self.assert_less_than_equal(hub.timers_canceled,
71 hub.get_timers_count() + 1)
73 self.assert_less_than_equal(hub.timers_canceled,
74 hub.get_timers_count() + 1)
75 uncanceled_timers.append(t2)
76 uncanceled_timers.append(t3)
77 # 3000 new timers, plus a few extras
78 self.assert_less_than_equal(stimers + 3000,
79 stimers + hub.get_timers_count())
80 self.assertEqual(hub.timers_canceled, 1000)
81 for t in uncanceled_timers:
83 self.assert_less_than_equal(hub.timers_canceled,
84 hub.get_timers_count())
88 class TestScheduleCall(LimitedTestCase):
92 eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
94 eventlet.sleep(DELAY * 2)
95 assert lst == [1], lst
97 def test_global(self):
99 eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
101 eventlet.sleep(DELAY * 2)
102 assert lst == [], lst
104 def test_ordering(self):
106 hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
107 hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
108 hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
110 eventlet.sleep(DELAY)
111 self.assertEqual(lst, [1, 2, 3])
class TestDebug(LimitedTestCase):
    """Smoke tests for the hub's debug switches.

    Each test calls a hub setter with ``True`` and then with ``False``; the
    only expectation is that neither call raises.
    """

    def test_debug_listeners(self):
        """Call ``set_debug_listeners`` on the current hub: on, then off."""
        for enabled in (True, False):
            hubs.get_hub().set_debug_listeners(enabled)

    def test_timer_exceptions(self):
        """Call ``set_timer_exceptions`` on the current hub: on, then off."""
        for enabled in (True, False):
            hubs.get_hub().set_timer_exceptions(enabled)
125 class TestExceptionInMainloop(LimitedTestCase):
127 def test_sleep(self):
128 # even if there was an error in the mainloop, the hub should continue
131 eventlet.sleep(DELAY)
132 delay = time.time() - start
134 assert delay >= DELAY * \
135 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
141 hubs.get_hub().schedule_call_global(0, fail)
144 eventlet.sleep(DELAY)
145 delay = time.time() - start
147 assert delay >= DELAY * \
148 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
152 class TestExceptionInGreenthread(LimitedTestCase):
154 @skip_unless(greenlets.preserves_excinfo)
155 def test_exceptionpreservation(self):
156 # events for controlling execution order
164 gt1event.send('exception')
166 assert sys.exc_info()[0] is KeyError
167 gt1event.send('test passed')
172 assert sys.exc_info()[0] is None
176 gt2event.send('exception')
178 assert sys.exc_info()[0] is ValueError
180 g1 = eventlet.spawn(test_gt1)
181 g2 = eventlet.spawn(test_gt2)
189 def test_exceptionleaks(self):
190 # tests expected behaviour with all versions of greenlet
196 hubs.get_hub().switch()
198 # semaphores for controlling execution order
201 g = eventlet.spawn(test_gt, sem)
204 assert sys.exc_info()[0] is None
209 class TestHubSelection(LimitedTestCase):
211 def test_explicit_hub(self):
212 if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
213 # doesn't work with twisted
215 oldhub = hubs.get_hub()
218 assert isinstance(hubs.get_hub(), Foo), hubs.get_hub()
220 hubs._threadlocal.hub = oldhub
223 class TestHubBlockingDetector(LimitedTestCase):
227 def test_block_detect(self):
228 def look_im_blocking():
231 from eventlet import debug
232 debug.hub_blocking_detection(True)
233 gt = eventlet.spawn(look_im_blocking)
234 self.assertRaises(RuntimeError, gt.wait)
235 debug.hub_blocking_detection(False)
239 def test_block_detect_with_itimer(self):
240 def look_im_blocking():
244 from eventlet import debug
245 debug.hub_blocking_detection(True, resolution=0.1)
246 gt = eventlet.spawn(look_im_blocking)
247 self.assertRaises(RuntimeError, gt.wait)
248 debug.hub_blocking_detection(False)
251 class TestSuspend(LimitedTestCase):
256 def test_suspend_doesnt_crash(self):
263 self.tempdir = tempfile.mkdtemp('test_suspend')
264 filename = os.path.join(self.tempdir, 'test_suspend.py')
265 fd = open(filename, "w")
266 fd.write("""import eventlet
267 eventlet.Timeout(0.5)
269 eventlet.listen(("127.0.0.1", 0)).accept()
270 except eventlet.Timeout:
271 print("exited correctly")
274 python_path = os.pathsep.join(sys.path + [self.tempdir])
275 new_env = os.environ.copy()
276 new_env['PYTHONPATH'] = python_path
277 p = subprocess.Popen([sys.executable,
278 os.path.join(self.tempdir, filename)],
279 stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
280 eventlet.sleep(0.4) # wait for process to hit accept
281 os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
282 os.kill(p.pid, signal.SIGCONT)
283 output, _ = p.communicate()
284 lines = output.decode('utf-8', 'replace').splitlines()
285 assert "exited correctly" in lines[-1], output
286 shutil.rmtree(self.tempdir)
289 class TestBadFilenos(LimitedTestCase):
292 def test_repeated_selects(self):
293 from eventlet.green import select
294 self.assertRaises(ValueError, select.select, [-1], [], [])
295 self.assertRaises(ValueError, select.select, [-1], [], [])
298 class TestFork(LimitedTestCase):
302 output = tests.run_python('tests/hub_test_fork.py')
303 lines = output.splitlines()
304 self.assertEqual(lines, ["accept blocked", "child died ok"], output)
307 class TestDeadRunLoop(LimitedTestCase):
310 class CustomException(Exception):
314 """ Checks that killing a process after the hub runloop dies does
315 not immediately return to hub greenlet's parent and schedule a
322 g = eventlet.spawn(dummyproc)
323 eventlet.sleep(0) # let dummyproc run
324 assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
325 self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
328 # kill dummyproc, this schedules a timer to return execution to
329 # this greenlet before throwing an exception in dummyproc.
330 # it is from this timer that execution should be returned to this
331 # greenlet, and not by propagation from the terminating greenlet.
333 with eventlet.Timeout(0.5, self.CustomException()):
334 # we now switch to the hub, there should be no existing timers
335 # that switch back to this greenlet and so this hub.switch()
336 # call should block indefinitely.
337 self.assertRaises(self.CustomException, hub.switch)
339 def test_parent(self):
340 """ Checks that a terminating greenthread whose parent
341 was a previous, now-defunct hub greenlet returns execution to
342 the hub runloop and not the hub greenlet's parent. """
348 g = eventlet.spawn(dummyproc)
349 assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
350 self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
353 assert not g.dead # check dummyproc hasn't completed
354 with eventlet.Timeout(0.5, self.CustomException()):
355 # we now switch to the hub which will allow
356 # completion of dummyproc.
357 # this should return execution back to the runloop and not
358 # this greenlet so that hub.switch() would block indefinitely.
359 self.assertRaises(self.CustomException, hub.switch)
360 assert g.dead # sanity check that dummyproc has completed
367 class TestDefaultHub(ProcessBase):
369 def test_kqueue_unsupported(self):
370 # https://github.com/eventlet/eventlet/issues/38
371 # get_hub on windows broken by kqueue
373 from __future__ import print_function
375 # Simulate absence of kqueue even on platforms that support it.
379 except AttributeError:
383 original_import = __builtin__.__import__
385 def fail_import(name, *args, **kwargs):
387 raise ImportError('disabled for test')
389 print('kqueue tried')
390 return original_import(name, *args, **kwargs)
392 __builtin__.__import__ = fail_import
396 eventlet.hubs.get_default_hub()
399 self.write_to_tempfile('newmod', module_source)
400 output, _ = self.launch_subprocess('newmod.py')
401 self.assertEqual(output, 'kqueue tried\nok\n')
404 if __name__ == '__main__':