Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / test__pool.py
1 import eventlet
2 import warnings
3 warnings.simplefilter('ignore', DeprecationWarning)
4 from eventlet import pool, coros, api, hubs, timeout
5 warnings.simplefilter('default', DeprecationWarning)
6 from eventlet import event as _event
7 from eventlet.support import six
8 from tests import LimitedTestCase
9 from unittest import main
10
11
12 class TestCoroutinePool(LimitedTestCase):
13     klass = pool.Pool
14
15     def test_execute_async(self):
16         done = _event.Event()
17
18         def some_work():
19             done.send()
20         pool = self.klass(0, 2)
21         pool.execute_async(some_work)
22         done.wait()
23
24     def test_execute(self):
25         value = 'return value'
26
27         def some_work():
28             return value
29         pool = self.klass(0, 2)
30         worker = pool.execute(some_work)
31         self.assertEqual(value, worker.wait())
32
33     def test_waiting(self):
34         pool = self.klass(0, 1)
35         done = _event.Event()
36
37         def consume():
38             done.wait()
39
40         def waiter(pool):
41             evt = pool.execute(consume)
42             evt.wait()
43
44         waiters = []
45         waiters.append(eventlet.spawn(waiter, pool))
46         api.sleep(0)
47         self.assertEqual(pool.waiting(), 0)
48         waiters.append(eventlet.spawn(waiter, pool))
49         api.sleep(0)
50         self.assertEqual(pool.waiting(), 1)
51         waiters.append(eventlet.spawn(waiter, pool))
52         api.sleep(0)
53         self.assertEqual(pool.waiting(), 2)
54         done.send(None)
55         for w in waiters:
56             w.wait()
57         self.assertEqual(pool.waiting(), 0)
58
59     def test_multiple_coros(self):
60         evt = _event.Event()
61         results = []
62
63         def producer():
64             results.append('prod')
65             evt.send()
66
67         def consumer():
68             results.append('cons1')
69             evt.wait()
70             results.append('cons2')
71
72         pool = self.klass(0, 2)
73         done = pool.execute(consumer)
74         pool.execute_async(producer)
75         done.wait()
76         self.assertEqual(['cons1', 'prod', 'cons2'], results)
77
78     def test_timer_cancel(self):
79         # this test verifies that local timers are not fired
80         # outside of the context of the execute method
81         timer_fired = []
82
83         def fire_timer():
84             timer_fired.append(True)
85
86         def some_work():
87             hubs.get_hub().schedule_call_local(0, fire_timer)
88         pool = self.klass(0, 2)
89         worker = pool.execute(some_work)
90         worker.wait()
91         api.sleep(0)
92         self.assertEqual(timer_fired, [])
93
94     def test_reentrant(self):
95         pool = self.klass(0, 1)
96
97         def reenter():
98             waiter = pool.execute(lambda a: a, 'reenter')
99             self.assertEqual('reenter', waiter.wait())
100
101         outer_waiter = pool.execute(reenter)
102         outer_waiter.wait()
103
104         evt = _event.Event()
105
106         def reenter_async():
107             pool.execute_async(lambda a: a, 'reenter')
108             evt.send('done')
109
110         pool.execute_async(reenter_async)
111         evt.wait()
112
113     def assert_pool_has_free(self, pool, num_free):
114         def wait_long_time(e):
115             e.wait()
116         timer = timeout.Timeout(1, api.TimeoutError)
117         try:
118             evt = _event.Event()
119             for x in six.moves.range(num_free):
120                 pool.execute(wait_long_time, evt)
121                 # if the pool has fewer free than we expect,
122                 # then we'll hit the timeout error
123         finally:
124             timer.cancel()
125
126         # if the runtime error is not raised it means the pool had
127         # some unexpected free items
128         timer = timeout.Timeout(0, RuntimeError)
129         self.assertRaises(RuntimeError, pool.execute, wait_long_time, evt)
130
131         # clean up by causing all the wait_long_time functions to return
132         evt.send(None)
133         api.sleep(0)
134         api.sleep(0)
135
136     def test_resize(self):
137         pool = self.klass(max_size=2)
138         evt = _event.Event()
139
140         def wait_long_time(e):
141             e.wait()
142         pool.execute(wait_long_time, evt)
143         pool.execute(wait_long_time, evt)
144         self.assertEqual(pool.free(), 0)
145         self.assert_pool_has_free(pool, 0)
146
147         # verify that the pool discards excess items put into it
148         pool.resize(1)
149
150         # cause the wait_long_time functions to return, which will
151         # trigger puts to the pool
152         evt.send(None)
153         api.sleep(0)
154         api.sleep(0)
155
156         self.assertEqual(pool.free(), 1)
157         self.assert_pool_has_free(pool, 1)
158
159         # resize larger and assert that there are more free items
160         pool.resize(2)
161         self.assertEqual(pool.free(), 2)
162         self.assert_pool_has_free(pool, 2)
163
164     def test_stderr_raising(self):
165         # testing that really egregious errors in the error handling code
166         # (that prints tracebacks to stderr) don't cause the pool to lose
167         # any members
168         import sys
169         pool = self.klass(min_size=1, max_size=1)
170
171         def crash(*args, **kw):
172             raise RuntimeError("Whoa")
173
174         class FakeFile(object):
175             write = crash
176
177         # we're going to do this by causing the traceback.print_exc in
178         # safe_apply to raise an exception and thus exit _main_loop
179         normal_err = sys.stderr
180         try:
181             sys.stderr = FakeFile()
182             waiter = pool.execute(crash)
183             self.assertRaises(RuntimeError, waiter.wait)
184             # the pool should have something free at this point since the
185             # waiter returned
186             # pool.Pool change: if an exception is raised during execution of a link,
187             # the rest of the links are scheduled to be executed on the next hub iteration
188             # this introduces a delay in updating pool.sem which makes pool.free() report 0
189             # therefore, sleep:
190             api.sleep(0)
191             self.assertEqual(pool.free(), 1)
192             # shouldn't block when trying to get
193             t = timeout.Timeout(0.1)
194             try:
195                 pool.execute(api.sleep, 1)
196             finally:
197                 t.cancel()
198         finally:
199             sys.stderr = normal_err
200
201     def test_track_events(self):
202         pool = self.klass(track_events=True)
203         for x in range(6):
204             pool.execute(lambda n: n, x)
205         for y in range(6):
206             pool.wait()
207
208     def test_track_slow_event(self):
209         pool = self.klass(track_events=True)
210
211         def slow():
212             api.sleep(0.1)
213             return 'ok'
214         pool.execute(slow)
215         self.assertEqual(pool.wait(), 'ok')
216
217     def test_pool_smash(self):
218         # The premise is that a coroutine in a Pool tries to get a token out
219         # of a token pool but times out before getting the token.  We verify
220         # that neither pool is adversely affected by this situation.
221         from eventlet import pools
222         pool = self.klass(min_size=1, max_size=1)
223         tp = pools.TokenPool(max_size=1)
224         token = tp.get()  # empty pool
225
226         def do_receive(tp):
227             timeout.Timeout(0, RuntimeError())
228             try:
229                 t = tp.get()
230                 self.fail("Shouldn't have recieved anything from the pool")
231             except RuntimeError:
232                 return 'timed out'
233
234         # the execute makes the token pool expect that coroutine, but then
235         # immediately cuts bait
236         e1 = pool.execute(do_receive, tp)
237         self.assertEqual(e1.wait(), 'timed out')
238
239         # the pool can get some random item back
240         def send_wakeup(tp):
241             tp.put('wakeup')
242         api.spawn(send_wakeup, tp)
243
244         # now we ask the pool to run something else, which should not
245         # be affected by the previous send at all
246         def resume():
247             return 'resumed'
248         e2 = pool.execute(resume)
249         self.assertEqual(e2.wait(), 'resumed')
250
251         # we should be able to get out the thing we put in there, too
252         self.assertEqual(tp.get(), 'wakeup')
253
254
255 class PoolBasicTests(LimitedTestCase):
256     klass = pool.Pool
257
258     def test_execute_async(self):
259         p = self.klass(max_size=2)
260         self.assertEqual(p.free(), 2)
261         r = []
262
263         def foo(a):
264             r.append(a)
265         evt = p.execute(foo, 1)
266         self.assertEqual(p.free(), 1)
267         evt.wait()
268         self.assertEqual(r, [1])
269         api.sleep(0)
270         self.assertEqual(p.free(), 2)
271
272         # Once the pool is exhausted, calling an execute forces a yield.
273
274         p.execute_async(foo, 2)
275         self.assertEqual(1, p.free())
276         self.assertEqual(r, [1])
277
278         p.execute_async(foo, 3)
279         self.assertEqual(0, p.free())
280         self.assertEqual(r, [1])
281
282         p.execute_async(foo, 4)
283         self.assertEqual(r, [1, 2, 3])
284         api.sleep(0)
285         self.assertEqual(r, [1, 2, 3, 4])
286
287     def test_execute(self):
288         p = self.klass()
289         evt = p.execute(lambda a: ('foo', a), 1)
290         self.assertEqual(evt.wait(), ('foo', 1))
291
292     def test_with_intpool(self):
293         from eventlet import pools
294
295         class IntPool(pools.Pool):
296             def create(self):
297                 self.current_integer = getattr(self, 'current_integer', 0) + 1
298                 return self.current_integer
299
300         def subtest(intpool_size, pool_size, num_executes):
301             def run(int_pool):
302                 token = int_pool.get()
303                 api.sleep(0.0001)
304                 int_pool.put(token)
305                 return token
306
307             int_pool = IntPool(max_size=intpool_size)
308             pool = self.klass(max_size=pool_size)
309             for ix in six.moves.range(num_executes):
310                 pool.execute(run, int_pool)
311             pool.waitall()
312
313         subtest(4, 7, 7)
314         subtest(50, 75, 100)
315         for isize in (20, 30, 40, 50):
316             for psize in (25, 35, 50):
317                 subtest(isize, psize, psize)
318
319
320 if __name__ == '__main__':
321     main()