import gc
import os
import random

import eventlet
from eventlet import hubs, greenpool, event, pools
from eventlet.support import greenlets as greenlet, six
import tests


def passthru(a):
    """Yield to the hub for 10ms, then return ``a`` unchanged."""
    eventlet.sleep(0.01)
    return a


def passthru2(a, b):
    """Yield to the hub for 10ms, then return the pair ``(a, b)``."""
    eventlet.sleep(0.01)
    return a, b


def raiser(exc):
    """Raise ``exc`` (an exception instance or class) immediately."""
    raise exc


class GreenPool(tests.LimitedTestCase):
    """Tests for ``greenpool.GreenPool``."""

    def test_spawn(self):
        # spawn() hands back a waitable per call; results come back in
        # submission order even though the pool is smaller than the workload.
        p = greenpool.GreenPool(4)
        waiters = []
        for i in range(10):
            waiters.append(p.spawn(passthru, i))
        results = [waiter.wait() for waiter in waiters]
        self.assertEqual(results, list(range(10)))

    def test_spawn_n(self):
        # spawn_n() returns nothing to wait on, so the results are collected
        # through a closure and the pool is drained with waitall().
        p = greenpool.GreenPool(4)
        results_closure = []

        def do_something(a):
            eventlet.sleep(0.01)
            results_closure.append(a)

        for i in range(10):
            p.spawn_n(do_something, i)
        p.waitall()
        self.assertEqual(results_closure, list(range(10)))

    def test_waiting(self):
        # With a pool of size 1, every spawner after the first one has to
        # queue up; waiting() must report how many are queued.
        pool = greenpool.GreenPool(1)
        done = event.Event()

        def consume():
            done.wait()

        def waiter(pool):
            gt = pool.spawn(consume)
            gt.wait()

        waiters = []
        self.assertEqual(pool.running(), 0)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 0)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 1)
        waiters.append(eventlet.spawn(waiter, pool))
        eventlet.sleep(0)
        self.assertEqual(pool.waiting(), 2)
        self.assertEqual(pool.running(), 1)
        # release the single running consumer; the queued ones follow
        done.send(None)
        for w in waiters:
            w.wait()
        self.assertEqual(pool.waiting(), 0)
        self.assertEqual(pool.running(), 0)

    def test_multiple_coros(self):
        # a consumer spawned with spawn() and a producer spawned with
        # spawn_n() cooperate through a shared event
        evt = event.Event()
        results = []

        def producer():
            results.append('prod')
            evt.send()

        def consumer():
            results.append('cons1')
            evt.wait()
            results.append('cons2')

        pool = greenpool.GreenPool(2)
        done = pool.spawn(consumer)
        pool.spawn_n(producer)
        done.wait()
        self.assertEqual(['cons1', 'prod', 'cons2'], results)

    def test_timer_cancel(self):
        # this test verifies that local timers are not fired
        # outside of the context of the spawn
        timer_fired = []

        def fire_timer():
            timer_fired.append(True)

        def some_work():
            hubs.get_hub().schedule_call_local(0, fire_timer)

        pool = greenpool.GreenPool(2)
        worker = pool.spawn(some_work)
        worker.wait()
        eventlet.sleep(0)
        eventlet.sleep(0)
        self.assertEqual(timer_fired, [])

    def test_reentrant(self):
        # a function running inside a size-1 pool must be able to spawn
        # into that same pool, with both spawn() and spawn_n()
        pool = greenpool.GreenPool(1)

        def reenter():
            waiter = pool.spawn(lambda a: a, 'reenter')
            self.assertEqual('reenter', waiter.wait())

        outer_waiter = pool.spawn(reenter)
        outer_waiter.wait()

        evt = event.Event()

        def reenter_async():
            pool.spawn_n(lambda a: a, 'reenter')
            evt.send('done')

        pool.spawn_n(reenter_async)
        self.assertEqual('done', evt.wait())

    def assert_pool_has_free(self, pool, num_free):
        # Helper (not a test): checks pool.free() == num_free, then proves it
        # by actually occupying num_free slots and verifying that one more
        # spawn blocks.
        self.assertEqual(pool.free(), num_free)

        def wait_long_time(e):
            e.wait()

        timer = eventlet.Timeout(1)
        try:
            evt = event.Event()
            for _ in six.moves.range(num_free):
                pool.spawn(wait_long_time, evt)
                # if the pool has fewer free than we expect,
                # then we'll hit the timeout error
        finally:
            timer.cancel()

        # if the runtime error is not raised it means the pool had
        # some unexpected free items
        timer = eventlet.Timeout(0, RuntimeError)
        try:
            self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
        finally:
            timer.cancel()

        # clean up by causing all the wait_long_time functions to return
        evt.send(None)
        eventlet.sleep(0)
        eventlet.sleep(0)

    def test_resize(self):
        pool = greenpool.GreenPool(2)
        evt = event.Event()

        def wait_long_time(e):
            e.wait()

        pool.spawn(wait_long_time, evt)
        pool.spawn(wait_long_time, evt)
        self.assertEqual(pool.free(), 0)
        self.assertEqual(pool.running(), 2)
        self.assert_pool_has_free(pool, 0)

        # verify that the pool discards excess items put into it
        pool.resize(1)

        # cause the wait_long_time functions to return, which will
        # trigger puts to the pool
        evt.send(None)
        eventlet.sleep(0)
        eventlet.sleep(0)

        self.assertEqual(pool.free(), 1)
        self.assertEqual(pool.running(), 0)
        self.assert_pool_has_free(pool, 1)

        # resize larger and assert that there are more free items
        pool.resize(2)
        self.assertEqual(pool.free(), 2)
        self.assertEqual(pool.running(), 0)
        self.assert_pool_has_free(pool, 2)

    def test_pool_smash(self):
        # The premise is that a coroutine in a Pool tries to get a token out
        # of a token pool but times out before getting the token.  We verify
        # that neither pool is adversely affected by this situation.
        pool = greenpool.GreenPool(1)
        tp = pools.TokenPool(max_size=1)
        tp.get()  # empty out the pool

        def do_receive(tp):
            timer = eventlet.Timeout(0, RuntimeError())
            try:
                tp.get()
                self.fail("Shouldn't have recieved anything from the pool")
            except RuntimeError:
                return 'timed out'
            else:
                timer.cancel()

        # the spawn makes the token pool expect that coroutine, but then
        # immediately cuts bait
        e1 = pool.spawn(do_receive, tp)
        self.assertEqual(e1.wait(), 'timed out')

        # the pool can get some random item back
        def send_wakeup(tp):
            tp.put('wakeup')

        gt = eventlet.spawn(send_wakeup, tp)

        # now we ask the pool to run something else, which should not
        # be affected by the previous send at all
        def resume():
            return 'resumed'

        e2 = pool.spawn(resume)
        self.assertEqual(e2.wait(), 'resumed')

        # we should be able to get out the thing we put in there, too
        self.assertEqual(tp.get(), 'wakeup')
        gt.wait()

    def test_spawn_n_2(self):
        p = greenpool.GreenPool(2)
        self.assertEqual(p.free(), 2)
        r = []

        def foo(a):
            r.append(a)

        gt = p.spawn(foo, 1)
        self.assertEqual(p.free(), 1)
        gt.wait()
        self.assertEqual(r, [1])
        eventlet.sleep(0)
        self.assertEqual(p.free(), 2)

        # Once the pool is exhausted, spawning forces a yield.
        p.spawn_n(foo, 2)
        self.assertEqual(1, p.free())
        self.assertEqual(r, [1])

        p.spawn_n(foo, 3)
        self.assertEqual(0, p.free())
        self.assertEqual(r, [1])

        p.spawn_n(foo, 4)
        self.assertEqual(set(r), set([1, 2, 3]))
        eventlet.sleep(0)
        self.assertEqual(set(r), set([1, 2, 3, 4]))

    def test_exceptions(self):
        # a function that raises (including GreenletExit) must still give
        # its slot back to the pool, for both spawn() and spawn_n()
        p = greenpool.GreenPool(2)
        for m in (p.spawn, p.spawn_n):
            self.assert_pool_has_free(p, 2)
            m(raiser, RuntimeError())
            self.assert_pool_has_free(p, 1)
            p.waitall()
            self.assert_pool_has_free(p, 2)
            m(raiser, greenlet.GreenletExit)
            self.assert_pool_has_free(p, 1)
            p.waitall()
            self.assert_pool_has_free(p, 2)

    def test_imap(self):
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(passthru, range(10)))
        self.assertEqual(result_list, list(range(10)))

    def test_empty_imap(self):
        p = greenpool.GreenPool(4)
        result_iter = p.imap(passthru, [])
        # six.next() drives the iterator through the protocol of whichever
        # Python version is running, same as test_imap_raises does
        self.assertRaises(StopIteration, six.next, result_iter)

    def test_imap_nonefunc(self):
        # a None function makes imap yield the argument tuples themselves
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(None, range(10)))
        self.assertEqual(result_list, [(x,) for x in range(10)])

    def test_imap_multi_args(self):
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(passthru2, range(10), range(10, 20)))
        self.assertEqual(result_list, list(zip(range(10), range(10, 20))))

    def test_imap_raises(self):
        # testing the case where the function raises an exception;
        # both that the caller sees that exception, and that the iterator
        # continues to be usable to get the rest of the items
        p = greenpool.GreenPool(4)

        def raise_on_1_and_7(item):
            if item == 1 or item == 7:
                raise RuntimeError("intentional error")
            else:
                return item

        it = p.imap(raise_on_1_and_7, range(10))
        results = []
        while True:
            try:
                results.append(six.next(it))
            except RuntimeError:
                results.append('r')
            except StopIteration:
                break
        self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])

    def test_starmap(self):
        p = greenpool.GreenPool(4)
        result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
        self.assertEqual(result_list, list(range(10)))

    def test_waitall_on_nothing(self):
        # waitall() on a pool that never ran anything must simply return
        p = greenpool.GreenPool()
        p.waitall()

    def test_recursive_waitall(self):
        # calling waitall() from a greenthread running inside the same pool
        # is expected to surface as an AssertionError from wait()
        p = greenpool.GreenPool()
        gt = p.spawn(p.waitall)
        self.assertRaises(AssertionError, gt.wait)


