X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Ftests%2Fgreenthread_test.py;fp=eventlet%2Ftests%2Fgreenthread_test.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=39b29c79debdcef2e86b7dd50759ccc046ac56fd;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/greenthread_test.py b/eventlet/tests/greenthread_test.py deleted file mode 100644 index 39b29c7..0000000 --- a/eventlet/tests/greenthread_test.py +++ /dev/null @@ -1,164 +0,0 @@ -from tests import LimitedTestCase -from eventlet import greenthread -from eventlet.support import greenlets as greenlet - -_g_results = [] - - -def passthru(*args, **kw): - _g_results.append((args, kw)) - return args, kw - - -def waiter(a): - greenthread.sleep(0.1) - return a - - -class Asserts(object): - def assert_dead(self, gt): - if hasattr(gt, 'wait'): - self.assertRaises(greenlet.GreenletExit, gt.wait) - assert gt.dead - assert not gt - - -class Spawn(LimitedTestCase, Asserts): - def tearDown(self): - global _g_results - super(Spawn, self).tearDown() - _g_results = [] - - def test_simple(self): - gt = greenthread.spawn(passthru, 1, b=2) - self.assertEqual(gt.wait(), ((1,), {'b': 2})) - self.assertEqual(_g_results, [((1,), {'b': 2})]) - - def test_n(self): - gt = greenthread.spawn_n(passthru, 2, b=3) - assert not gt.dead - greenthread.sleep(0) - assert gt.dead - self.assertEqual(_g_results, [((2,), {'b': 3})]) - - def test_kill(self): - gt = greenthread.spawn(passthru, 6) - greenthread.kill(gt) - self.assert_dead(gt) - greenthread.sleep(0.001) - self.assertEqual(_g_results, []) - greenthread.kill(gt) - self.assert_dead(gt) - - def test_kill_meth(self): - gt = greenthread.spawn(passthru, 6) - gt.kill() - self.assert_dead(gt) - greenthread.sleep(0.001) - self.assertEqual(_g_results, []) - gt.kill() - self.assert_dead(gt) - - def test_kill_n(self): - gt = greenthread.spawn_n(passthru, 7) - greenthread.kill(gt) - self.assert_dead(gt) - greenthread.sleep(0.001) - self.assertEqual(_g_results, []) - greenthread.kill(gt) - self.assert_dead(gt) - - def test_link(self): - results = [] - - def link_func(g, *a, **kw): - results.append(g) - results.append(a) - results.append(kw) - gt = greenthread.spawn(passthru, 5) - gt.link(link_func, 4, b=5) - self.assertEqual(gt.wait(), ((5,), {})) - self.assertEqual(results, [gt, (4,), {'b': 5}]) - - def test_link_after_exited(self): - results = [] - - def link_func(g, *a, **kw): - results.append(g) - results.append(a) - results.append(kw) - gt = greenthread.spawn(passthru, 5) - self.assertEqual(gt.wait(), ((5,), {})) - gt.link(link_func, 4, b=5) - self.assertEqual(results, [gt, (4,), {'b': 5}]) - - def test_link_relinks(self): - # test that linking in a linked func doesn't cause infinite recursion. - called = [] - - def link_func(g): - g.link(link_func_pass) - - def link_func_pass(g): - called.append(True) - - gt = greenthread.spawn(passthru) - gt.link(link_func) - gt.wait() - self.assertEqual(called, [True]) - - -class SpawnAfter(Spawn): - def test_basic(self): - gt = greenthread.spawn_after(0.1, passthru, 20) - self.assertEqual(gt.wait(), ((20,), {})) - - def test_cancel(self): - gt = greenthread.spawn_after(0.1, passthru, 21) - gt.cancel() - self.assert_dead(gt) - - def test_cancel_already_started(self): - gt = greenthread.spawn_after(0, waiter, 22) - greenthread.sleep(0) - gt.cancel() - self.assertEqual(gt.wait(), 22) - - def test_kill_already_started(self): - gt = greenthread.spawn_after(0, waiter, 22) - greenthread.sleep(0) - gt.kill() - self.assert_dead(gt) - - -class SpawnAfterLocal(LimitedTestCase, Asserts): - def setUp(self): - super(SpawnAfterLocal, self).setUp() - self.lst = [1] - - def test_timer_fired(self): - def func(): - greenthread.spawn_after_local(0.1, self.lst.pop) - greenthread.sleep(0.2) - - greenthread.spawn(func) - assert self.lst == [1], self.lst - greenthread.sleep(0.3) - assert self.lst == [], self.lst - - def test_timer_cancelled_upon_greenlet_exit(self): - def func(): - greenthread.spawn_after_local(0.1, self.lst.pop) - - greenthread.spawn(func) - assert self.lst == [1], self.lst - greenthread.sleep(0.2) - assert self.lst == [1], self.lst - - def test_spawn_is_not_cancelled(self): - def func(): - greenthread.spawn(self.lst.pop) - # exiting immediatelly, but self.lst.pop must be called - greenthread.spawn(func) - greenthread.sleep(0.1) - assert self.lst == [], self.lst