Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / greenthread_test.py
1 from tests import LimitedTestCase
2 from eventlet import greenthread
3 from eventlet.support import greenlets as greenlet
4
# Log of every passthru() call as (args, kwargs) pairs; tests inspect it to
# find out whether, and with what arguments, a spawned call actually ran.
_g_results = []


def passthru(*args, **kw):
    """Log the call in ``_g_results`` and echo the arguments back.

    Returns the pair ``(args, kw)``: the positional arguments as a tuple
    and the keyword arguments as a dict.
    """
    call = (args, kw)
    _g_results.append(call)
    return call
11
12
def waiter(a):
    """Call ``greenthread.sleep(0.1)`` and then return ``a`` unchanged."""
    delay = 0.1
    greenthread.sleep(delay)
    return a
16
17
class Asserts(object):
    """Assertion helpers meant to be mixed into a test case class.

    The methods call ``self.assertRaises``, ``self.assertTrue`` and
    ``self.assertFalse``, so the class they are mixed into must provide them.
    """

    def assert_dead(self, gt):
        """Fail unless ``gt`` is dead.

        If ``gt`` has a ``wait`` attribute, calling it must raise
        ``greenlet.GreenletExit``.  In every case ``gt.dead`` must be true
        and ``gt`` itself must be falsy.
        """
        if hasattr(gt, 'wait'):
            self.assertRaises(greenlet.GreenletExit, gt.wait)
        # Use the test case's assertion methods rather than bare ``assert``
        # so the checks still run under ``python -O`` and a failure names
        # the offending object.
        self.assertTrue(gt.dead, '%r is not dead' % (gt,))
        self.assertFalse(gt, '%r is still truthy' % (gt,))
24
25
class Spawn(LimitedTestCase, Asserts):
    """Tests for greenthread.spawn(), spawn_n(), kill() and link()."""

    def tearDown(self):
        # Rebind the module-level call log to a fresh list so calls recorded
        # by passthru() in one test are not visible to the next one.
        global _g_results
        super(Spawn, self).tearDown()
        _g_results = []

    def test_simple(self):
        # wait() must hand back passthru's return value, and the call must
        # have been logged exactly once.
        gt = greenthread.spawn(passthru, 1, b=2)
        self.assertEqual(gt.wait(), ((1,), {'b': 2}))
        self.assertEqual(_g_results, [((1,), {'b': 2})])

    def test_n(self):
        gt = greenthread.spawn_n(passthru, 2, b=3)
        # Expected to be still pending right after spawn_n() returns.
        # (TestCase assertions instead of bare ``assert`` so the checks are
        # not stripped under ``python -O``.)
        self.assertFalse(gt.dead)
        greenthread.sleep(0)
        # After yielding once, the call must have run to completion.
        self.assertTrue(gt.dead)
        self.assertEqual(_g_results, [((2,), {'b': 3})])

    def test_kill(self):
        gt = greenthread.spawn(passthru, 6)
        greenthread.kill(gt)
        self.assert_dead(gt)
        greenthread.sleep(0.001)
        # Killed before it ran: passthru must never have been called.
        self.assertEqual(_g_results, [])
        # Killing an already-dead greenthread must leave it dead.
        greenthread.kill(gt)
        self.assert_dead(gt)

    def test_kill_meth(self):
        # Same scenario as test_kill, but through the kill() method.
        gt = greenthread.spawn(passthru, 6)
        gt.kill()
        self.assert_dead(gt)
        greenthread.sleep(0.001)
        self.assertEqual(_g_results, [])
        gt.kill()
        self.assert_dead(gt)

    def test_kill_n(self):
        # Same scenario as test_kill, but for an object made by spawn_n().
        gt = greenthread.spawn_n(passthru, 7)
        greenthread.kill(gt)
        self.assert_dead(gt)
        greenthread.sleep(0.001)
        self.assertEqual(_g_results, [])
        greenthread.kill(gt)
        self.assert_dead(gt)

    def test_link(self):
        results = []

        def link_func(g, *a, **kw):
            results.append(g)
            results.append(a)
            results.append(kw)
        gt = greenthread.spawn(passthru, 5)
        gt.link(link_func, 4, b=5)
        self.assertEqual(gt.wait(), ((5,), {}))
        # The callback must receive the greenthread itself followed by the
        # extra positional and keyword arguments given to link().
        self.assertEqual(results, [gt, (4,), {'b': 5}])

    def test_link_after_exited(self):
        results = []

        def link_func(g, *a, **kw):
            results.append(g)
            results.append(a)
            results.append(kw)
        gt = greenthread.spawn(passthru, 5)
        self.assertEqual(gt.wait(), ((5,), {}))
        # Linking after the greenthread has already finished must still
        # invoke the callback with the same arguments.
        gt.link(link_func, 4, b=5)
        self.assertEqual(results, [gt, (4,), {'b': 5}])

    def test_link_relinks(self):
        # test that linking in a linked func doesn't cause infinite recursion.
        called = []

        def link_func(g):
            g.link(link_func_pass)

        def link_func_pass(g):
            called.append(True)

        gt = greenthread.spawn(passthru)
        gt.link(link_func)
        gt.wait()
        # The callback added from inside a callback must run exactly once.
        self.assertEqual(called, [True])
