6 from eventlet import hubs, greenpool, event, pools
7 from eventlet.support import greenlets as greenlet, six
25 class GreenPool(tests.LimitedTestCase):
27 p = greenpool.GreenPool(4)
30 waiters.append(p.spawn(passthru, i))
31 results = [waiter.wait() for waiter in waiters]
32 self.assertEqual(results, list(range(10)))
34 def test_spawn_n(self):
35 p = greenpool.GreenPool(4)
40 results_closure.append(a)
43 p.spawn(do_something, i)
45 self.assertEqual(results_closure, list(range(10)))
47 def test_waiting(self):
48 pool = greenpool.GreenPool(1)
55 gt = pool.spawn(consume)
59 self.assertEqual(pool.running(), 0)
60 waiters.append(eventlet.spawn(waiter, pool))
62 self.assertEqual(pool.waiting(), 0)
63 waiters.append(eventlet.spawn(waiter, pool))
65 self.assertEqual(pool.waiting(), 1)
66 waiters.append(eventlet.spawn(waiter, pool))
68 self.assertEqual(pool.waiting(), 2)
69 self.assertEqual(pool.running(), 1)
73 self.assertEqual(pool.waiting(), 0)
74 self.assertEqual(pool.running(), 0)
76 def test_multiple_coros(self):
81 results.append('prod')
85 results.append('cons1')
87 results.append('cons2')
89 pool = greenpool.GreenPool(2)
90 done = pool.spawn(consumer)
91 pool.spawn_n(producer)
93 self.assertEqual(['cons1', 'prod', 'cons2'], results)
95 def test_timer_cancel(self):
96 # this test verifies that local timers are not fired
97 # outside of the context of the spawn
101 timer_fired.append(True)
104 hubs.get_hub().schedule_call_local(0, fire_timer)
106 pool = greenpool.GreenPool(2)
107 worker = pool.spawn(some_work)
111 self.assertEqual(timer_fired, [])
113 def test_reentrant(self):
114 pool = greenpool.GreenPool(1)
117 waiter = pool.spawn(lambda a: a, 'reenter')
118 self.assertEqual('reenter', waiter.wait())
120 outer_waiter = pool.spawn(reenter)
126 pool.spawn_n(lambda a: a, 'reenter')
129 pool.spawn_n(reenter_async)
130 self.assertEqual('done', evt.wait())
132 def assert_pool_has_free(self, pool, num_free):
133 self.assertEqual(pool.free(), num_free)
135 def wait_long_time(e):
138 timer = eventlet.Timeout(1)
141 for x in six.moves.range(num_free):
142 pool.spawn(wait_long_time, evt)
143 # if the pool has fewer free than we expect,
144 # then we'll hit the timeout error
148 # if the runtime error is not raised it means the pool had
149 # some unexpected free items
150 timer = eventlet.Timeout(0, RuntimeError)
152 self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
156 # clean up by causing all the wait_long_time functions to return
161 def test_resize(self):
162 pool = greenpool.GreenPool(2)
165 def wait_long_time(e):
168 pool.spawn(wait_long_time, evt)
169 pool.spawn(wait_long_time, evt)
170 self.assertEqual(pool.free(), 0)
171 self.assertEqual(pool.running(), 2)
172 self.assert_pool_has_free(pool, 0)
174 # verify that the pool discards excess items put into it
177 # cause the wait_long_time functions to return, which will
178 # trigger puts to the pool
183 self.assertEqual(pool.free(), 1)
184 self.assertEqual(pool.running(), 0)
185 self.assert_pool_has_free(pool, 1)
187 # resize larger and assert that there are more free items
189 self.assertEqual(pool.free(), 2)
190 self.assertEqual(pool.running(), 0)
191 self.assert_pool_has_free(pool, 2)
193 def test_pool_smash(self):
194 # The premise is that a coroutine in a Pool tries to get a token out
195 # of a token pool but times out before getting the token. We verify
196 # that neither pool is adversely affected by this situation.
197 pool = greenpool.GreenPool(1)
198 tp = pools.TokenPool(max_size=1)
199 tp.get() # empty out the pool
202 timer = eventlet.Timeout(0, RuntimeError())
205 self.fail("Shouldn't have received anything from the pool")
211 # the spawn makes the token pool expect that coroutine, but then
212 # immediately cuts bait
213 e1 = pool.spawn(do_receive, tp)
214 self.assertEqual(e1.wait(), 'timed out')
216 # the pool can get some random item back
219 gt = eventlet.spawn(send_wakeup, tp)
221 # now we ask the pool to run something else, which should not
222 # be affected by the previous send at all
225 e2 = pool.spawn(resume)
226 self.assertEqual(e2.wait(), 'resumed')
228 # we should be able to get out the thing we put in there, too
229 self.assertEqual(tp.get(), 'wakeup')
232 def test_spawn_n_2(self):
233 p = greenpool.GreenPool(2)
234 self.assertEqual(p.free(), 2)
241 self.assertEqual(p.free(), 1)
243 self.assertEqual(r, [1])
245 self.assertEqual(p.free(), 2)
247 # Once the pool is exhausted, spawning forces a yield.
249 self.assertEqual(1, p.free())
250 self.assertEqual(r, [1])
253 self.assertEqual(0, p.free())
254 self.assertEqual(r, [1])
257 self.assertEqual(set(r), set([1, 2, 3]))
259 self.assertEqual(set(r), set([1, 2, 3, 4]))
261 def test_exceptions(self):
262 p = greenpool.GreenPool(2)
263 for m in (p.spawn, p.spawn_n):
264 self.assert_pool_has_free(p, 2)
265 m(raiser, RuntimeError())
266 self.assert_pool_has_free(p, 1)
268 self.assert_pool_has_free(p, 2)
269 m(raiser, greenlet.GreenletExit)
270 self.assert_pool_has_free(p, 1)
272 self.assert_pool_has_free(p, 2)
275 p = greenpool.GreenPool(4)
276 result_list = list(p.imap(passthru, range(10)))
277 self.assertEqual(result_list, list(range(10)))
279 def test_empty_imap(self):
280 p = greenpool.GreenPool(4)
281 result_iter = p.imap(passthru, [])
282 self.assertRaises(StopIteration, result_iter.next)
284 def test_imap_nonefunc(self):
285 p = greenpool.GreenPool(4)
286 result_list = list(p.imap(None, range(10)))
287 self.assertEqual(result_list, [(x,) for x in range(10)])
289 def test_imap_multi_args(self):
290 p = greenpool.GreenPool(4)
291 result_list = list(p.imap(passthru2, range(10), range(10, 20)))
292 self.assertEqual(result_list, list(zip(range(10), range(10, 20))))
294 def test_imap_raises(self):
295 # testing the case where the function raises an exception;
296 # both that the caller sees that exception, and that the iterator
297 # continues to be usable to get the rest of the items
298 p = greenpool.GreenPool(4)
301 if item == 1 or item == 7:
302 raise RuntimeError("intentional error")
306 it = p.imap(raiser, range(10))
310 results.append(six.next(it))
313 except StopIteration:
315 self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])
317 def test_starmap(self):
318 p = greenpool.GreenPool(4)
319 result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
320 self.assertEqual(result_list, list(range(10)))
322 def test_waitall_on_nothing(self):
323 p = greenpool.GreenPool()
326 def test_recursive_waitall(self):
327 p = greenpool.GreenPool()
328 gt = p.spawn(p.waitall)
329 self.assertRaises(AssertionError, gt.wait)
332 class GreenPile(tests.LimitedTestCase):
334 p = greenpool.GreenPile(4)
337 result_list = list(p)
338 self.assertEqual(result_list, list(range(10)))
340 def test_pile_spawn_times_out(self):
341 p = greenpool.GreenPile(4)
344 # now it should be full and this should time out
346 self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out")
347 # verify that the spawn breakage didn't interrupt the sequence
348 # and terminates properly
349 for i in range(4, 10):
351 self.assertEqual(list(p), list(range(10)))
353 def test_constructing_from_pool(self):
354 pool = greenpool.GreenPool(2)
355 pile1 = greenpool.GreenPile(pool)
356 pile2 = greenpool.GreenPile(pool)
358 def bunch_of_work(pile, unique):
360 pile.spawn(passthru, i + unique)
362 eventlet.spawn(bunch_of_work, pile1, 0)
363 eventlet.spawn(bunch_of_work, pile2, 100)
365 self.assertEqual(list(pile2), list(range(100, 110)))
366 self.assertEqual(list(pile1), list(range(10)))
369 class StressException(Exception):
376 while r.random() < 0.5:
377 eventlet.sleep(r.random() * 0.001)
381 raise StressException(arg)
385 while r.random() < 0.5:
386 eventlet.sleep(r.random() * 0.001)
390 class Stress(tests.LimitedTestCase):
391 # tests will take extra-long
394 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
395 def spawn_order_check(self, concurrency):
396 # checks that piles are strictly ordered
397 p = greenpool.GreenPile(concurrency)
399 def makework(count, unique):
400 for i in six.moves.range(count):
402 p.spawn(pressure, token)
405 eventlet.spawn(makework, iters, 1)
406 eventlet.spawn(makework, iters, 2)
407 eventlet.spawn(makework, iters, 3)
408 p.spawn(pressure, (0, 0))
415 except StressException as exc:
417 except StopIteration:
420 if received % 5 == 0:
421 eventlet.sleep(0.0001)
423 assert latest[unique] < order
424 latest[unique] = order
426 self.assertEqual(l, iters - 1)
428 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
429 def test_ordering_5(self):
430 self.spawn_order_check(5)
432 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
433 def test_ordering_50(self):
434 self.spawn_order_check(50)
436 def imap_memory_check(self, concurrency):
437 # checks that imap is strictly
438 # ordered and consumes a constant amount of memory
439 p = greenpool.GreenPool(concurrency)
441 it = p.imap(passthru, six.moves.range(count))
446 except StopIteration:
451 initial_obj_count = len(gc.get_objects())
455 eventlet.sleep(0.001)
458 objs_created = len(gc.get_objects()) - initial_obj_count
459 assert objs_created < 25 * concurrency, objs_created
460 # make sure we got to the end
461 self.assertEqual(latest, count - 1)
463 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
464 def test_imap_50(self):
465 self.imap_memory_check(50)
467 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
468 def test_imap_500(self):
469 self.imap_memory_check(500)
471 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
472 def test_with_intpool(self):
473 class IntPool(pools.Pool):
475 self.current_integer = getattr(self, 'current_integer', 0) + 1
476 return self.current_integer
478 def subtest(intpool_size, pool_size, num_executes):
480 token = int_pool.get()
481 eventlet.sleep(0.0001)
485 int_pool = IntPool(max_size=intpool_size)
486 pool = greenpool.GreenPool(pool_size)
487 for ix in six.moves.range(num_executes):
488 pool.spawn(run, int_pool)
493 for isize in (10, 20, 30, 40, 50):
494 for psize in (5, 25, 35, 50):
495 subtest(isize, psize, psize)