class GreenPile(tests.LimitedTestCase):
    """Tests for ``greenpool.GreenPile``."""

    def test_pile(self):
        p = greenpool.GreenPile(4)
        for i in range(10):
            p.spawn(passthru, i)
        result_list = list(p)
        self.assertEqual(result_list, list(range(10)))

    def test_pile_spawn_times_out(self):
        p = greenpool.GreenPile(4)
        for i in range(4):
            p.spawn(passthru, i)
        # now it should be full and this should time out
        eventlet.Timeout(0)
        self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out")
        # verify that the spawn breakage didn't interrupt the sequence
        # and terminates properly
        for i in range(4, 10):
            p.spawn(passthru, i)
        self.assertEqual(list(p), list(range(10)))

    def test_constructing_from_pool(self):
        # two piles sharing one pool must each yield only their own results
        pool = greenpool.GreenPool(2)
        pile1 = greenpool.GreenPile(pool)
        pile2 = greenpool.GreenPile(pool)

        def bunch_of_work(pile, unique):
            for i in range(10):
                pile.spawn(passthru, i + unique)

        eventlet.spawn(bunch_of_work, pile1, 0)
        eventlet.spawn(bunch_of_work, pile2, 100)
        eventlet.sleep(0)
        self.assertEqual(list(pile2), list(range(100, 110)))
        self.assertEqual(list(pile1), list(range(10)))


