+++ /dev/null
-import traceback
-
-from eventlet import event
-from eventlet import greenthread
-from eventlet import queue
-from eventlet import semaphore
-from eventlet.support import greenlets as greenlet
-from eventlet.support import six
-
-__all__ = ['GreenPool', 'GreenPile']
-
-DEBUG = True
-
-
-class GreenPool(object):
- """The GreenPool class is a pool of green threads.
- """
-
- def __init__(self, size=1000):
- self.size = size
- self.coroutines_running = set()
- self.sem = semaphore.Semaphore(size)
- self.no_coros_running = event.Event()
-
- def resize(self, new_size):
- """ Change the max number of greenthreads doing work at any given time.
-
- If resize is called when there are more than *new_size* greenthreads
- already working on tasks, they will be allowed to complete but no new
- tasks will be allowed to get launched until enough greenthreads finish
- their tasks to drop the overall quantity below *new_size*. Until
- then, the return value of free() will be negative.
- """
- size_delta = new_size - self.size
- self.sem.counter += size_delta
- self.size = new_size
-
- def running(self):
- """ Returns the number of greenthreads that are currently executing
- functions in the GreenPool."""
- return len(self.coroutines_running)
-
- def free(self):
- """ Returns the number of greenthreads available for use.
-
- If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
- block the calling greenthread until a slot becomes available."""
- return self.sem.counter
-
- def spawn(self, function, *args, **kwargs):
- """Run the *function* with its arguments in its own green thread.
- Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
- object that is running the function, which can be used to retrieve the
- results.
-
- If the pool is currently at capacity, ``spawn`` will block until one of
- the running greenthreads completes its task and frees up a slot.
-
- This function is reentrant; *function* can call ``spawn`` on the same
- pool without risk of deadlocking the whole thing.
- """
- # if reentering an empty pool, don't try to wait on a coroutine freeing
- # itself -- instead, just execute in the current coroutine
- current = greenthread.getcurrent()
- if self.sem.locked() and current in self.coroutines_running:
- # a bit hacky to use the GT without switching to it
- gt = greenthread.GreenThread(current)
- gt.main(function, args, kwargs)
- return gt
- else:
- self.sem.acquire()
- gt = greenthread.spawn(function, *args, **kwargs)
- if not self.coroutines_running:
- self.no_coros_running = event.Event()
- self.coroutines_running.add(gt)
- gt.link(self._spawn_done)
- return gt
-
- def _spawn_n_impl(self, func, args, kwargs, coro):
- try:
- try:
- func(*args, **kwargs)
- except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
- raise
- except:
- if DEBUG:
- traceback.print_exc()
- finally:
- if coro is None:
- return
- else:
- coro = greenthread.getcurrent()
- self._spawn_done(coro)
-
- def spawn_n(self, function, *args, **kwargs):
- """Create a greenthread to run the *function*, the same as
- :meth:`spawn`. The difference is that :meth:`spawn_n` returns
- None; the results of *function* are not retrievable.
- """
- # if reentering an empty pool, don't try to wait on a coroutine freeing
- # itself -- instead, just execute in the current coroutine
- current = greenthread.getcurrent()
- if self.sem.locked() and current in self.coroutines_running:
- self._spawn_n_impl(function, args, kwargs, None)
- else:
- self.sem.acquire()
- g = greenthread.spawn_n(
- self._spawn_n_impl,
- function, args, kwargs, True)
- if not self.coroutines_running:
- self.no_coros_running = event.Event()
- self.coroutines_running.add(g)
-
- def waitall(self):
- """Waits until all greenthreads in the pool are finished working."""
- assert greenthread.getcurrent() not in self.coroutines_running, \
- "Calling waitall() from within one of the " \
- "GreenPool's greenthreads will never terminate."
- if self.running():
- self.no_coros_running.wait()
-
- def _spawn_done(self, coro):
- self.sem.release()
- if coro is not None:
- self.coroutines_running.remove(coro)
- # if done processing (no more work is waiting for processing),
- # we can finish off any waitall() calls that might be pending
- if self.sem.balance == self.size:
- self.no_coros_running.send(None)
-
- def waiting(self):
- """Return the number of greenthreads waiting to spawn.
- """
- if self.sem.balance < 0:
- return -self.sem.balance
- else:
- return 0
-
- def _do_map(self, func, it, gi):
- for args in it:
- gi.spawn(func, *args)
- gi.spawn(return_stop_iteration)
-
- def starmap(self, function, iterable):
- """This is the same as :func:`itertools.starmap`, except that *func* is
- executed in a separate green thread for each item, with the concurrency
- limited by the pool's size. In operation, starmap consumes a constant
- amount of memory, proportional to the size of the pool, and is thus
- suited for iterating over extremely long input lists.
- """
- if function is None:
- function = lambda *a: a
- gi = GreenMap(self.size)
- greenthread.spawn_n(self._do_map, function, iterable, gi)
- return gi
-
- def imap(self, function, *iterables):
- """This is the same as :func:`itertools.imap`, and has the same
- concurrency and memory behavior as :meth:`starmap`.
-
- It's quite convenient for, e.g., farming out jobs from a file::
-
- def worker(line):
- return do_something(line)
- pool = GreenPool()
- for result in pool.imap(worker, open("filename", 'r')):
- print(result)
- """
- return self.starmap(function, six.moves.zip(*iterables))
-
-
-def return_stop_iteration():
- return StopIteration()
-
-
-class GreenPile(object):
- """GreenPile is an abstraction representing a bunch of I/O-related tasks.
-
- Construct a GreenPile with an existing GreenPool object. The GreenPile will
- then use that pool's concurrency as it processes its jobs. There can be
- many GreenPiles associated with a single GreenPool.
-
- A GreenPile can also be constructed standalone, not associated with any
- GreenPool. To do this, construct it with an integer size parameter instead
- of a GreenPool.
-
- It is not advisable to iterate over a GreenPile in a different greenthread
- than the one which is calling spawn. The iterator will exit early in that
- situation.
- """
-
- def __init__(self, size_or_pool=1000):
- if isinstance(size_or_pool, GreenPool):
- self.pool = size_or_pool
- else:
- self.pool = GreenPool(size_or_pool)
- self.waiters = queue.LightQueue()
- self.used = False
- self.counter = 0
-
- def spawn(self, func, *args, **kw):
- """Runs *func* in its own green thread, with the result available by
- iterating over the GreenPile object."""
- self.used = True
- self.counter += 1
- try:
- gt = self.pool.spawn(func, *args, **kw)
- self.waiters.put(gt)
- except:
- self.counter -= 1
- raise
-
- def __iter__(self):
- return self
-
- def next(self):
- """Wait for the next result, suspending the current greenthread until it
- is available. Raises StopIteration when there are no more results."""
- if self.counter == 0 and self.used:
- raise StopIteration()
- try:
- return self.waiters.get().wait()
- finally:
- self.counter -= 1
- __next__ = next
-
-
-# this is identical to GreenPile but it blocks on spawn if the results
-# aren't consumed, and it doesn't generate its own StopIteration exception,
-# instead relying on the spawning process to send one in when it's done
-class GreenMap(GreenPile):
- def __init__(self, size_or_pool):
- super(GreenMap, self).__init__(size_or_pool)
- self.waiters = queue.LightQueue(maxsize=self.pool.size)
-
- def next(self):
- try:
- val = self.waiters.get().wait()
- if isinstance(val, StopIteration):
- raise val
- else:
- return val
- finally:
- self.counter -= 1
- __next__ = next