X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;ds=sidebyside;f=eventlet%2Ftests%2Fgreenpool_test.py;fp=eventlet%2Ftests%2Fgreenpool_test.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=069ed5a14b956b41065872cf634740d2d1a294fe;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/greenpool_test.py b/eventlet/tests/greenpool_test.py deleted file mode 100644 index 069ed5a..0000000 --- a/eventlet/tests/greenpool_test.py +++ /dev/null @@ -1,495 +0,0 @@ -import gc -import os -import random - -import eventlet -from eventlet import hubs, greenpool, event, pools -from eventlet.support import greenlets as greenlet, six -import tests - - -def passthru(a): - eventlet.sleep(0.01) - return a - - -def passthru2(a, b): - eventlet.sleep(0.01) - return a, b - - -def raiser(exc): - raise exc - - -class GreenPool(tests.LimitedTestCase): - def test_spawn(self): - p = greenpool.GreenPool(4) - waiters = [] - for i in range(10): - waiters.append(p.spawn(passthru, i)) - results = [waiter.wait() for waiter in waiters] - self.assertEqual(results, list(range(10))) - - def test_spawn_n(self): - p = greenpool.GreenPool(4) - results_closure = [] - - def do_something(a): - eventlet.sleep(0.01) - results_closure.append(a) - - for i in range(10): - p.spawn(do_something, i) - p.waitall() - self.assertEqual(results_closure, list(range(10))) - - def test_waiting(self): - pool = greenpool.GreenPool(1) - done = event.Event() - - def consume(): - done.wait() - - def waiter(pool): - gt = pool.spawn(consume) - gt.wait() - - waiters = [] - self.assertEqual(pool.running(), 0) - waiters.append(eventlet.spawn(waiter, pool)) - eventlet.sleep(0) - self.assertEqual(pool.waiting(), 0) - waiters.append(eventlet.spawn(waiter, pool)) - eventlet.sleep(0) - self.assertEqual(pool.waiting(), 1) - waiters.append(eventlet.spawn(waiter, pool)) - eventlet.sleep(0) - self.assertEqual(pool.waiting(), 2) - self.assertEqual(pool.running(), 1) - done.send(None) - for w in waiters: - w.wait() - self.assertEqual(pool.waiting(), 0) - self.assertEqual(pool.running(), 0) - - def test_multiple_coros(self): - evt = event.Event() - results = [] - - def producer(): - results.append('prod') - evt.send() - - def consumer(): - results.append('cons1') - evt.wait() - results.append('cons2') - - pool = greenpool.GreenPool(2) - done = pool.spawn(consumer) - pool.spawn_n(producer) - done.wait() - self.assertEqual(['cons1', 'prod', 'cons2'], results) - - def test_timer_cancel(self): - # this test verifies that local timers are not fired - # outside of the context of the spawn - timer_fired = [] - - def fire_timer(): - timer_fired.append(True) - - def some_work(): - hubs.get_hub().schedule_call_local(0, fire_timer) - - pool = greenpool.GreenPool(2) - worker = pool.spawn(some_work) - worker.wait() - eventlet.sleep(0) - eventlet.sleep(0) - self.assertEqual(timer_fired, []) - - def test_reentrant(self): - pool = greenpool.GreenPool(1) - - def reenter(): - waiter = pool.spawn(lambda a: a, 'reenter') - self.assertEqual('reenter', waiter.wait()) - - outer_waiter = pool.spawn(reenter) - outer_waiter.wait() - - evt = event.Event() - - def reenter_async(): - pool.spawn_n(lambda a: a, 'reenter') - evt.send('done') - - pool.spawn_n(reenter_async) - self.assertEqual('done', evt.wait()) - - def assert_pool_has_free(self, pool, num_free): - self.assertEqual(pool.free(), num_free) - - def wait_long_time(e): - e.wait() - - timer = eventlet.Timeout(1) - try: - evt = event.Event() - for x in six.moves.range(num_free): - pool.spawn(wait_long_time, evt) - # if the pool has fewer free than we expect, - # then we'll hit the timeout error - finally: - timer.cancel() - - # if the runtime error is not raised it means the pool had - # some unexpected free items - timer = eventlet.Timeout(0, RuntimeError) - try: - self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt) - finally: - timer.cancel() - - # clean up by causing all the wait_long_time functions to return - evt.send(None) - eventlet.sleep(0) - eventlet.sleep(0) - - def test_resize(self): - pool = greenpool.GreenPool(2) - evt = event.Event() - - def wait_long_time(e): - e.wait() - - pool.spawn(wait_long_time, evt) - pool.spawn(wait_long_time, evt) - self.assertEqual(pool.free(), 0) - self.assertEqual(pool.running(), 2) - self.assert_pool_has_free(pool, 0) - - # verify that the pool discards excess items put into it - pool.resize(1) - - # cause the wait_long_time functions to return, which will - # trigger puts to the pool - evt.send(None) - eventlet.sleep(0) - eventlet.sleep(0) - - self.assertEqual(pool.free(), 1) - self.assertEqual(pool.running(), 0) - self.assert_pool_has_free(pool, 1) - - # resize larger and assert that there are more free items - pool.resize(2) - self.assertEqual(pool.free(), 2) - self.assertEqual(pool.running(), 0) - self.assert_pool_has_free(pool, 2) - - def test_pool_smash(self): - # The premise is that a coroutine in a Pool tries to get a token out - # of a token pool but times out before getting the token. We verify - # that neither pool is adversely affected by this situation. - pool = greenpool.GreenPool(1) - tp = pools.TokenPool(max_size=1) - tp.get() # empty out the pool - - def do_receive(tp): - timer = eventlet.Timeout(0, RuntimeError()) - try: - tp.get() - self.fail("Shouldn't have recieved anything from the pool") - except RuntimeError: - return 'timed out' - else: - timer.cancel() - - # the spawn makes the token pool expect that coroutine, but then - # immediately cuts bait - e1 = pool.spawn(do_receive, tp) - self.assertEqual(e1.wait(), 'timed out') - - # the pool can get some random item back - def send_wakeup(tp): - tp.put('wakeup') - gt = eventlet.spawn(send_wakeup, tp) - - # now we ask the pool to run something else, which should not - # be affected by the previous send at all - def resume(): - return 'resumed' - e2 = pool.spawn(resume) - self.assertEqual(e2.wait(), 'resumed') - - # we should be able to get out the thing we put in there, too - self.assertEqual(tp.get(), 'wakeup') - gt.wait() - - def test_spawn_n_2(self): - p = greenpool.GreenPool(2) - self.assertEqual(p.free(), 2) - r = [] - - def foo(a): - r.append(a) - - gt = p.spawn(foo, 1) - self.assertEqual(p.free(), 1) - gt.wait() - self.assertEqual(r, [1]) - eventlet.sleep(0) - self.assertEqual(p.free(), 2) - - # Once the pool is exhausted, spawning forces a yield. - p.spawn_n(foo, 2) - self.assertEqual(1, p.free()) - self.assertEqual(r, [1]) - - p.spawn_n(foo, 3) - self.assertEqual(0, p.free()) - self.assertEqual(r, [1]) - - p.spawn_n(foo, 4) - self.assertEqual(set(r), set([1, 2, 3])) - eventlet.sleep(0) - self.assertEqual(set(r), set([1, 2, 3, 4])) - - def test_exceptions(self): - p = greenpool.GreenPool(2) - for m in (p.spawn, p.spawn_n): - self.assert_pool_has_free(p, 2) - m(raiser, RuntimeError()) - self.assert_pool_has_free(p, 1) - p.waitall() - self.assert_pool_has_free(p, 2) - m(raiser, greenlet.GreenletExit) - self.assert_pool_has_free(p, 1) - p.waitall() - self.assert_pool_has_free(p, 2) - - def test_imap(self): - p = greenpool.GreenPool(4) - result_list = list(p.imap(passthru, range(10))) - self.assertEqual(result_list, list(range(10))) - - def test_empty_imap(self): - p = greenpool.GreenPool(4) - result_iter = p.imap(passthru, []) - self.assertRaises(StopIteration, result_iter.next) - - def test_imap_nonefunc(self): - p = greenpool.GreenPool(4) - result_list = list(p.imap(None, range(10))) - self.assertEqual(result_list, [(x,) for x in range(10)]) - - def test_imap_multi_args(self): - p = greenpool.GreenPool(4) - result_list = list(p.imap(passthru2, range(10), range(10, 20))) - self.assertEqual(result_list, list(zip(range(10), range(10, 20)))) - - def test_imap_raises(self): - # testing the case where the function raises an exception; - # both that the caller sees that exception, and that the iterator - # continues to be usable to get the rest of the items - p = greenpool.GreenPool(4) - - def raiser(item): - if item == 1 or item == 7: - raise RuntimeError("intentional error") - else: - return item - - it = p.imap(raiser, range(10)) - results = [] - while True: - try: - results.append(six.next(it)) - except RuntimeError: - results.append('r') - except StopIteration: - break - self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9]) - - def test_starmap(self): - p = greenpool.GreenPool(4) - result_list = list(p.starmap(passthru, [(x,) for x in range(10)])) - self.assertEqual(result_list, list(range(10))) - - def test_waitall_on_nothing(self): - p = greenpool.GreenPool() - p.waitall() - - def test_recursive_waitall(self): - p = greenpool.GreenPool() - gt = p.spawn(p.waitall) - self.assertRaises(AssertionError, gt.wait) - - -class GreenPile(tests.LimitedTestCase): - def test_pile(self): - p = greenpool.GreenPile(4) - for i in range(10): - p.spawn(passthru, i) - result_list = list(p) - self.assertEqual(result_list, list(range(10))) - - def test_pile_spawn_times_out(self): - p = greenpool.GreenPile(4) - for i in range(4): - p.spawn(passthru, i) - # now it should be full and this should time out - eventlet.Timeout(0) - self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out") - # verify that the spawn breakage didn't interrupt the sequence - # and terminates properly - for i in range(4, 10): - p.spawn(passthru, i) - self.assertEqual(list(p), list(range(10))) - - def test_constructing_from_pool(self): - pool = greenpool.GreenPool(2) - pile1 = greenpool.GreenPile(pool) - pile2 = greenpool.GreenPile(pool) - - def bunch_of_work(pile, unique): - for i in range(10): - pile.spawn(passthru, i + unique) - - eventlet.spawn(bunch_of_work, pile1, 0) - eventlet.spawn(bunch_of_work, pile2, 100) - eventlet.sleep(0) - self.assertEqual(list(pile2), list(range(100, 110))) - self.assertEqual(list(pile1), list(range(10))) - - -class StressException(Exception): - pass - -r = random.Random(0) - - -def pressure(arg): - while r.random() < 0.5: - eventlet.sleep(r.random() * 0.001) - if r.random() < 0.8: - return arg - else: - raise StressException(arg) - - -def passthru(arg): - while r.random() < 0.5: - eventlet.sleep(r.random() * 0.001) - return arg - - -class Stress(tests.LimitedTestCase): - # tests will take extra-long - TEST_TIMEOUT = 60 - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def spawn_order_check(self, concurrency): - # checks that piles are strictly ordered - p = greenpool.GreenPile(concurrency) - - def makework(count, unique): - for i in six.moves.range(count): - token = (unique, i) - p.spawn(pressure, token) - - iters = 1000 - eventlet.spawn(makework, iters, 1) - eventlet.spawn(makework, iters, 2) - eventlet.spawn(makework, iters, 3) - p.spawn(pressure, (0, 0)) - latest = [-1] * 4 - received = 0 - it = iter(p) - while True: - try: - i = six.next(it) - except StressException as exc: - i = exc.args[0] - except StopIteration: - break - received += 1 - if received % 5 == 0: - eventlet.sleep(0.0001) - unique, order = i - assert latest[unique] < order - latest[unique] = order - for l in latest[1:]: - self.assertEqual(l, iters - 1) - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def test_ordering_5(self): - self.spawn_order_check(5) - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def test_ordering_50(self): - self.spawn_order_check(50) - - def imap_memory_check(self, concurrency): - # checks that imap is strictly - # ordered and consumes a constant amount of memory - p = greenpool.GreenPool(concurrency) - count = 1000 - it = p.imap(passthru, six.moves.range(count)) - latest = -1 - while True: - try: - i = six.next(it) - except StopIteration: - break - - if latest == -1: - gc.collect() - initial_obj_count = len(gc.get_objects()) - assert i > latest - latest = i - if latest % 5 == 0: - eventlet.sleep(0.001) - if latest % 10 == 0: - gc.collect() - objs_created = len(gc.get_objects()) - initial_obj_count - assert objs_created < 25 * concurrency, objs_created - # make sure we got to the end - self.assertEqual(latest, count - 1) - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def test_imap_50(self): - self.imap_memory_check(50) - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def test_imap_500(self): - self.imap_memory_check(500) - - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') - def test_with_intpool(self): - class IntPool(pools.Pool): - def create(self): - self.current_integer = getattr(self, 'current_integer', 0) + 1 - return self.current_integer - - def subtest(intpool_size, pool_size, num_executes): - def run(int_pool): - token = int_pool.get() - eventlet.sleep(0.0001) - int_pool.put(token) - return token - - int_pool = IntPool(max_size=intpool_size) - pool = greenpool.GreenPool(pool_size) - for ix in six.moves.range(num_executes): - pool.spawn(run, int_pool) - pool.waitall() - - subtest(4, 7, 7) - subtest(50, 75, 100) - for isize in (10, 20, 30, 40, 50): - for psize in (5, 25, 35, 50): - subtest(isize, psize, psize)