class StressException(Exception):
    """Raised by ``pressure`` to exercise error paths in the stress tests."""
    pass


# Fixed seed so the stress helpers below follow a reproducible sequence.
r = random.Random(0)


def pressure(arg):
    """Sleep a random number of sub-millisecond intervals, then either
    return ``arg`` (when the next draw is below 0.8) or raise
    ``StressException(arg)``.
    """
    while r.random() < 0.5:
        eventlet.sleep(r.random() * 0.001)
    if r.random() < 0.8:
        return arg
    else:
        raise StressException(arg)


def random_delay_passthru(arg):
    """Sleep a random number of sub-millisecond intervals, then return
    ``arg`` unchanged.

    Deliberately NOT named ``passthru``: a second module-level ``passthru``
    would replace the fixed-delay helper defined at the top of this module
    for every test that looks the name up at call time.
    """
    while r.random() < 0.5:
        eventlet.sleep(r.random() * 0.001)
    return arg


class Stress(tests.LimitedTestCase):
    """Long-running ordering and memory checks; the tests only run when the
    ``RUN_STRESS_TESTS`` environment variable equals ``'YES'``.
    """
    # tests will take extra-long
    TEST_TIMEOUT = 60

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def spawn_order_check(self, concurrency):
        # checks that piles are strictly ordered
        p = greenpool.GreenPile(concurrency)

        def makework(count, unique):
            for i in six.moves.range(count):
                token = (unique, i)
                p.spawn(pressure, token)

        iters = 1000
        eventlet.spawn(makework, iters, 1)
        eventlet.spawn(makework, iters, 2)
        eventlet.spawn(makework, iters, 3)
        p.spawn(pressure, (0, 0))
        # latest[unique] is the highest sequence number seen so far for
        # that producer; index 0 belongs to the single token spawned above
        latest = [-1] * 4
        received = 0
        it = iter(p)
        while True:
            try:
                i = six.next(it)
            except StressException as exc:
                # pressure() raises with the token as its only argument
                i = exc.args[0]
            except StopIteration:
                break
            received += 1
            if received % 5 == 0:
                eventlet.sleep(0.0001)
            unique, order = i
            assert latest[unique] < order
            latest[unique] = order
        # every producer must have delivered its final token
        for last_order in latest[1:]:
            self.assertEqual(last_order, iters - 1)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_ordering_5(self):
        self.spawn_order_check(5)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_ordering_50(self):
        self.spawn_order_check(50)

    def imap_memory_check(self, concurrency):
        # checks that imap is strictly
        # ordered and consumes a constant amount of memory
        p = greenpool.GreenPool(concurrency)
        count = 1000
        it = p.imap(random_delay_passthru, six.moves.range(count))
        latest = -1
        while True:
            try:
                i = six.next(it)
            except StopIteration:
                break

            if latest == -1:
                # first item: take the baseline object count
                gc.collect()
                initial_obj_count = len(gc.get_objects())
            assert i > latest
            latest = i
            if latest % 5 == 0:
                eventlet.sleep(0.001)
            if latest % 10 == 0:
                gc.collect()
                objs_created = len(gc.get_objects()) - initial_obj_count
                assert objs_created < 25 * concurrency, objs_created
        # make sure we got to the end
        self.assertEqual(latest, count - 1)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_imap_50(self):
        self.imap_memory_check(50)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_imap_500(self):
        self.imap_memory_check(500)

    @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
    def test_with_intpool(self):
        # runs GreenPool workers that each borrow and return a token from a
        # pools.Pool subclass, across a range of pool-size combinations
        class IntPool(pools.Pool):
            def create(self):
                self.current_integer = getattr(self, 'current_integer', 0) + 1
                return self.current_integer

        def subtest(intpool_size, pool_size, num_executes):
            def run(int_pool):
                token = int_pool.get()
                eventlet.sleep(0.0001)
                int_pool.put(token)
                return token

            int_pool = IntPool(max_size=intpool_size)
            pool = greenpool.GreenPool(pool_size)
            for _ in six.moves.range(num_executes):
                pool.spawn(run, int_pool)
            pool.waitall()

        subtest(4, 7, 7)
        subtest(50, 75, 100)
        for isize in (10, 20, 30, 40, 50):
            for psize in (5, 25, 35, 50):
                subtest(isize, psize, psize)