Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / pool.py
1 from __future__ import print_function
2
3 from eventlet import coros, proc, api
4 from eventlet.semaphore import Semaphore
5 from eventlet.support import six
6
7 import warnings
8 warnings.warn(
9     "The pool module is deprecated.  Please use the "
10     "eventlet.GreenPool and eventlet.GreenPile classes instead.",
11     DeprecationWarning, stacklevel=2)
12
13
14 class Pool(object):
15     def __init__(self, min_size=0, max_size=4, track_events=False):
16         if min_size > max_size:
17             raise ValueError('min_size cannot be bigger than max_size')
18         self.max_size = max_size
19         self.sem = Semaphore(max_size)
20         self.procs = proc.RunningProcSet()
21         if track_events:
22             self.results = coros.queue()
23         else:
24             self.results = None
25
26     def resize(self, new_max_size):
27         """ Change the :attr:`max_size` of the pool.
28
29         If the pool gets resized when there are more than *new_max_size*
30         coroutines checked out, when they are returned to the pool they will be
31         discarded.  The return value of :meth:`free` will be negative in this
32         situation.
33         """
34         max_size_delta = new_max_size - self.max_size
35         self.sem.counter += max_size_delta
36         self.max_size = new_max_size
37
38     @property
39     def current_size(self):
40         """ The number of coroutines that are currently executing jobs. """
41         return len(self.procs)
42
43     def free(self):
44         """ Returns the number of coroutines that are available for doing
45         work."""
46         return self.sem.counter
47
48     def execute(self, func, *args, **kwargs):
49         """Execute func in one of the coroutines maintained
50         by the pool, when one is free.
51
52         Immediately returns a :class:`~eventlet.proc.Proc` object which can be
53         queried for the func's result.
54
55         >>> pool = Pool()
56         >>> task = pool.execute(lambda a: ('foo', a), 1)
57         >>> task.wait()
58         ('foo', 1)
59         """
60         # if reentering an empty pool, don't try to wait on a coroutine freeing
61         # itself -- instead, just execute in the current coroutine
62         if self.sem.locked() and api.getcurrent() in self.procs:
63             p = proc.spawn(func, *args, **kwargs)
64             try:
65                 p.wait()
66             except:
67                 pass
68         else:
69             self.sem.acquire()
70             p = self.procs.spawn(func, *args, **kwargs)
71             # assuming the above line cannot raise
72             p.link(lambda p: self.sem.release())
73         if self.results is not None:
74             p.link(self.results)
75         return p
76
77     execute_async = execute
78
79     def _execute(self, evt, func, args, kw):
80         p = self.execute(func, *args, **kw)
81         p.link(evt)
82         return p
83
84     def waitall(self):
85         """ Calling this function blocks until every coroutine
86         completes its work (i.e. there are 0 running coroutines)."""
87         return self.procs.waitall()
88
89     wait_all = waitall
90
91     def wait(self):
92         """Wait for the next execute in the pool to complete,
93         and return the result."""
94         return self.results.wait()
95
96     def waiting(self):
97         """Return the number of coroutines waiting to execute.
98         """
99         if self.sem.balance < 0:
100             return -self.sem.balance
101         else:
102             return 0
103
104     def killall(self):
105         """ Kill every running coroutine as immediately as possible."""
106         return self.procs.killall()
107
108     def launch_all(self, function, iterable):
109         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
110         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
111         Discard values returned by ``function()``. You should call
112         ``wait_all()`` to wait for all coroutines, newly-launched plus any
113         previously-submitted :meth:`execute` or :meth:`execute_async` calls, to
114         complete.
115
116         >>> pool = Pool()
117         >>> def saw(x):
118         ...     print("I saw %s!" % x)
119         ...
120         >>> pool.launch_all(saw, "ABC")
121         >>> pool.wait_all()
122         I saw A!
123         I saw B!
124         I saw C!
125         """
126         for tup in iterable:
127             self.execute(function, *tup)
128
129     def process_all(self, function, iterable):
130         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
131         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
132         Discard values returned by ``function()``. Don't return until all
133         coroutines, newly-launched plus any previously-submitted :meth:`execute()`
134         or :meth:`execute_async` calls, have completed.
135
136         >>> from eventlet import coros
137         >>> pool = coros.CoroutinePool()
138         >>> def saw(x): print("I saw %s!" % x)
139         ...
140         >>> pool.process_all(saw, "DEF")
141         I saw D!
142         I saw E!
143         I saw F!
144         """
145         self.launch_all(function, iterable)
146         self.wait_all()
147
148     def generate_results(self, function, iterable, qsize=None):
149         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
150         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
151         Yield each of the values returned by ``function()``, in the order
152         they're completed rather than the order the coroutines were launched.
153
154         Iteration stops when we've yielded results for each arguments tuple in
155         *iterable*. Unlike :meth:`wait_all` and :meth:`process_all`, this
156         function does not wait for any previously-submitted :meth:`execute` or
157         :meth:`execute_async` calls.
158
159         Results are temporarily buffered in a queue. If you pass *qsize=*, this
160         value is used to limit the max size of the queue: an attempt to buffer
161         too many results will suspend the completed :class:`CoroutinePool`
162         coroutine until the requesting coroutine (the caller of
163         :meth:`generate_results`) has retrieved one or more results by calling
164         this generator-iterator's ``next()``.
165
166         If any coroutine raises an uncaught exception, that exception will
167         propagate to the requesting coroutine via the corresponding ``next()``
168         call.
169
170         What I particularly want these tests to illustrate is that using this
171         generator function::
172
173             for result in generate_results(function, iterable):
174                 # ... do something with result ...
175                 pass
176
177         executes coroutines at least as aggressively as the classic eventlet
178         idiom::
179
180             events = [pool.execute(function, *args) for args in iterable]
181             for event in events:
182                 result = event.wait()
183                 # ... do something with result ...
184
185         even without a distinct event object for every arg tuple in *iterable*,
186         and despite the funny flow control from interleaving launches of new
187         coroutines with yields of completed coroutines' results.
188
189         (The use case that makes this function preferable to the classic idiom
190         above is when the *iterable*, which may itself be a generator, produces
191         millions of items.)
192
193         >>> from eventlet import coros
194         >>> from eventlet.support import six
195         >>> import string
196         >>> pool = coros.CoroutinePool(max_size=5)
197         >>> pausers = [coros.Event() for x in range(2)]
198         >>> def longtask(evt, desc):
199         ...     print("%s woke up with %s" % (desc, evt.wait()))
200         ...
201         >>> pool.launch_all(longtask, zip(pausers, "AB"))
202         >>> def quicktask(desc):
203         ...     print("returning %s" % desc)
204         ...     return desc
205         ...
206
207         (Instead of using a ``for`` loop, step through :meth:`generate_results`
208         items individually to illustrate timing)
209
210         >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
211         >>> print(six.next(step))
212         returning a
213         returning b
214         returning c
215         a
216         >>> print(six.next(step))
217         b
218         >>> print(six.next(step))
219         c
220         >>> print(six.next(step))
221         returning d
222         returning e
223         returning f
224         d
225         >>> pausers[0].send("A")
226         >>> print(six.next(step))
227         e
228         >>> print(six.next(step))
229         f
230         >>> print(six.next(step))
231         A woke up with A
232         returning g
233         returning h
234         returning i
235         g
236         >>> print("".join([six.next(step) for x in range(3)]))
237         returning j
238         returning k
239         returning l
240         returning m
241         hij
242         >>> pausers[1].send("B")
243         >>> print("".join([six.next(step) for x in range(4)]))
244         B woke up with B
245         returning n
246         returning o
247         returning p
248         returning q
249         klmn
250         """
251         # Get an iterator because of our funny nested loop below. Wrap the
252         # iterable in enumerate() so we count items that come through.
253         tuples = iter(enumerate(iterable))
254         # If the iterable is empty, this whole function is a no-op, and we can
255         # save ourselves some grief by just quitting out. In particular, once
256         # we enter the outer loop below, we're going to wait on the queue --
257         # but if we launched no coroutines with that queue as the destination,
258         # we could end up waiting a very long time.
259         try:
260             index, args = six.next(tuples)
261         except StopIteration:
262             return
263         # From this point forward, 'args' is the current arguments tuple and
264         # 'index+1' counts how many such tuples we've seen.
265         # This implementation relies on the fact that _execute() accepts an
266         # event-like object, and -- unless it's None -- the completed
267         # coroutine calls send(result). We slyly pass a queue rather than an
268         # event -- the same queue instance for all coroutines. This is why our
269         # queue interface intentionally resembles the event interface.
270         q = coros.queue(max_size=qsize)
271         # How many results have we yielded so far?
272         finished = 0
273         # This first loop is only until we've launched all the coroutines. Its
274         # complexity is because if iterable contains more args tuples than the
275         # size of our pool, attempting to _execute() the (poolsize+1)th
276         # coroutine would suspend until something completes and send()s its
277         # result to our queue. But to keep down queue overhead and to maximize
278         # responsiveness to our caller, we'd rather suspend on reading the
279         # queue. So we stuff the pool as full as we can, then wait for
280         # something to finish, then stuff more coroutines into the pool.
281         try:
282             while True:
283                 # Before each yield, start as many new coroutines as we can fit.
284                 # (The self.free() test isn't 100% accurate: if we happen to be
285                 # executing in one of the pool's coroutines, we could _execute()
286                 # without waiting even if self.free() reports 0. See _execute().)
287                 # The point is that we don't want to wait in the _execute() call,
288                 # we want to wait in the q.wait() call.
289                 # IMPORTANT: at start, and whenever we've caught up with all
290                 # coroutines we've launched so far, we MUST iterate this inner
291                 # loop at least once, regardless of self.free() -- otherwise the
292                 # q.wait() call below will deadlock!
293                 # Recall that index is the index of the NEXT args tuple that we
294                 # haven't yet launched. Therefore it counts how many args tuples
295                 # we've launched so far.
296                 while self.free() > 0 or finished == index:
297                     # Just like the implementation of execute_async(), save that
298                     # we're passing our queue instead of None as the "event" to
299                     # which to send() the result.
300                     self._execute(q, function, args, {})
301                     # We've consumed that args tuple, advance to next.
302                     index, args = six.next(tuples)
303                 # Okay, we've filled up the pool again, yield a result -- which
304                 # will probably wait for a coroutine to complete. Although we do
305                 # have q.ready(), so we could iterate without waiting, we avoid
306                 # that because every yield could involve considerable real time.
307                 # We don't know how long it takes to return from yield, so every
308                 # time we do, take the opportunity to stuff more requests into the
309                 # pool before yielding again.
310                 yield q.wait()
311                 # Be sure to count results so we know when to stop!
312                 finished += 1
313         except StopIteration:
314             pass
315         # Here we've exhausted the input iterable. index+1 is the total number
316         # of coroutines we've launched. We probably haven't yielded that many
317         # results yet. Wait for the rest of the results, yielding them as they
318         # arrive.
319         while finished < index + 1:
320             yield q.wait()
321             finished += 1