Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / greenpool_test.py
1 import gc
2 import os
3 import random
4
5 import eventlet
6 from eventlet import hubs, greenpool, event, pools
7 from eventlet.support import greenlets as greenlet, six
8 import tests
9
10
def passthru(a):
    """Yield to the hub via ``eventlet.sleep(0.01)`` and return ``a`` as-is.

    NOTE: this file defines a second module-level ``passthru`` further down;
    once the module has been fully imported the name refers to that later
    definition, not to this one.
    """
    eventlet.sleep(0.01)
    return a
14
15
def passthru2(a, b):
    """Call ``eventlet.sleep(0.01)`` and then return the pair ``(a, b)``."""
    eventlet.sleep(0.01)
    pair = (a, b)
    return pair
19
20
def raiser(exc):
    """Raise ``exc`` immediately.

    ``exc`` is handed straight to the ``raise`` statement, so it may be an
    exception instance or an exception class.
    """
    raise exc
23
24
class GreenPool(tests.LimitedTestCase):
    """Tests exercising ``greenpool.GreenPool``."""

    def test_spawn(self):
        """spawn() returns waiters whose wait() yields each call's result."""
        p = greenpool.GreenPool(4)
        waiters = [p.spawn(passthru, i) for i in range(10)]
        results = [waiter.wait() for waiter in waiters]
        self.assertEqual(results, list(range(10)))

    def test_spawn_n(self):
        """spawn_n() runs every submitted call; waitall() waits for them."""
        p = greenpool.GreenPool(4)
        results_closure = []

        def do_something(a):
            eventlet.sleep(0.01)
            results_closure.append(a)

        for i in range(10):
            # This test is about spawn_n, so use it (the previous version
            # called spawn() and never exercised spawn_n at all).
            p.spawn_n(do_something, i)
        p.waitall()
        self.assertEqual(results_closure, list(range(10)))

    def test_waiting(self):
        """waiting()/running() track spawners blocked on a size-1 pool."""
        pool = greenpool.GreenPool(1)
        done = event.Event()

        def consume():
            done.wait()

        def waiter(pool):
            gt = pool.spawn(consume)
            gt.wait()

        waiters = []
        self.assertEqual(pool.running(), 0)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 0)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 1)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 2)
        self.assertEqual(pool.running(), 1)
        # Releasing the event lets each consume() return in turn.
        done.send(None)
        for w in waiters:
            w.wait()
        self.assertEqual(pool.waiting(), 0)
        self.assertEqual(pool.running(), 0)

    def test_multiple_coros(self):
        """A consumer and a producer in the same pool interleave as expected."""
        evt = event.Event()
        results = []

        def producer():
            results.append('prod')
            evt.send()

        def consumer():
            results.append('cons1')
            evt.wait()
            results.append('cons2')

        pool = greenpool.GreenPool(2)
        done = pool.spawn(consumer)
        pool.spawn_n(producer)
        done.wait()
        self.assertEqual(['cons1', 'prod', 'cons2'], results)

    def test_timer_cancel(self):
        """Local timers scheduled inside a spawn must not fire afterwards."""
        # this test verifies that local timers are not fired
        # outside of the context of the spawn
        timer_fired = []

        def fire_timer():
            timer_fired.append(True)

        def some_work():
            hubs.get_hub().schedule_call_local(0, fire_timer)

        pool = greenpool.GreenPool(2)
        worker = pool.spawn(some_work)
        worker.wait()
        # Give the hub a couple of turns in which the timer could have fired.
        eventlet.sleep(0)
        eventlet.sleep(0)
        self.assertEqual(timer_fired, [])

    def test_reentrant(self):
        """Spawning into a size-1 pool from inside that pool must not hang."""
        pool = greenpool.GreenPool(1)

        def reenter():
            waiter = pool.spawn(lambda a: a, 'reenter')
            self.assertEqual('reenter', waiter.wait())

        outer_waiter = pool.spawn(reenter)
        outer_waiter.wait()

        evt = event.Event()

        def reenter_async():
            pool.spawn_n(lambda a: a, 'reenter')
            evt.send('done')

        pool.spawn_n(reenter_async)
        self.assertEqual('done', evt.wait())

    def assert_pool_has_free(self, pool, num_free):
        """Assert that ``pool`` has exactly ``num_free`` free slots.

        First checks ``pool.free()``, then confirms it empirically: spawning
        ``num_free`` blocking greenthreads must succeed within one second, and
        one further spawn must block (detected with a zero-second timeout).
        The blocking greenthreads are released before returning.
        """
        self.assertEqual(pool.free(), num_free)

        def wait_long_time(e):
            e.wait()

        timer = eventlet.Timeout(1)
        try:
            evt = event.Event()
            for _ in six.moves.range(num_free):
                pool.spawn(wait_long_time, evt)
                # if the pool has fewer free than we expect,
                # then we'll hit the timeout error
        finally:
            timer.cancel()

        # if the runtime error is not raised it means the pool had
        # some unexpected free items
        timer = eventlet.Timeout(0, RuntimeError)
        try:
            self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
        finally:
            timer.cancel()

        # clean up by causing all the wait_long_time functions to return
        evt.send(None)
        eventlet.sleep(0)
        eventlet.sleep(0)

    def test_resize(self):
        """resize() shrinks and grows the number of free slots."""
        pool = greenpool.GreenPool(2)
        evt = event.Event()

        def wait_long_time(e):
            e.wait()

        pool.spawn(wait_long_time, evt)
        pool.spawn(wait_long_time, evt)
        self.assertEqual(pool.free(), 0)
        self.assertEqual(pool.running(), 2)
        self.assert_pool_has_free(pool, 0)

        # verify that the pool discards excess items put into it
        pool.resize(1)

        # cause the wait_long_time functions to return, which will
        # trigger puts to the pool
        evt.send(None)
        eventlet.sleep(0)
        eventlet.sleep(0)

        self.assertEqual(pool.free(), 1)
        self.assertEqual(pool.running(), 0)
        self.assert_pool_has_free(pool, 1)

        # resize larger and assert that there are more free items
        pool.resize(2)
        self.assertEqual(pool.free(), 2)
        self.assertEqual(pool.running(), 0)
        self.assert_pool_has_free(pool, 2)

    def test_pool_smash(self):
        """A timed-out TokenPool.get() inside a pool harms neither pool."""
        # The premise is that a coroutine in a Pool tries to get a token out
        # of a token pool but times out before getting the token.  We verify
        # that neither pool is adversely affected by this situation.
        pool = greenpool.GreenPool(1)
        tp = pools.TokenPool(max_size=1)
        tp.get()  # empty out the pool

        def do_receive(tp):
            timer = eventlet.Timeout(0, RuntimeError())
            try:
                tp.get()
                self.fail("Shouldn't have recieved anything from the pool")
            except RuntimeError:
                return 'timed out'
            finally:
                # The try body always raises (timeout or self.fail), so an
                # ``else`` clause would never run; ``finally`` guarantees the
                # timer is cancelled on every path.
                timer.cancel()

        # the spawn makes the token pool expect that coroutine, but then
        # immediately cuts bait
        e1 = pool.spawn(do_receive, tp)
        self.assertEqual(e1.wait(), 'timed out')

        # the pool can get some random item back
        def send_wakeup(tp):
            tp.put('wakeup')
        gt = eventlet.spawn(send_wakeup, tp)

        # now we ask the pool to run something else, which should not
        # be affected by the previous send at all
        def resume():
            return 'resumed'
        e2 = pool.spawn(resume)
        self.assertEqual(e2.wait(), 'resumed')

        # we should be able to get out the thing we put in there, too
        self.assertEqual(tp.get(), 'wakeup')
        gt.wait()

    def test_spawn_n_2(self):
        """free() bookkeeping and yielding behaviour of spawn/spawn_n."""
        p = greenpool.GreenPool(2)
        self.assertEqual(p.free(), 2)
        r = []

        def foo(a):
            r.append(a)

        gt = p.spawn(foo, 1)
        self.assertEqual(p.free(), 1)
        gt.wait()
        self.assertEqual(r, [1])
        eventlet.sleep(0)
        self.assertEqual(p.free(), 2)

        # Once the pool is exhausted, spawning forces a yield.
        p.spawn_n(foo, 2)
        self.assertEqual(1, p.free())
        self.assertEqual(r, [1])

        p.spawn_n(foo, 3)
        self.assertEqual(0, p.free())
        self.assertEqual(r, [1])

        p.spawn_n(foo, 4)
        self.assertEqual(set(r), set([1, 2, 3]))
        eventlet.sleep(0)
        self.assertEqual(set(r), set([1, 2, 3, 4]))

    def test_exceptions(self):
        """Slots are returned to the pool when the spawned function raises."""
        p = greenpool.GreenPool(2)
        for m in (p.spawn, p.spawn_n):
            self.assert_pool_has_free(p, 2)
            m(raiser, RuntimeError())
            self.assert_pool_has_free(p, 1)
            p.waitall()
            self.assert_pool_has_free(p, 2)
            m(raiser, greenlet.GreenletExit)
            self.assert_pool_has_free(p, 1)
            p.waitall()
            self.assert_pool_has_free(p, 2)

    def test_imap(self):
        """imap() yields the function results in input order."""
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(passthru, range(10)))
        self.assertEqual(result_list, list(range(10)))

    def test_empty_imap(self):
        """imap() over an empty iterable is exhausted immediately."""
        p = greenpool.GreenPool(4)
        result_iter = p.imap(passthru, [])
        # six.next() works for both the Python 2 ``next`` and the Python 3
        # ``__next__`` iterator protocols, matching the other tests here.
        self.assertRaises(StopIteration, six.next, result_iter)

    def test_imap_nonefunc(self):
        """imap(None, ...) yields one-element tuples of the inputs."""
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(None, range(10)))
        self.assertEqual(result_list, [(x,) for x in range(10)])

    def test_imap_multi_args(self):
        """imap() with several iterables passes one item from each per call."""
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(passthru2, range(10), range(10, 20)))
        self.assertEqual(result_list, list(zip(range(10), range(10, 20))))

    def test_imap_raises(self):
        """An exception from the mapped function surfaces without ending imap."""
        # testing the case where the function raises an exception;
        # both that the caller sees that exception, and that the iterator
        # continues to be usable to get the rest of the items
        p = greenpool.GreenPool(4)

        def raiser(item):
            if item == 1 or item == 7:
                raise RuntimeError("intentional error")
            else:
                return item

        it = p.imap(raiser, range(10))
        results = []
        while True:
            try:
                results.append(six.next(it))
            except RuntimeError:
                # Record a marker in place of the item whose call failed.
                results.append('r')
            except StopIteration:
                break
        self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])

    def test_starmap(self):
        """starmap() unpacks each argument tuple and preserves order."""
        p = greenpool.GreenPool(4)
        result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
        self.assertEqual(result_list, list(range(10)))

    def test_waitall_on_nothing(self):
        """waitall() on a pool with no work returns without error."""
        p = greenpool.GreenPool()
        p.waitall()

    def test_recursive_waitall(self):
        """waitall() invoked from inside the pool raises AssertionError."""
        p = greenpool.GreenPool()
        gt = p.spawn(p.waitall)
        self.assertRaises(AssertionError, gt.wait)
