+++ /dev/null
-from tests import LimitedTestCase
-from eventlet import greenthread
-from eventlet.support import greenlets as greenlet
-
-_g_results = []
-
-
-def passthru(*args, **kw):
- _g_results.append((args, kw))
- return args, kw
-
-
-def waiter(a):
- greenthread.sleep(0.1)
- return a
-
-
-class Asserts(object):
- def assert_dead(self, gt):
- if hasattr(gt, 'wait'):
- self.assertRaises(greenlet.GreenletExit, gt.wait)
- assert gt.dead
- assert not gt
-
-
-class Spawn(LimitedTestCase, Asserts):
- def tearDown(self):
- global _g_results
- super(Spawn, self).tearDown()
- _g_results = []
-
- def test_simple(self):
- gt = greenthread.spawn(passthru, 1, b=2)
- self.assertEqual(gt.wait(), ((1,), {'b': 2}))
- self.assertEqual(_g_results, [((1,), {'b': 2})])
-
- def test_n(self):
- gt = greenthread.spawn_n(passthru, 2, b=3)
- assert not gt.dead
- greenthread.sleep(0)
- assert gt.dead
- self.assertEqual(_g_results, [((2,), {'b': 3})])
-
- def test_kill(self):
- gt = greenthread.spawn(passthru, 6)
- greenthread.kill(gt)
- self.assert_dead(gt)
- greenthread.sleep(0.001)
- self.assertEqual(_g_results, [])
- greenthread.kill(gt)
- self.assert_dead(gt)
-
- def test_kill_meth(self):
- gt = greenthread.spawn(passthru, 6)
- gt.kill()
- self.assert_dead(gt)
- greenthread.sleep(0.001)
- self.assertEqual(_g_results, [])
- gt.kill()
- self.assert_dead(gt)
-
- def test_kill_n(self):
- gt = greenthread.spawn_n(passthru, 7)
- greenthread.kill(gt)
- self.assert_dead(gt)
- greenthread.sleep(0.001)
- self.assertEqual(_g_results, [])
- greenthread.kill(gt)
- self.assert_dead(gt)
-
- def test_link(self):
- results = []
-
- def link_func(g, *a, **kw):
- results.append(g)
- results.append(a)
- results.append(kw)
- gt = greenthread.spawn(passthru, 5)
- gt.link(link_func, 4, b=5)
- self.assertEqual(gt.wait(), ((5,), {}))
- self.assertEqual(results, [gt, (4,), {'b': 5}])
-
- def test_link_after_exited(self):
- results = []
-
- def link_func(g, *a, **kw):
- results.append(g)
- results.append(a)
- results.append(kw)
- gt = greenthread.spawn(passthru, 5)
- self.assertEqual(gt.wait(), ((5,), {}))
- gt.link(link_func, 4, b=5)
- self.assertEqual(results, [gt, (4,), {'b': 5}])
-
- def test_link_relinks(self):
- # test that linking in a linked func doesn't cause infinite recursion.
- called = []
-
- def link_func(g):
- g.link(link_func_pass)
-
- def link_func_pass(g):
- called.append(True)
-
- gt = greenthread.spawn(passthru)
- gt.link(link_func)
- gt.wait()
- self.assertEqual(called, [True])
-
-
-class SpawnAfter(Spawn):
- def test_basic(self):
- gt = greenthread.spawn_after(0.1, passthru, 20)
- self.assertEqual(gt.wait(), ((20,), {}))
-
- def test_cancel(self):
- gt = greenthread.spawn_after(0.1, passthru, 21)
- gt.cancel()
- self.assert_dead(gt)
-
- def test_cancel_already_started(self):
- gt = greenthread.spawn_after(0, waiter, 22)
- greenthread.sleep(0)
- gt.cancel()
- self.assertEqual(gt.wait(), 22)
-
- def test_kill_already_started(self):
- gt = greenthread.spawn_after(0, waiter, 22)
- greenthread.sleep(0)
- gt.kill()
- self.assert_dead(gt)
-
-
-class SpawnAfterLocal(LimitedTestCase, Asserts):
- def setUp(self):
- super(SpawnAfterLocal, self).setUp()
- self.lst = [1]
-
- def test_timer_fired(self):
- def func():
- greenthread.spawn_after_local(0.1, self.lst.pop)
- greenthread.sleep(0.2)
-
- greenthread.spawn(func)
- assert self.lst == [1], self.lst
- greenthread.sleep(0.3)
- assert self.lst == [], self.lst
-
- def test_timer_cancelled_upon_greenlet_exit(self):
- def func():
- greenthread.spawn_after_local(0.1, self.lst.pop)
-
- greenthread.spawn(func)
- assert self.lst == [1], self.lst
- greenthread.sleep(0.2)
- assert self.lst == [1], self.lst
-
- def test_spawn_is_not_cancelled(self):
- def func():
- greenthread.spawn(self.lst.pop)
- # exiting immediatelly, but self.lst.pop must be called
- greenthread.spawn(func)
- greenthread.sleep(0.1)
- assert self.lst == [], self.lst