from __future__ import with_statement import sys import tests from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless from tests.patcher_test import ProcessBase import time import eventlet from eventlet import hubs from eventlet.event import Event from eventlet.semaphore import Semaphore from import greenlets, six DELAY = 0.001 def noop(): pass class TestTimerCleanup(LimitedTestCase): TEST_TIMEOUT = 2 @skip_with_pyevent def test_cancel_immediate(self): hub = hubs.get_hub() stimers = hub.get_timers_count() scanceled = hub.timers_canceled for i in six.moves.range(2000): t = hubs.get_hub().schedule_call_global(60, noop) t.cancel() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) # there should be fewer than 1000 new timers and canceled self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) @skip_with_pyevent def test_cancel_accumulated(self): hub = hubs.get_hub() stimers = hub.get_timers_count() scanceled = hub.timers_canceled for i in six.moves.range(2000): t = hubs.get_hub().schedule_call_global(60, noop) eventlet.sleep() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) t.cancel() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1, hub.timers) # there should be fewer than 1000 new timers and canceled self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) @skip_with_pyevent def test_cancel_proportion(self): # if fewer than half the pending timers are canceled, it should # not clean them out hub = hubs.get_hub() uncanceled_timers = [] stimers = hub.get_timers_count() scanceled = hub.timers_canceled for i in six.moves.range(1000): # 2/3rds of new timers are uncanceled t = hubs.get_hub().schedule_call_global(60, noop) t2 = hubs.get_hub().schedule_call_global(60, noop) t3 = hubs.get_hub().schedule_call_global(60, noop) eventlet.sleep() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) t.cancel() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) uncanceled_timers.append(t2) uncanceled_timers.append(t3) # 3000 new timers, plus a few extras self.assert_less_than_equal(stimers + 3000, stimers + hub.get_timers_count()) self.assertEqual(hub.timers_canceled, 1000) for t in uncanceled_timers: t.cancel() self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count()) eventlet.sleep() class TestScheduleCall(LimitedTestCase): def test_local(self): lst = [1] eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop) eventlet.sleep(0) eventlet.sleep(DELAY * 2) assert lst == [1], lst def test_global(self): lst = [1] eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop) eventlet.sleep(0) eventlet.sleep(DELAY * 2) assert lst == [], lst def test_ordering(self): lst = [] hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3) hubs.get_hub().schedule_call_global(DELAY, lst.append, 1) hubs.get_hub().schedule_call_global(DELAY, lst.append, 2) while len(lst) < 3: eventlet.sleep(DELAY) self.assertEqual(lst, [1, 2, 3]) class TestDebug(LimitedTestCase): def test_debug_listeners(self): hubs.get_hub().set_debug_listeners(True) hubs.get_hub().set_debug_listeners(False) def test_timer_exceptions(self): hubs.get_hub().set_timer_exceptions(True) hubs.get_hub().set_timer_exceptions(False) class TestExceptionInMainloop(LimitedTestCase): def test_sleep(self): # even if there was an error in the mainloop, the hub should continue # to work start = time.time() eventlet.sleep(DELAY) delay = time.time() - start assert delay >= DELAY * \ 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % ( delay, DELAY) def fail(): 1 // 0 hubs.get_hub().schedule_call_global(0, fail) start = time.time() eventlet.sleep(DELAY) delay = time.time() - start assert delay >= DELAY * \ 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % ( delay, DELAY) class TestExceptionInGreenthread(LimitedTestCase): @skip_unless(greenlets.preserves_excinfo) def test_exceptionpreservation(self): # events for controlling execution order gt1event = Event() gt2event = Event() def test_gt1(): try: raise KeyError() except KeyError: gt1event.send('exception') gt2event.wait() assert sys.exc_info()[0] is KeyError gt1event.send('test passed') def test_gt2(): gt1event.wait() gt1event.reset() assert sys.exc_info()[0] is None try: raise ValueError() except ValueError: gt2event.send('exception') gt1event.wait() assert sys.exc_info()[0] is ValueError g1 = eventlet.spawn(test_gt1) g2 = eventlet.spawn(test_gt2) try: g1.wait() g2.wait() finally: g1.kill() g2.kill() def test_exceptionleaks(self): # tests expected behaviour with all versions of greenlet def test_gt(sem): try: raise KeyError() except KeyError: sem.release() hubs.get_hub().switch() # semaphores for controlling execution order sem = Semaphore() sem.acquire() g = eventlet.spawn(test_gt, sem) try: sem.acquire() assert sys.exc_info()[0] is None finally: g.kill() class TestHubSelection(LimitedTestCase): def test_explicit_hub(self): oldhub = hubs.get_hub() try: hubs.use_hub(Foo) assert isinstance(hubs.get_hub(), Foo), hubs.get_hub() finally: hubs._threadlocal.hub = oldhub class TestHubBlockingDetector(LimitedTestCase): TEST_TIMEOUT = 10 @skip_with_pyevent def test_block_detect(self): def look_im_blocking(): import time time.sleep(2) from eventlet import debug debug.hub_blocking_detection(True) gt = eventlet.spawn(look_im_blocking) self.assertRaises(RuntimeError, gt.wait) debug.hub_blocking_detection(False) @skip_with_pyevent @skip_if_no_itimer def test_block_detect_with_itimer(self): def look_im_blocking(): import time time.sleep(0.5) from eventlet import debug debug.hub_blocking_detection(True, resolution=0.1) gt = eventlet.spawn(look_im_blocking) self.assertRaises(RuntimeError, gt.wait) debug.hub_blocking_detection(False) class TestSuspend(LimitedTestCase): TEST_TIMEOUT = 3 longMessage = True maxDiff = None def test_suspend_doesnt_crash(self): import os import shutil import signal import subprocess import sys import tempfile self.tempdir = tempfile.mkdtemp('test_suspend') filename = os.path.join(self.tempdir, '') fd = open(filename, "w") fd.write("""import eventlet eventlet.Timeout(0.5) try: eventlet.listen(("", 0)).accept() except eventlet.Timeout: print("exited correctly") """) fd.close() python_path = os.pathsep.join(sys.path + [self.tempdir]) new_env = os.environ.copy() new_env['PYTHONPATH'] = python_path p = subprocess.Popen([sys.executable, os.path.join(self.tempdir, filename)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) eventlet.sleep(0.4) # wait for process to hit accept os.kill(, signal.SIGSTOP) # suspend and resume to generate EINTR os.kill(, signal.SIGCONT) output, _ = p.communicate() lines = output.decode('utf-8', 'replace').splitlines() assert "exited correctly" in lines[-1], output shutil.rmtree(self.tempdir) class TestBadFilenos(LimitedTestCase): @skip_with_pyevent def test_repeated_selects(self): from import select self.assertRaises(ValueError,, [-1], [], []) self.assertRaises(ValueError,, [-1], [], []) class TestFork(LimitedTestCase): @skip_with_pyevent def test_fork(self): output = tests.run_python('tests/') lines = output.splitlines() self.assertEqual(lines, ["accept blocked", "child died ok"], output) class TestDeadRunLoop(LimitedTestCase): TEST_TIMEOUT = 2 class CustomException(Exception): pass def test_kill(self): """ Checks that killing a process after the hub runloop dies does not immediately return to hub greenlet's parent and schedule a redundant timer. """ hub = hubs.get_hub() def dummyproc(): hub.switch() g = eventlet.spawn(dummyproc) eventlet.sleep(0) # let dummyproc run assert hub.greenlet.parent == eventlet.greenthread.getcurrent() self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, KeyboardInterrupt()) # kill dummyproc, this schedules a timer to return execution to # this greenlet before throwing an exception in dummyproc. # it is from this timer that execution should be returned to this # greenlet, and not by propogating of the terminating greenlet. g.kill() with eventlet.Timeout(0.5, self.CustomException()): # we now switch to the hub, there should be no existing timers # that switch back to this greenlet and so this hub.switch() # call should block indefinately. self.assertRaises(self.CustomException, hub.switch) def test_parent(self): """ Checks that a terminating greenthread whose parent was a previous, now-defunct hub greenlet returns execution to the hub runloop and not the hub greenlet's parent. """ hub = hubs.get_hub() def dummyproc(): pass g = eventlet.spawn(dummyproc) assert hub.greenlet.parent == eventlet.greenthread.getcurrent() self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, KeyboardInterrupt()) assert not g.dead # check dummyproc hasn't completed with eventlet.Timeout(0.5, self.CustomException()): # we now switch to the hub which will allow # completion of dummyproc. # this should return execution back to the runloop and not # this greenlet so that hub.switch() would block indefinately. self.assertRaises(self.CustomException, hub.switch) assert g.dead # sanity check that dummyproc has completed class Foo(object): pass class TestDefaultHub(ProcessBase): def test_kqueue_unsupported(self): # # get_hub on windows broken by kqueue module_source = r''' from __future__ import print_function # Simulate absence of kqueue even on platforms that support it. import select try: del select.kqueue except AttributeError: pass import __builtin__ original_import = __builtin__.__import__ def fail_import(name, *args, **kwargs): if 'epoll' in name: raise ImportError('disabled for test') if 'kqueue' in name: print('kqueue tried') return original_import(name, *args, **kwargs) __builtin__.__import__ = fail_import import eventlet.hubs eventlet.hubs.get_default_hub() print('ok') ''' self.write_to_tempfile('newmod', module_source) output, _ = self.launch_subprocess('') self.assertEqual(output, 'kqueue tried\nok\n') if __name__ == '__main__': main()