Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / greenpool.py
1 import traceback
2
3 from eventlet import event
4 from eventlet import greenthread
5 from eventlet import queue
6 from eventlet import semaphore
7 from eventlet.support import greenlets as greenlet
8 from eventlet.support import six
9
10 __all__ = ['GreenPool', 'GreenPile']
11
12 DEBUG = True
13
14
15 class GreenPool(object):
16     """The GreenPool class is a pool of green threads.
17     """
18
19     def __init__(self, size=1000):
20         self.size = size
21         self.coroutines_running = set()
22         self.sem = semaphore.Semaphore(size)
23         self.no_coros_running = event.Event()
24
25     def resize(self, new_size):
26         """ Change the max number of greenthreads doing work at any given time.
27
28         If resize is called when there are more than *new_size* greenthreads
29         already working on tasks, they will be allowed to complete but no new
30         tasks will be allowed to get launched until enough greenthreads finish
31         their tasks to drop the overall quantity below *new_size*.  Until
32         then, the return value of free() will be negative.
33         """
34         size_delta = new_size - self.size
35         self.sem.counter += size_delta
36         self.size = new_size
37
38     def running(self):
39         """ Returns the number of greenthreads that are currently executing
40         functions in the GreenPool."""
41         return len(self.coroutines_running)
42
43     def free(self):
44         """ Returns the number of greenthreads available for use.
45
46         If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
47         block the calling greenthread until a slot becomes available."""
48         return self.sem.counter
49
50     def spawn(self, function, *args, **kwargs):
51         """Run the *function* with its arguments in its own green thread.
52         Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
53         object that is running the function, which can be used to retrieve the
54         results.
55
56         If the pool is currently at capacity, ``spawn`` will block until one of
57         the running greenthreads completes its task and frees up a slot.
58
59         This function is reentrant; *function* can call ``spawn`` on the same
60         pool without risk of deadlocking the whole thing.
61         """
62         # if reentering an empty pool, don't try to wait on a coroutine freeing
63         # itself -- instead, just execute in the current coroutine
64         current = greenthread.getcurrent()
65         if self.sem.locked() and current in self.coroutines_running:
66             # a bit hacky to use the GT without switching to it
67             gt = greenthread.GreenThread(current)
68             gt.main(function, args, kwargs)
69             return gt
70         else:
71             self.sem.acquire()
72             gt = greenthread.spawn(function, *args, **kwargs)
73             if not self.coroutines_running:
74                 self.no_coros_running = event.Event()
75             self.coroutines_running.add(gt)
76             gt.link(self._spawn_done)
77         return gt
78
79     def _spawn_n_impl(self, func, args, kwargs, coro):
80         try:
81             try:
82                 func(*args, **kwargs)
83             except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
84                 raise
85             except:
86                 if DEBUG:
87                     traceback.print_exc()
88         finally:
89             if coro is None:
90                 return
91             else:
92                 coro = greenthread.getcurrent()
93                 self._spawn_done(coro)
94
95     def spawn_n(self, function, *args, **kwargs):
96         """Create a greenthread to run the *function*, the same as
97         :meth:`spawn`.  The difference is that :meth:`spawn_n` returns
98         None; the results of *function* are not retrievable.
99         """
100         # if reentering an empty pool, don't try to wait on a coroutine freeing
101         # itself -- instead, just execute in the current coroutine
102         current = greenthread.getcurrent()
103         if self.sem.locked() and current in self.coroutines_running:
104             self._spawn_n_impl(function, args, kwargs, None)
105         else:
106             self.sem.acquire()
107             g = greenthread.spawn_n(
108                 self._spawn_n_impl,
109                 function, args, kwargs, True)
110             if not self.coroutines_running:
111                 self.no_coros_running = event.Event()
112             self.coroutines_running.add(g)
113
114     def waitall(self):
115         """Waits until all greenthreads in the pool are finished working."""
116         assert greenthread.getcurrent() not in self.coroutines_running, \
117             "Calling waitall() from within one of the " \
118             "GreenPool's greenthreads will never terminate."
119         if self.running():
120             self.no_coros_running.wait()
121
122     def _spawn_done(self, coro):
123         self.sem.release()
124         if coro is not None:
125             self.coroutines_running.remove(coro)
126         # if done processing (no more work is waiting for processing),
127         # we can finish off any waitall() calls that might be pending
128         if self.sem.balance == self.size:
129             self.no_coros_running.send(None)
130
131     def waiting(self):
132         """Return the number of greenthreads waiting to spawn.
133         """
134         if self.sem.balance < 0:
135             return -self.sem.balance
136         else:
137             return 0
138
139     def _do_map(self, func, it, gi):
140         for args in it:
141             gi.spawn(func, *args)
142         gi.spawn(return_stop_iteration)
143
144     def starmap(self, function, iterable):
145         """This is the same as :func:`itertools.starmap`, except that *func* is
146         executed in a separate green thread for each item, with the concurrency
147         limited by the pool's size. In operation, starmap consumes a constant
148         amount of memory, proportional to the size of the pool, and is thus
149         suited for iterating over extremely long input lists.
150         """
151         if function is None:
152             function = lambda *a: a
153         gi = GreenMap(self.size)
154         greenthread.spawn_n(self._do_map, function, iterable, gi)
155         return gi
156
157     def imap(self, function, *iterables):
158         """This is the same as :func:`itertools.imap`, and has the same
159         concurrency and memory behavior as :meth:`starmap`.
160
161         It's quite convenient for, e.g., farming out jobs from a file::
162
163            def worker(line):
164                return do_something(line)
165            pool = GreenPool()
166            for result in pool.imap(worker, open("filename", 'r')):
167                print(result)
168         """
169         return self.starmap(function, six.moves.zip(*iterables))
170
171
172 def return_stop_iteration():
173     return StopIteration()
174
175
176 class GreenPile(object):
177     """GreenPile is an abstraction representing a bunch of I/O-related tasks.
178
179     Construct a GreenPile with an existing GreenPool object.  The GreenPile will
180     then use that pool's concurrency as it processes its jobs.  There can be
181     many GreenPiles associated with a single GreenPool.
182
183     A GreenPile can also be constructed standalone, not associated with any
184     GreenPool.  To do this, construct it with an integer size parameter instead
185     of a GreenPool.
186
187     It is not advisable to iterate over a GreenPile in a different greenthread
188     than the one which is calling spawn.  The iterator will exit early in that
189     situation.
190     """
191
192     def __init__(self, size_or_pool=1000):
193         if isinstance(size_or_pool, GreenPool):
194             self.pool = size_or_pool
195         else:
196             self.pool = GreenPool(size_or_pool)
197         self.waiters = queue.LightQueue()
198         self.used = False
199         self.counter = 0
200
201     def spawn(self, func, *args, **kw):
202         """Runs *func* in its own green thread, with the result available by
203         iterating over the GreenPile object."""
204         self.used = True
205         self.counter += 1
206         try:
207             gt = self.pool.spawn(func, *args, **kw)
208             self.waiters.put(gt)
209         except:
210             self.counter -= 1
211             raise
212
213     def __iter__(self):
214         return self
215
216     def next(self):
217         """Wait for the next result, suspending the current greenthread until it
218         is available.  Raises StopIteration when there are no more results."""
219         if self.counter == 0 and self.used:
220             raise StopIteration()
221         try:
222             return self.waiters.get().wait()
223         finally:
224             self.counter -= 1
225     __next__ = next
226
227
228 # this is identical to GreenPile but it blocks on spawn if the results
229 # aren't consumed, and it doesn't generate its own StopIteration exception,
230 # instead relying on the spawning process to send one in when it's done
231 class GreenMap(GreenPile):
232     def __init__(self, size_or_pool):
233         super(GreenMap, self).__init__(size_or_pool)
234         self.waiters = queue.LightQueue(maxsize=self.pool.size)
235
236     def next(self):
237         try:
238             val = self.waiters.get().wait()
239             if isinstance(val, StopIteration):
240                 raise val
241             else:
242                 return val
243         finally:
244             self.counter -= 1
245     __next__ = next