330
331
class GreenPile(tests.LimitedTestCase):
    """Tests exercising ``greenpool.GreenPile``."""

    def test_pile(self):
        """Iterating a pile yields the spawned results in spawn order."""
        p = greenpool.GreenPile(4)
        for i in range(10):
            p.spawn(passthru, i)
        result_list = list(p)
        self.assertEqual(result_list, list(range(10)))

    def test_pile_spawn_times_out(self):
        """A spawn() that times out on a full pile leaves the pile usable."""
        p = greenpool.GreenPile(4)
        for i in range(4):
            p.spawn(passthru, i)
        # now it should be full and this should time out
        timer = eventlet.Timeout(0)
        try:
            self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out")
        finally:
            # Always cancel so that, if spawn() unexpectedly does not block,
            # the pending timeout cannot fire later in unrelated code.
            timer.cancel()
        # verify that the spawn breakage didn't interrupt the sequence
        # and terminates properly
        for i in range(4, 10):
            p.spawn(passthru, i)
        self.assertEqual(list(p), list(range(10)))

    def test_constructing_from_pool(self):
        """Two piles sharing one pool each return only their own results."""
        pool = greenpool.GreenPool(2)
        pile1 = greenpool.GreenPile(pool)
        pile2 = greenpool.GreenPile(pool)

        def bunch_of_work(pile, unique):
            for i in range(10):
                pile.spawn(passthru, i + unique)

        eventlet.spawn(bunch_of_work, pile1, 0)
        eventlet.spawn(bunch_of_work, pile2, 100)
        # Yield once so both producer greenthreads get started.
        eventlet.sleep(0)
        self.assertEqual(list(pile2), list(range(100, 110)))
        self.assertEqual(list(pile1), list(range(10)))
