X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;ds=sidebyside;f=eventlet%2Feventlet%2Fgreenpool.py;fp=eventlet%2Feventlet%2Fgreenpool.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=a2503fc88e56be6caf9a16af9952991b04bebf7a;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/eventlet/greenpool.py b/eventlet/eventlet/greenpool.py deleted file mode 100644 index a2503fc..0000000 --- a/eventlet/eventlet/greenpool.py +++ /dev/null @@ -1,245 +0,0 @@ -import traceback - -from eventlet import event -from eventlet import greenthread -from eventlet import queue -from eventlet import semaphore -from eventlet.support import greenlets as greenlet -from eventlet.support import six - -__all__ = ['GreenPool', 'GreenPile'] - -DEBUG = True - - -class GreenPool(object): - """The GreenPool class is a pool of green threads. - """ - - def __init__(self, size=1000): - self.size = size - self.coroutines_running = set() - self.sem = semaphore.Semaphore(size) - self.no_coros_running = event.Event() - - def resize(self, new_size): - """ Change the max number of greenthreads doing work at any given time. - - If resize is called when there are more than *new_size* greenthreads - already working on tasks, they will be allowed to complete but no new - tasks will be allowed to get launched until enough greenthreads finish - their tasks to drop the overall quantity below *new_size*. Until - then, the return value of free() will be negative. - """ - size_delta = new_size - self.size - self.sem.counter += size_delta - self.size = new_size - - def running(self): - """ Returns the number of greenthreads that are currently executing - functions in the GreenPool.""" - return len(self.coroutines_running) - - def free(self): - """ Returns the number of greenthreads available for use. - - If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will - block the calling greenthread until a slot becomes available.""" - return self.sem.counter - - def spawn(self, function, *args, **kwargs): - """Run the *function* with its arguments in its own green thread. - Returns the :class:`GreenThread ` - object that is running the function, which can be used to retrieve the - results. - - If the pool is currently at capacity, ``spawn`` will block until one of - the running greenthreads completes its task and frees up a slot. - - This function is reentrant; *function* can call ``spawn`` on the same - pool without risk of deadlocking the whole thing. - """ - # if reentering an empty pool, don't try to wait on a coroutine freeing - # itself -- instead, just execute in the current coroutine - current = greenthread.getcurrent() - if self.sem.locked() and current in self.coroutines_running: - # a bit hacky to use the GT without switching to it - gt = greenthread.GreenThread(current) - gt.main(function, args, kwargs) - return gt - else: - self.sem.acquire() - gt = greenthread.spawn(function, *args, **kwargs) - if not self.coroutines_running: - self.no_coros_running = event.Event() - self.coroutines_running.add(gt) - gt.link(self._spawn_done) - return gt - - def _spawn_n_impl(self, func, args, kwargs, coro): - try: - try: - func(*args, **kwargs) - except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit): - raise - except: - if DEBUG: - traceback.print_exc() - finally: - if coro is None: - return - else: - coro = greenthread.getcurrent() - self._spawn_done(coro) - - def spawn_n(self, function, *args, **kwargs): - """Create a greenthread to run the *function*, the same as - :meth:`spawn`. The difference is that :meth:`spawn_n` returns - None; the results of *function* are not retrievable. - """ - # if reentering an empty pool, don't try to wait on a coroutine freeing - # itself -- instead, just execute in the current coroutine - current = greenthread.getcurrent() - if self.sem.locked() and current in self.coroutines_running: - self._spawn_n_impl(function, args, kwargs, None) - else: - self.sem.acquire() - g = greenthread.spawn_n( - self._spawn_n_impl, - function, args, kwargs, True) - if not self.coroutines_running: - self.no_coros_running = event.Event() - self.coroutines_running.add(g) - - def waitall(self): - """Waits until all greenthreads in the pool are finished working.""" - assert greenthread.getcurrent() not in self.coroutines_running, \ - "Calling waitall() from within one of the " \ - "GreenPool's greenthreads will never terminate." - if self.running(): - self.no_coros_running.wait() - - def _spawn_done(self, coro): - self.sem.release() - if coro is not None: - self.coroutines_running.remove(coro) - # if done processing (no more work is waiting for processing), - # we can finish off any waitall() calls that might be pending - if self.sem.balance == self.size: - self.no_coros_running.send(None) - - def waiting(self): - """Return the number of greenthreads waiting to spawn. - """ - if self.sem.balance < 0: - return -self.sem.balance - else: - return 0 - - def _do_map(self, func, it, gi): - for args in it: - gi.spawn(func, *args) - gi.spawn(return_stop_iteration) - - def starmap(self, function, iterable): - """This is the same as :func:`itertools.starmap`, except that *func* is - executed in a separate green thread for each item, with the concurrency - limited by the pool's size. In operation, starmap consumes a constant - amount of memory, proportional to the size of the pool, and is thus - suited for iterating over extremely long input lists. - """ - if function is None: - function = lambda *a: a - gi = GreenMap(self.size) - greenthread.spawn_n(self._do_map, function, iterable, gi) - return gi - - def imap(self, function, *iterables): - """This is the same as :func:`itertools.imap`, and has the same - concurrency and memory behavior as :meth:`starmap`. - - It's quite convenient for, e.g., farming out jobs from a file:: - - def worker(line): - return do_something(line) - pool = GreenPool() - for result in pool.imap(worker, open("filename", 'r')): - print(result) - """ - return self.starmap(function, six.moves.zip(*iterables)) - - -def return_stop_iteration(): - return StopIteration() - - -class GreenPile(object): - """GreenPile is an abstraction representing a bunch of I/O-related tasks. - - Construct a GreenPile with an existing GreenPool object. The GreenPile will - then use that pool's concurrency as it processes its jobs. There can be - many GreenPiles associated with a single GreenPool. - - A GreenPile can also be constructed standalone, not associated with any - GreenPool. To do this, construct it with an integer size parameter instead - of a GreenPool. - - It is not advisable to iterate over a GreenPile in a different greenthread - than the one which is calling spawn. The iterator will exit early in that - situation. - """ - - def __init__(self, size_or_pool=1000): - if isinstance(size_or_pool, GreenPool): - self.pool = size_or_pool - else: - self.pool = GreenPool(size_or_pool) - self.waiters = queue.LightQueue() - self.used = False - self.counter = 0 - - def spawn(self, func, *args, **kw): - """Runs *func* in its own green thread, with the result available by - iterating over the GreenPile object.""" - self.used = True - self.counter += 1 - try: - gt = self.pool.spawn(func, *args, **kw) - self.waiters.put(gt) - except: - self.counter -= 1 - raise - - def __iter__(self): - return self - - def next(self): - """Wait for the next result, suspending the current greenthread until it - is available. Raises StopIteration when there are no more results.""" - if self.counter == 0 and self.used: - raise StopIteration() - try: - return self.waiters.get().wait() - finally: - self.counter -= 1 - __next__ = next - - -# this is identical to GreenPile but it blocks on spawn if the results -# aren't consumed, and it doesn't generate its own StopIteration exception, -# instead relying on the spawning process to send one in when it's done -class GreenMap(GreenPile): - def __init__(self, size_or_pool): - super(GreenMap, self).__init__(size_or_pool) - self.waiters = queue.LightQueue(maxsize=self.pool.size) - - def next(self): - try: - val = self.waiters.get().wait() - if isinstance(val, StopIteration): - raise val - else: - return val - finally: - self.counter -= 1 - __next__ = next