3 warnings.simplefilter('ignore', DeprecationWarning)
4 from eventlet import pool, coros, api, hubs, timeout
5 warnings.simplefilter('default', DeprecationWarning)
6 from eventlet import event as _event
7 from eventlet.support import six
8 from tests import LimitedTestCase
9 from unittest import main
12 class TestCoroutinePool(LimitedTestCase):
15 def test_execute_async(self):
20 pool = self.klass(0, 2)
21 pool.execute_async(some_work)
24 def test_execute(self):
25 value = 'return value'
29 pool = self.klass(0, 2)
30 worker = pool.execute(some_work)
31 self.assertEqual(value, worker.wait())
33 def test_waiting(self):
34 pool = self.klass(0, 1)
41 evt = pool.execute(consume)
45 waiters.append(eventlet.spawn(waiter, pool))
47 self.assertEqual(pool.waiting(), 0)
48 waiters.append(eventlet.spawn(waiter, pool))
50 self.assertEqual(pool.waiting(), 1)
51 waiters.append(eventlet.spawn(waiter, pool))
53 self.assertEqual(pool.waiting(), 2)
57 self.assertEqual(pool.waiting(), 0)
59 def test_multiple_coros(self):
64 results.append('prod')
68 results.append('cons1')
70 results.append('cons2')
72 pool = self.klass(0, 2)
73 done = pool.execute(consumer)
74 pool.execute_async(producer)
76 self.assertEqual(['cons1', 'prod', 'cons2'], results)
78 def test_timer_cancel(self):
79 # this test verifies that local timers are not fired
80 # outside of the context of the execute method
84 timer_fired.append(True)
87 hubs.get_hub().schedule_call_local(0, fire_timer)
88 pool = self.klass(0, 2)
89 worker = pool.execute(some_work)
92 self.assertEqual(timer_fired, [])
94 def test_reentrant(self):
95 pool = self.klass(0, 1)
98 waiter = pool.execute(lambda a: a, 'reenter')
99 self.assertEqual('reenter', waiter.wait())
101 outer_waiter = pool.execute(reenter)
107 pool.execute_async(lambda a: a, 'reenter')
110 pool.execute_async(reenter_async)
113 def assert_pool_has_free(self, pool, num_free):
114 def wait_long_time(e):
116 timer = timeout.Timeout(1, api.TimeoutError)
119 for x in six.moves.range(num_free):
120 pool.execute(wait_long_time, evt)
121 # if the pool has fewer free than we expect,
122 # then we'll hit the timeout error
126 # if the runtime error is not raised it means the pool had
127 # some unexpected free items
128 timer = timeout.Timeout(0, RuntimeError)
129 self.assertRaises(RuntimeError, pool.execute, wait_long_time, evt)
131 # clean up by causing all the wait_long_time functions to return
136 def test_resize(self):
137 pool = self.klass(max_size=2)
140 def wait_long_time(e):
142 pool.execute(wait_long_time, evt)
143 pool.execute(wait_long_time, evt)
144 self.assertEqual(pool.free(), 0)
145 self.assert_pool_has_free(pool, 0)
147 # verify that the pool discards excess items put into it
150 # cause the wait_long_time functions to return, which will
151 # trigger puts to the pool
156 self.assertEqual(pool.free(), 1)
157 self.assert_pool_has_free(pool, 1)
159 # resize larger and assert that there are more free items
161 self.assertEqual(pool.free(), 2)
162 self.assert_pool_has_free(pool, 2)
164 def test_stderr_raising(self):
165 # testing that really egregious errors in the error handling code
166 # (that prints tracebacks to stderr) don't cause the pool to lose
169 pool = self.klass(min_size=1, max_size=1)
171 def crash(*args, **kw):
172 raise RuntimeError("Whoa")
174 class FakeFile(object):
177 # we're going to do this by causing the traceback.print_exc in
178 # safe_apply to raise an exception and thus exit _main_loop
179 normal_err = sys.stderr
181 sys.stderr = FakeFile()
182 waiter = pool.execute(crash)
183 self.assertRaises(RuntimeError, waiter.wait)
184 # the pool should have something free at this point since the
186 # pool.Pool change: if an exception is raised during execution of a link,
187 # the rest of the links are scheduled to be executed on the next hub iteration
188 # this introduces a delay in updating pool.sem which makes pool.free() report 0
191 self.assertEqual(pool.free(), 1)
192 # shouldn't block when trying to get
193 t = timeout.Timeout(0.1)
195 pool.execute(api.sleep, 1)
199 sys.stderr = normal_err
201 def test_track_events(self):
202 pool = self.klass(track_events=True)
204 pool.execute(lambda n: n, x)
208 def test_track_slow_event(self):
209 pool = self.klass(track_events=True)
215 self.assertEqual(pool.wait(), 'ok')
217 def test_pool_smash(self):
218 # The premise is that a coroutine in a Pool tries to get a token out
219 # of a token pool but times out before getting the token. We verify
220 # that neither pool is adversely affected by this situation.
221 from eventlet import pools
222 pool = self.klass(min_size=1, max_size=1)
223 tp = pools.TokenPool(max_size=1)
224 token = tp.get() # empty pool
227 timeout.Timeout(0, RuntimeError())
230 self.fail("Shouldn't have recieved anything from the pool")
234 # the execute makes the token pool expect that coroutine, but then
235 # immediately cuts bait
236 e1 = pool.execute(do_receive, tp)
237 self.assertEqual(e1.wait(), 'timed out')
239 # the pool can get some random item back
242 api.spawn(send_wakeup, tp)
244 # now we ask the pool to run something else, which should not
245 # be affected by the previous send at all
248 e2 = pool.execute(resume)
249 self.assertEqual(e2.wait(), 'resumed')
251 # we should be able to get out the thing we put in there, too
252 self.assertEqual(tp.get(), 'wakeup')
255 class PoolBasicTests(LimitedTestCase):
258 def test_execute_async(self):
259 p = self.klass(max_size=2)
260 self.assertEqual(p.free(), 2)
265 evt = p.execute(foo, 1)
266 self.assertEqual(p.free(), 1)
268 self.assertEqual(r, [1])
270 self.assertEqual(p.free(), 2)
272 # Once the pool is exhausted, calling an execute forces a yield.
274 p.execute_async(foo, 2)
275 self.assertEqual(1, p.free())
276 self.assertEqual(r, [1])
278 p.execute_async(foo, 3)
279 self.assertEqual(0, p.free())
280 self.assertEqual(r, [1])
282 p.execute_async(foo, 4)
283 self.assertEqual(r, [1, 2, 3])
285 self.assertEqual(r, [1, 2, 3, 4])
287 def test_execute(self):
289 evt = p.execute(lambda a: ('foo', a), 1)
290 self.assertEqual(evt.wait(), ('foo', 1))
292 def test_with_intpool(self):
293 from eventlet import pools
295 class IntPool(pools.Pool):
297 self.current_integer = getattr(self, 'current_integer', 0) + 1
298 return self.current_integer
300 def subtest(intpool_size, pool_size, num_executes):
302 token = int_pool.get()
307 int_pool = IntPool(max_size=intpool_size)
308 pool = self.klass(max_size=pool_size)
309 for ix in six.moves.range(num_executes):
310 pool.execute(run, int_pool)
315 for isize in (20, 30, 40, 50):
316 for psize in (25, 35, 50):
317 subtest(isize, psize, psize)
320 if __name__ == '__main__':