1 from __future__ import print_function
3 from eventlet import coros, proc, api
4 from eventlet.semaphore import Semaphore
5 from eventlet.support import six
9 "The pool module is deprecated. Please use the "
10 "eventlet.GreenPool and eventlet.GreenPile classes instead.",
11 DeprecationWarning, stacklevel=2)
# Pool constructor (deprecated module): caps concurrently-running jobs at
# max_size via a counting Semaphore, and tracks running jobs in a
# RunningProcSet.
# NOTE(review): this is a numbered listing with elided lines; original
# line 21 is missing between lines 20 and 22 -- presumably an
# `if track_events:` guard (with an else branch leaving self.results as
# None, given the `if self.results is not None:` check in execute()).
# Confirm against the complete file.
15 def __init__(self, min_size=0, max_size=4, track_events=False):
# min_size is accepted for interface compatibility; it is only validated
# here, not otherwise used in the visible code.
16 if min_size > max_size:
17 raise ValueError('min_size cannot be bigger than max_size')
18 self.max_size = max_size
# Semaphore counter == number of free execution slots (see free()).
19 self.sem = Semaphore(max_size)
# Set of currently-running Proc objects (see current_size()/killall()).
20 self.procs = proc.RunningProcSet()
# Queue collecting completed-job results (consumed by wait()).
22 self.results = coros.queue()
# Grow or shrink the pool's concurrency cap in place.
# NOTE(review): original lines 28 and 32-33 (docstring continuation and
# closing quotes) are elided from this listing.
26 def resize(self, new_max_size):
27 """ Change the :attr:`max_size` of the pool.
29 If the pool gets resized when there are more than *new_max_size*
30 coroutines checked out, when they are returned to the pool they will be
31 discarded. The return value of :meth:`free` will be negative in this
# Shift the semaphore's free-slot count by the size change. Shrinking
# below the number of checked-out coroutines drives the counter negative,
# which is exactly what makes free() return a negative value (per the
# docstring above).
34 max_size_delta = new_max_size - self.max_size
35 self.sem.counter += max_size_delta
36 self.max_size = new_max_size
# NOTE(review): probably decorated with @property on an elided line
# (original ~38) -- it is documented and reads like an attribute. Confirm
# against the complete file.
39 def current_size(self):
40 """ The number of coroutines that are currently executing jobs. """
# Size of the RunningProcSet == jobs in flight right now.
41 return len(self.procs)
# Fragment of free(): the `def free(self):` header (original line ~43)
# and the rest of the docstring are elided from this listing.
44 """ Returns the number of coroutines that are available for doing
# The semaphore counter directly encodes free slots; it can be negative
# after resize() shrinks the pool below the checked-out count.
46 return self.sem.counter
# Submit func to run in a pool coroutine; returns a Proc immediately.
# NOTE(review): elided in this listing -- original lines 54-55 and 57-59
# (rest of the docstring/doctest), 64-69 (presumably the `else:` branch
# that blocks on a semaphore acquire before the tracked spawn at line 70),
# and 74-76 (presumably wiring completions to self.results and
# `return p`). Confirm against the complete file.
48 def execute(self, func, *args, **kwargs):
49 """Execute func in one of the coroutines maintained
50 by the pool, when one is free.
52 Immediately returns a :class:`~eventlet.proc.Proc` object which can be
53 queried for the func's result.
56 >>> task = pool.execute(lambda a: ('foo', a), 1)
# Reentrancy escape hatch: a pool coroutine submitting into a full pool
# must not wait for a slot it may itself be holding -- that would
# deadlock, so run untracked in this case.
60 # if reentering an empty pool, don't try to wait on a coroutine freeing
61 # itself -- instead, just execute in the current coroutine
62 if self.sem.locked() and api.getcurrent() in self.procs:
# Untracked spawn via proc.spawn -- not registered in self.procs and,
# apparently, no pool slot is consumed here.
63 p = proc.spawn(func, *args, **kwargs)
# Tracked spawn: registered in self.procs so current_size(), wait_all()
# and killall() see it.
70 p = self.procs.spawn(func, *args, **kwargs)
71 # assuming the above line cannot raise
# Free the pool slot when the proc finishes, whatever its outcome.
72 p.link(lambda p: self.sem.release())
73 if self.results is not None:
# Historical alias kept for API compatibility (both names are referenced
# in the docstrings of launch_all/process_all below).
77 execute_async = execute
# Internal helper used by generate_results(): submit func via execute()
# and -- per the commentary at original lines 265-269 -- have the
# completed coroutine call evt.send(result) unless evt is None.
# NOTE(review): the rest of the body (original lines 81-83, presumably
# the evt link and `return p`) is elided from this listing.
79 def _execute(self, evt, func, args, kw):
80 p = self.execute(func, *args, **kw)
# Fragment of wait_all(): the `def wait_all(self):` header (original line
# ~84) is elided from this listing.
85 """ Calling this function blocks until every coroutine
86 completes its work (i.e. there are 0 running coroutines)."""
# Delegates to RunningProcSet.waitall(), blocking until the tracked set
# of running procs drains (per the docstring above).
87 return self.procs.waitall()
# Fragment of wait(): the `def` header (original lines ~90-91) is elided
# from this listing.
92 """Wait for the next execute in the pool to complete,
93 and return the result."""
# NOTE(review): presumably requires the pool to have been created with
# track_events=True; otherwise self.results appears to be None (see the
# `is not None` check in execute()) and this would raise AttributeError.
# Confirm against the complete file.
94 return self.results.wait()
# Fragment of waiting(): the `def` header (original line ~96), the end of
# the docstring, and the fall-through branch (original lines ~101-103,
# presumably `return 0`) are elided from this listing.
97 """Return the number of coroutines waiting to execute.
# A negative semaphore balance indicates that many blocked acquirers --
# i.e. coroutines queued for a free slot (per the docstring above).
99 if self.sem.balance < 0:
100 return -self.sem.balance
# Fragment of killall(): the `def killall(self):` header (original line
# ~104) is elided from this listing.
105 """ Kill every running coroutine as immediately as possible."""
# Delegates to RunningProcSet.killall() over the tracked running procs.
106 return self.procs.killall()
# Fire-and-forget starmap: one pool coroutine per args tuple, return
# values discarded.
# NOTE(review): elided in this listing -- original lines 114-117, 119 and
# 121-126, i.e. the rest of the docstring/doctest and, presumably, the
# `for tup in iterable:` loop header that the execute() call at line 127
# belongs to. Confirm against the complete file.
108 def launch_all(self, function, iterable):
109 """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
110 in its own coroutine -- like ``itertools.starmap()``, but in parallel.
111 Discard values returned by ``function()``. You should call
112 ``wait_all()`` to wait for all coroutines, newly-launched plus any
113 previously-submitted :meth:`execute` or :meth:`execute_async` calls, to
118 ... print("I saw %s!" % x)
120 >>> pool.launch_all(saw, "ABC")
# Each tuple is unpacked as positional arguments to function.
127 self.execute(function, *tup)
# Blocking starmap: same launch behavior as launch_all(), but documented
# to not return until every coroutine has completed.
# NOTE(review): elided in this listing -- original lines 135, 139, 141-144
# (docstring/doctest material) and the method's tail past line 145
# (presumably a self.wait_all() call implementing the "don't return until
# complete" promise). Confirm against the complete file.
129 def process_all(self, function, iterable):
130 """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
131 in its own coroutine -- like ``itertools.starmap()``, but in parallel.
132 Discard values returned by ``function()``. Don't return until all
133 coroutines, newly-launched plus any previously-submitted :meth:`execute()`
134 or :meth:`execute_async` calls, have completed.
136 >>> from eventlet import coros
137 >>> pool = coros.CoroutinePool()
138 >>> def saw(x): print("I saw %s!" % x)
140 >>> pool.process_all(saw, "DEF")
# Launch everything first; the completion wait is in the elided tail.
145 self.launch_all(function, iterable)
148 def generate_results(self, function, iterable, qsize=None):
149 """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
150 in its own coroutine -- like ``itertools.starmap()``, but in parallel.
151 Yield each of the values returned by ``function()``, in the order
152 they're completed rather than the order the coroutines were launched.
154 Iteration stops when we've yielded results for each arguments tuple in
155 *iterable*. Unlike :meth:`wait_all` and :meth:`process_all`, this
156 function does not wait for any previously-submitted :meth:`execute` or
157 :meth:`execute_async` calls.
159 Results are temporarily buffered in a queue. If you pass *qsize=*, this
160 value is used to limit the max size of the queue: an attempt to buffer
161 too many results will suspend the completed :class:`CoroutinePool`
162 coroutine until the requesting coroutine (the caller of
163 :meth:`generate_results`) has retrieved one or more results by calling
164 this generator-iterator's ``next()``.
166 If any coroutine raises an uncaught exception, that exception will
167 propagate to the requesting coroutine via the corresponding ``next()``
170 What I particularly want these tests to illustrate is that using this
173 for result in generate_results(function, iterable):
174 # ... do something with result ...
177 executes coroutines at least as aggressively as the classic eventlet
180 events = [pool.execute(function, *args) for args in iterable]
182 result = event.wait()
183 # ... do something with result ...
185 even without a distinct event object for every arg tuple in *iterable*,
186 and despite the funny flow control from interleaving launches of new
187 coroutines with yields of completed coroutines' results.
189 (The use case that makes this function preferable to the classic idiom
190 above is when the *iterable*, which may itself be a generator, produces
193 >>> from eventlet import coros
194 >>> from eventlet.support import six
196 >>> pool = coros.CoroutinePool(max_size=5)
197 >>> pausers = [coros.Event() for x in range(2)]
198 >>> def longtask(evt, desc):
199 ... print("%s woke up with %s" % (desc, evt.wait()))
201 >>> pool.launch_all(longtask, zip(pausers, "AB"))
202 >>> def quicktask(desc):
203 ... print("returning %s" % desc)
207 (Instead of using a ``for`` loop, step through :meth:`generate_results`
208 items individually to illustrate timing)
210 >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
211 >>> print(six.next(step))
216 >>> print(six.next(step))
218 >>> print(six.next(step))
220 >>> print(six.next(step))
225 >>> pausers[0].send("A")
226 >>> print(six.next(step))
228 >>> print(six.next(step))
230 >>> print(six.next(step))
236 >>> print("".join([six.next(step) for x in range(3)]))
242 >>> pausers[1].send("B")
243 >>> print("".join([six.next(step) for x in range(4)]))
251 # Get an iterator because of our funny nested loop below. Wrap the
252 # iterable in enumerate() so we count items that come through.
253 tuples = iter(enumerate(iterable))
254 # If the iterable is empty, this whole function is a no-op, and we can
255 # save ourselves some grief by just quitting out. In particular, once
256 # we enter the outer loop below, we're going to wait on the queue --
257 # but if we launched no coroutines with that queue as the destination,
258 # we could end up waiting a very long time.
260 index, args = six.next(tuples)
261 except StopIteration:
263 # From this point forward, 'args' is the current arguments tuple and
264 # 'index+1' counts how many such tuples we've seen.
265 # This implementation relies on the fact that _execute() accepts an
266 # event-like object, and -- unless it's None -- the completed
267 # coroutine calls send(result). We slyly pass a queue rather than an
268 # event -- the same queue instance for all coroutines. This is why our
269 # queue interface intentionally resembles the event interface.
270 q = coros.queue(max_size=qsize)
271 # How many results have we yielded so far?
273 # This first loop is only until we've launched all the coroutines. Its
274 # complexity is because if iterable contains more args tuples than the
275 # size of our pool, attempting to _execute() the (poolsize+1)th
276 # coroutine would suspend until something completes and send()s its
277 # result to our queue. But to keep down queue overhead and to maximize
278 # responsiveness to our caller, we'd rather suspend on reading the
279 # queue. So we stuff the pool as full as we can, then wait for
280 # something to finish, then stuff more coroutines into the pool.
283 # Before each yield, start as many new coroutines as we can fit.
284 # (The self.free() test isn't 100% accurate: if we happen to be
285 # executing in one of the pool's coroutines, we could _execute()
286 # without waiting even if self.free() reports 0. See _execute().)
287 # The point is that we don't want to wait in the _execute() call,
288 # we want to wait in the q.wait() call.
289 # IMPORTANT: at start, and whenever we've caught up with all
290 # coroutines we've launched so far, we MUST iterate this inner
291 # loop at least once, regardless of self.free() -- otherwise the
292 # q.wait() call below will deadlock!
293 # Recall that index is the index of the NEXT args tuple that we
294 # haven't yet launched. Therefore it counts how many args tuples
295 # we've launched so far.
296 while self.free() > 0 or finished == index:
297 # Just like the implementation of execute_async(), save that
298 # we're passing our queue instead of None as the "event" to
299 # which to send() the result.
300 self._execute(q, function, args, {})
301 # We've consumed that args tuple, advance to next.
302 index, args = six.next(tuples)
303 # Okay, we've filled up the pool again, yield a result -- which
304 # will probably wait for a coroutine to complete. Although we do
305 # have q.ready(), so we could iterate without waiting, we avoid
306 # that because every yield could involve considerable real time.
307 # We don't know how long it takes to return from yield, so every
308 # time we do, take the opportunity to stuff more requests into the
309 # pool before yielding again.
311 # Be sure to count results so we know when to stop!
313 except StopIteration:
315 # Here we've exhausted the input iterable. index+1 is the total number
316 # of coroutines we've launched. We probably haven't yielded that many
317 # results yet. Wait for the rest of the results, yielding them as they
319 while finished < index + 1: