3 from eventlet import event
4 from eventlet import greenthread
5 from eventlet import queue
6 from eventlet import semaphore
7 from eventlet.support import greenlets as greenlet
8 from eventlet.support import six
10 __all__ = ['GreenPool', 'GreenPile']
15 class GreenPool(object):
16 """The GreenPool class is a pool of green threads.
def __init__(self, size=1000):
    """Create a pool that runs at most *size* greenthreads at a time.

    :param size: maximum number of greenthreads allowed to work
        concurrently.
    """
    # resize(), _spawn_done() and starmap() all read self.size, so the
    # configured capacity has to be recorded on the instance.
    self.size = size
    # Greenthreads currently doing work on behalf of this pool.
    self.coroutines_running = set()
    # Counting semaphore created with one slot per allowed greenthread.
    self.sem = semaphore.Semaphore(size)
    # Event that is waited on until the pool has drained.
    self.no_coros_running = event.Event()
def resize(self, new_size):
    """Change the max number of greenthreads doing work at any given time.

    If resize is called when there are more than *new_size* greenthreads
    already working on tasks, they will be allowed to complete but no new
    tasks will be allowed to get launched until enough greenthreads finish
    their tasks to drop the overall quantity below *new_size*.  Until
    then, the return value of free() will be negative.

    :param new_size: the new maximum number of concurrent greenthreads.
    """
    # Shift the semaphore by the difference so that the slots which are
    # already in use stay accounted for.
    size_delta = new_size - self.size
    self.sem.counter += size_delta
    # Remember the new capacity: the next resize() and the idle check in
    # _spawn_done() are both computed against self.size.
    self.size = new_size
39 """ Returns the number of greenthreads that are currently executing
40 functions in the GreenPool."""
41 return len(self.coroutines_running)
44 """ Returns the number of greenthreads available for use.
46 If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
47 block the calling greenthread until a slot becomes available."""
48 return self.sem.counter
def spawn(self, function, *args, **kwargs):
    """Run the *function* with its arguments in its own green thread.

    Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
    object that is running the function, which can be used to retrieve the
    results.

    If the pool is currently at capacity, ``spawn`` will block until one of
    the running greenthreads completes its task and frees up a slot.

    This function is reentrant; *function* can call ``spawn`` on the same
    pool without risk of deadlocking the whole thing.
    """
    # if reentering an empty pool, don't try to wait on a coroutine freeing
    # itself -- instead, just execute in the current coroutine
    current = greenthread.getcurrent()
    if self.sem.locked() and current in self.coroutines_running:
        # a bit hacky to use the GT without switching to it
        gt = greenthread.GreenThread(current)
        gt.main(function, args, kwargs)
        return gt

    # Take a slot before starting the worker; this is the call that makes
    # the caller wait while the pool is at capacity.
    self.sem.acquire()
    gt = greenthread.spawn(function, *args, **kwargs)
    if not self.coroutines_running:
        # First worker after an idle period: start from a fresh event so
        # that a later waitall() does not see a stale one.
        self.no_coros_running = event.Event()
    self.coroutines_running.add(gt)
    # _spawn_done() is invoked once the greenthread has finished.
    gt.link(self._spawn_done)
    return gt
# NOTE(review): only four lines of this method are present in this listing
# (embedded numbers 79, 83, 92 and 93); the statements numbered 80-82 and
# 84-91 are missing, so the control flow around the visible lines cannot be
# read from here.
# What the visible lines show: the method takes the callable together with
# its positional and keyword arguments plus a `coro` value; it has an except
# clause naming KeyboardInterrupt, SystemExit and greenlet.GreenletExit; and
# it rebinds `coro` to the current greenthread before passing it to
# self._spawn_done().
79 def _spawn_n_impl(self, func, args, kwargs, coro):
83 except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
92 coro = greenthread.getcurrent()
93 self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs):
    """Create a greenthread to run the *function*, the same as
    :meth:`spawn`.  The difference is that :meth:`spawn_n` returns
    None; the results of *function* are not retrievable.
    """
    # if reentering an empty pool, don't try to wait on a coroutine freeing
    # itself -- instead, just execute in the current coroutine
    current = greenthread.getcurrent()
    if self.sem.locked() and current in self.coroutines_running:
        # coro=None marks the call as running inline in the caller
        self._spawn_n_impl(function, args, kwargs, None)
        return

    # Take a slot before starting the worker; this is the call that makes
    # the caller wait while the pool is at capacity.
    self.sem.acquire()
    g = greenthread.spawn_n(
        self._spawn_n_impl,
        function, args, kwargs, True)
    if not self.coroutines_running:
        # First worker after an idle period: start from a fresh event.
        self.no_coros_running = event.Event()
    self.coroutines_running.add(g)
115 """Waits until all greenthreads in the pool are finished working."""
116 assert greenthread.getcurrent() not in self.coroutines_running, \
117 "Calling waitall() from within one of the " \
118 "GreenPool's greenthreads will never terminate."
120 self.no_coros_running.wait()
# NOTE(review): the statements numbered 123-124 are missing from this
# listing, so whatever runs before the removal below is not visible here.
# What the visible lines show: the finished greenthread `coro` is removed
# from self.coroutines_running, and when the semaphore balance equals
# self.size the no_coros_running event is sent None. spawn() registers this
# method through gt.link(), and _spawn_n_impl() calls it directly.
122 def _spawn_done(self, coro):
125 self.coroutines_running.remove(coro)
126 # if done processing (no more work is waiting for processing),
127 # we can finish off any waitall() calls that might be pending
128 if self.sem.balance == self.size:
129 self.no_coros_running.send(None)
132 """Return the number of greenthreads waiting to spawn.
134 if self.sem.balance < 0:
135 return -self.sem.balance
def _do_map(self, func, it, gi):
    """Feed every argument tuple from *it* into *gi*, then mark the end.

    :param func: callable applied to each unpacked argument tuple.
    :param it: iterable yielding argument tuples.
    :param gi: object whose ``spawn`` method schedules the calls.
    """
    for args in it:
        gi.spawn(func, *args)
    # The last job returns a StopIteration instance; the consumer treats
    # that value as the signal that no more results will follow.
    gi.spawn(return_stop_iteration)