109
110
class SpawnAfter(Spawn):
    """Spawn's tests plus checks of spawn_after() with cancel() and kill()."""

    def test_basic(self):
        # A delayed call must still deliver passthru's result through wait().
        expected = ((20,), {})
        delayed = greenthread.spawn_after(0.1, passthru, 20)
        self.assertEqual(delayed.wait(), expected)

    def test_cancel(self):
        # Cancelled before its 0.1s delay has elapsed: must end up dead.
        pending = greenthread.spawn_after(0.1, passthru, 21)
        pending.cancel()
        self.assert_dead(pending)

    def test_cancel_already_started(self):
        started = greenthread.spawn_after(0, waiter, 22)
        # Yield once before cancelling; the test expects waiter to be
        # running by then, so cancel() must not stop it from returning 22.
        greenthread.sleep(0)
        started.cancel()
        result = started.wait()
        self.assertEqual(result, 22)

    def test_kill_already_started(self):
        started = greenthread.spawn_after(0, waiter, 22)
        greenthread.sleep(0)
        # Unlike cancel(), kill() must end it even after the yield above.
        started.kill()
        self.assert_dead(started)
132
133
class SpawnAfterLocal(LimitedTestCase, Asserts):
    """Tests for spawn_after_local(), with a plain spawn() case for contrast."""

    def setUp(self):
        super(SpawnAfterLocal, self).setUp()
        # One-element list; each test watches whether a scheduled pop() ran.
        self.lst = [1]

    def test_timer_fired(self):
        def func():
            greenthread.spawn_after_local(0.1, self.lst.pop)
            # Sleep past the 0.1s delay so the scheduling greenlet is still
            # running when the timer comes due.
            greenthread.sleep(0.2)

        greenthread.spawn(func)
        # assertEqual instead of bare ``assert``: it is not stripped under
        # ``python -O`` and reports both values on failure.
        self.assertEqual(self.lst, [1])
        greenthread.sleep(0.3)
        self.assertEqual(self.lst, [])

    def test_timer_cancelled_upon_greenlet_exit(self):
        def func():
            # func returns right away, well before the 0.1s delay elapses.
            greenthread.spawn_after_local(0.1, self.lst.pop)

        greenthread.spawn(func)
        self.assertEqual(self.lst, [1])
        greenthread.sleep(0.2)
        # The pop must not have happened even though the delay has passed.
        self.assertEqual(self.lst, [1])

    def test_spawn_is_not_cancelled(self):
        def func():
            greenthread.spawn(self.lst.pop)
            # exiting immediately, but self.lst.pop must be called
        greenthread.spawn(func)
        greenthread.sleep(0.1)
        self.assertEqual(self.lst, [])
163         greenthread.sleep(0.1)
164         assert self.lst == [], self.lst