367
368
class StressException(Exception):
    """Exception raised by ``pressure`` to simulate a failing work item."""
371
# Module-level random generator created with a fixed seed (0); pressure() and
# the second passthru() defined below draw all of their random numbers from it.
r = random.Random(0)
373
374
def pressure(arg):
    """Sleep a random number of tiny intervals, then return or raise.

    While successive draws from ``r`` are below 0.5, sleeps for up to 1 ms
    (``r.random() * 0.001``).  A final draw then decides the outcome: below
    0.8 returns ``arg``; otherwise raises ``StressException(arg)``.
    """
    while r.random() < 0.5:
        eventlet.sleep(r.random() * 0.001)
    outcome = r.random()
    if outcome >= 0.8:
        raise StressException(arg)
    return arg
382
383
def passthru(arg):
    """Sleep a random number of tiny intervals, then return ``arg`` unchanged.

    NOTE: this rebinds the module-level name ``passthru`` that is also
    defined near the top of this file; after import, ``passthru`` refers to
    this definition.
    """
    while True:
        if r.random() >= 0.5:
            return arg
        eventlet.sleep(r.random() * 0.001)
388
389
class Stress(tests.LimitedTestCase):
    """Long-running ordering and memory checks for GreenPile and GreenPool.

    The test methods are wrapped in ``tests.skip_unless`` with a condition
    that is true only when the environment variable ``RUN_STRESS_TESTS`` is
    ``'YES'``.
    """
    # tests will take extra-long
    TEST_TIMEOUT = 60

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def spawn_order_check(self, concurrency):
        """Check that a GreenPile of size ``concurrency`` is strictly ordered.

        Three producer greenthreads each spawn ``iters`` tokens
        ``(unique, i)`` into one shared pile; the results (or the token
        carried by a ``StressException``) must arrive with increasing ``i``
        for each ``unique``.
        """
        # checks that piles are strictly ordered
        p = greenpool.GreenPile(concurrency)

        def makework(count, unique):
            for i in six.moves.range(count):
                token = (unique, i)
                p.spawn(pressure, token)

        iters = 1000
        eventlet.spawn(makework, iters, 1)
        eventlet.spawn(makework, iters, 2)
        eventlet.spawn(makework, iters, 3)
        p.spawn(pressure, (0, 0))
        # latest[unique] is the highest order seen so far for that producer.
        latest = [-1] * 4
        received = 0
        it = iter(p)
        while True:
            try:
                i = six.next(it)
            except StressException as exc:
                # pressure() raises with the token as its only argument.
                i = exc.args[0]
            except StopIteration:
                break
            received += 1
            if received % 5 == 0:
                eventlet.sleep(0.0001)
            unique, order = i
            # Use a unittest assertion rather than a bare ``assert`` so the
            # check is not stripped when Python runs with -O.
            self.assertTrue(latest[unique] < order)
            latest[unique] = order
        for last_seen in latest[1:]:
            self.assertEqual(last_seen, iters - 1)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_ordering_5(self):
        """Run the pile ordering check with a concurrency of 5."""
        self.spawn_order_check(5)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_ordering_50(self):
        """Run the pile ordering check with a concurrency of 50."""
        self.spawn_order_check(50)

    def imap_memory_check(self, concurrency):
        """Check imap() ordering and bounded object growth.

        Iterates ``GreenPool(concurrency).imap`` over 1000 items, asserting
        that results are strictly increasing and that the number of
        gc-tracked objects created since the first result stays below
        ``25 * concurrency``.
        """
        # checks that imap is strictly
        # ordered and consumes a constant amount of memory
        p = greenpool.GreenPool(concurrency)
        count = 1000
        it = p.imap(passthru, six.moves.range(count))
        latest = -1
        while True:
            try:
                i = six.next(it)
            except StopIteration:
                break

            if latest == -1:
                # First result: take the baseline object count.
                gc.collect()
                initial_obj_count = len(gc.get_objects())
            # unittest assertions survive ``python -O``; bare asserts do not.
            self.assertTrue(i > latest)
            latest = i
            if latest % 5 == 0:
                eventlet.sleep(0.001)
            if latest % 10 == 0:
                gc.collect()
                objs_created = len(gc.get_objects()) - initial_obj_count
                self.assertTrue(objs_created < 25 * concurrency, objs_created)
        # make sure we got to the end
        self.assertEqual(latest, count - 1)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_imap_50(self):
        """Run the imap memory check with a concurrency of 50."""
        self.imap_memory_check(50)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_imap_500(self):
        """Run the imap memory check with a concurrency of 500."""
        self.imap_memory_check(500)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_with_intpool(self):
        """Run GreenPool workers that share a ``pools.Pool`` of integers."""
        class IntPool(pools.Pool):
            def create(self):
                # Hand out 1, 2, 3, ... as the pool creates new items.
                self.current_integer = getattr(self, 'current_integer', 0) + 1
                return self.current_integer

        def subtest(intpool_size, pool_size, num_executes):
            def run(int_pool):
                token = int_pool.get()
                eventlet.sleep(0.0001)
                int_pool.put(token)
                return token

            int_pool = IntPool(max_size=intpool_size)
            pool = greenpool.GreenPool(pool_size)
            for _ in six.moves.range(num_executes):
                pool.spawn(run, int_pool)
            pool.waitall()

        subtest(4, 7, 7)
        subtest(50, 75, 100)
        for isize in (10, 20, 30, 40, 50):
            for psize in (5, 25, 35, 50):
                subtest(isize, psize, psize)