def starmap(self, function, iterable):
    """This is the same as :func:`itertools.starmap`, except that *func* is
    executed in a separate green thread for each item, with the concurrency
    limited by the pool's size. In operation, starmap consumes a constant
    amount of memory, proportional to the size of the pool, and is thus
    suited for iterating over extremely long input lists.

    :param function: callable applied to each unpacked argument tuple;
        when ``None`` the argument tuples themselves are produced.
    :param iterable: iterable yielding argument tuples.
    :returns: the :class:`GreenMap` that the results are read from.
    """
    if function is None:
        # identity: hand the argument tuple back unchanged
        function = lambda *a: a
    gi = GreenMap(self.size)
    # The producer runs in its own greenthread so the caller can start
    # consuming results right away.
    greenthread.spawn_n(self._do_map, function, iterable, gi)
    return gi
def imap(self, function, *iterables):
    """This is the same as :func:`itertools.imap`, and has the same
    concurrency and memory behavior as :meth:`starmap`.

    It's quite convenient for, e.g., farming out jobs from a file::

       def worker(line):
           return do_something(line)

       pool = GreenPool()
       for result in pool.imap(worker, open("filename", 'r')):
           print(result)

    :param function: callable invoked with one item from each iterable.
    :param iterables: one or more iterables, consumed in lockstep.
    :returns: whatever :meth:`starmap` returns for the zipped arguments.
    """
    # zip() turns the parallel iterables into the argument tuples that
    # starmap() expects.
    return self.starmap(function, six.moves.zip(*iterables))
def return_stop_iteration():
    """Return (not raise) a fresh ``StopIteration`` instance.

    The exception object travels through the result stream as an ordinary
    value, so the consumer can recognise it with ``isinstance`` and treat
    it as the end-of-results marker.
    """
    return StopIteration()
176 class GreenPile(object):
177 """GreenPile is an abstraction representing a bunch of I/O-related tasks.
179 Construct a GreenPile with an existing GreenPool object. The GreenPile will
180 then use that pool's concurrency as it processes its jobs. There can be
181 many GreenPiles associated with a single GreenPool.
183 A GreenPile can also be constructed standalone, not associated with any
184 GreenPool. To do this, construct it with an integer size parameter instead
187 It is not advisable to iterate over a GreenPile in a different greenthread
188 than the one which is calling spawn. The iterator will exit early in that
def __init__(self, size_or_pool=1000):
    """Create a pile backed by an existing pool or by a private one.

    :param size_or_pool: either a :class:`GreenPool` to share, or an
        integer size used to build a pool owned by this pile.
    """
    if isinstance(size_or_pool, GreenPool):
        self.pool = size_or_pool
    else:
        self.pool = GreenPool(size_or_pool)
    # Queue of the objects whose wait() yields the results, in spawn order.
    self.waiters = queue.LightQueue()
    # Both attributes are consulted before the first result is awaited, so
    # they need defined starting values: nothing spawned, nothing pending.
    self.used = False
    self.counter = 0
# NOTE(review): the statements numbered 204-206 and 208-211 are missing from
# this listing; only the delegation to the underlying pool is visible, so
# how the returned greenthread is recorded cannot be read from here.
# What the visible line shows: *func* is started through self.pool.spawn()
# with the given positional and keyword arguments.
201 def spawn(self, func, *args, **kw):
202 """Runs *func* in its own green thread, with the result available by
203 iterating over the GreenPile object."""
207 gt = self.pool.spawn(func, *args, **kw)
217 """Wait for the next result, suspending the current greenthread until it
218 is available. Raises StopIteration when there are no more results."""
219 if self.counter == 0 and self.used:
220 raise StopIteration()
222 return self.waiters.get().wait()
# GreenMap behaves like GreenPile, except that spawn() blocks while results
# are not being consumed, and it does not generate its own StopIteration
# exception; instead it relies on the spawning process to send one in when
# it is done.
231 class GreenMap(GreenPile):
def __init__(self, size_or_pool):
    """Create a GreenMap whose result queue is bounded by the pool size.

    :param size_or_pool: passed through unchanged to the parent
        initializer.
    """
    super(GreenMap, self).__init__(size_or_pool)
    # Replace the parent's queue with one capped at pool.size entries, so
    # unconsumed results cannot pile up without limit.
    self.waiters = queue.LightQueue(maxsize=self.pool.size)
238 val = self.waiters.get().wait()
239 if isinstance(val, StopIteration):