X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Feventlet%2Fqueue.py;fp=eventlet%2Feventlet%2Fqueue.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=5a82238193670c5f9b2384b7a9075cdaf72c0ef9;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/eventlet/queue.py b/eventlet/eventlet/queue.py deleted file mode 100644 index 5a82238..0000000 --- a/eventlet/eventlet/queue.py +++ /dev/null @@ -1,469 +0,0 @@ -# Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com -# Copyright (c) 2010 Eventlet Contributors (see AUTHORS) -# and licensed under the MIT license: -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -"""Synchronized queues. - -The :mod:`eventlet.queue` module implements multi-producer, multi-consumer -queues that work across greenlets, with the API similar to the classes found in -the standard :mod:`Queue` and :class:`multiprocessing ` -modules. - -A major difference is that queues in this module operate as channels when -initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty` -and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until -a call to :meth:`Queue.get` retrieves the item. - -An interesting difference, made possible because of greenthreads, is -that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be -used as indicators of whether the subsequent :meth:`Queue.get` -or :meth:`Queue.put` will not block. The new methods :meth:`Queue.getting` -and :meth:`Queue.putting` report on the number of greenthreads blocking -in :meth:`put ` or :meth:`get ` respectively. -""" -from __future__ import print_function - -import sys -import heapq -import collections -import traceback - -from eventlet.event import Event -from eventlet.greenthread import getcurrent -from eventlet.hubs import get_hub -from eventlet.support import six -from eventlet.timeout import Timeout - - -__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty'] - -_NONE = object() -Full = six.moves.queue.Full -Empty = six.moves.queue.Empty - - -class Waiter(object): - """A low level synchronization class. - - Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe: - - * switching will occur only if the waiting greenlet is executing :meth:`wait` - method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops. - * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw` - - The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet. - The :meth:`wait` method must be called from a greenlet other than :class:`Hub`. - """ - __slots__ = ['greenlet'] - - def __init__(self): - self.greenlet = None - - def __repr__(self): - if self.waiting: - waiting = ' waiting' - else: - waiting = '' - return '<%s at %s%s greenlet=%r>' % ( - type(self).__name__, hex(id(self)), waiting, self.greenlet, - ) - - def __str__(self): - """ - >>> print(Waiter()) - - """ - if self.waiting: - waiting = ' waiting' - else: - waiting = '' - return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet) - - def __nonzero__(self): - return self.greenlet is not None - - __bool__ = __nonzero__ - - @property - def waiting(self): - return self.greenlet is not None - - def switch(self, value=None): - """Wake up the greenlet that is calling wait() currently (if there is one). - Can only be called from Hub's greenlet. - """ - assert getcurrent() is get_hub( - ).greenlet, "Can only use Waiter.switch method from the mainloop" - if self.greenlet is not None: - try: - self.greenlet.switch(value) - except: - traceback.print_exc() - - def throw(self, *throw_args): - """Make greenlet calling wait() wake up (if there is a wait()). - Can only be called from Hub's greenlet. - """ - assert getcurrent() is get_hub( - ).greenlet, "Can only use Waiter.switch method from the mainloop" - if self.greenlet is not None: - try: - self.greenlet.throw(*throw_args) - except: - traceback.print_exc() - - # XXX should be renamed to get() ? and the whole class is called Receiver? - def wait(self): - """Wait until switch() or throw() is called. - """ - assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) - self.greenlet = getcurrent() - try: - return get_hub().switch() - finally: - self.greenlet = None - - -class LightQueue(object): - """ - This is a variant of Queue that behaves mostly like the standard - :class:`Queue`. It differs by not supporting the - :meth:`task_done ` or :meth:`join ` methods, - and is a little faster for not having that overhead. - """ - - def __init__(self, maxsize=None): - if maxsize is None or maxsize < 0: # None is not comparable in 3.x - self.maxsize = None - else: - self.maxsize = maxsize - self.getters = set() - self.putters = set() - self._event_unlock = None - self._init(maxsize) - - # QQQ make maxsize into a property with setter that schedules unlock if necessary - - def _init(self, maxsize): - self.queue = collections.deque() - - def _get(self): - return self.queue.popleft() - - def _put(self, item): - self.queue.append(item) - - def __repr__(self): - return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format()) - - def __str__(self): - return '<%s %s>' % (type(self).__name__, self._format()) - - def _format(self): - result = 'maxsize=%r' % (self.maxsize, ) - if getattr(self, 'queue', None): - result += ' queue=%r' % self.queue - if self.getters: - result += ' getters[%s]' % len(self.getters) - if self.putters: - result += ' putters[%s]' % len(self.putters) - if self._event_unlock is not None: - result += ' unlocking' - return result - - def qsize(self): - """Return the size of the queue.""" - return len(self.queue) - - def resize(self, size): - """Resizes the queue's maximum size. - - If the size is increased, and there are putters waiting, they may be woken up.""" - # None is not comparable in 3.x - if self.maxsize is not None and (size is None or size > self.maxsize): - # Maybe wake some stuff up - self._schedule_unlock() - self.maxsize = size - - def putting(self): - """Returns the number of greenthreads that are blocked waiting to put - items into the queue.""" - return len(self.putters) - - def getting(self): - """Returns the number of greenthreads that are blocked waiting on an - empty queue.""" - return len(self.getters) - - def empty(self): - """Return ``True`` if the queue is empty, ``False`` otherwise.""" - return not self.qsize() - - def full(self): - """Return ``True`` if the queue is full, ``False`` otherwise. - - ``Queue(None)`` is never full. - """ - # None is not comparable in 3.x - return self.maxsize is not None and self.qsize() >= self.maxsize - - def put(self, item, block=True, timeout=None): - """Put an item into the queue. - - If optional arg *block* is true and *timeout* is ``None`` (the default), - block if necessary until a free slot is available. If *timeout* is - a positive number, it blocks at most *timeout* seconds and raises - the :class:`Full` exception if no free slot was available within that time. - Otherwise (*block* is false), put an item on the queue if a free slot - is immediately available, else raise the :class:`Full` exception (*timeout* - is ignored in that case). - """ - if self.maxsize is None or self.qsize() < self.maxsize: - # there's a free slot, put an item right away - self._put(item) - if self.getters: - self._schedule_unlock() - elif not block and get_hub().greenlet is getcurrent(): - # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though - # find a getter and deliver an item to it - while self.getters: - getter = self.getters.pop() - if getter: - self._put(item) - item = self._get() - getter.switch(item) - return - raise Full - elif block: - waiter = ItemWaiter(item) - self.putters.add(waiter) - timeout = Timeout(timeout, Full) - try: - if self.getters: - self._schedule_unlock() - result = waiter.wait() - assert result is waiter, "Invalid switch into Queue.put: %r" % (result, ) - if waiter.item is not _NONE: - self._put(item) - finally: - timeout.cancel() - self.putters.discard(waiter) - else: - raise Full - - def put_nowait(self, item): - """Put an item into the queue without blocking. - - Only enqueue the item if a free slot is immediately available. - Otherwise raise the :class:`Full` exception. - """ - self.put(item, False) - - def get(self, block=True, timeout=None): - """Remove and return an item from the queue. - - If optional args *block* is true and *timeout* is ``None`` (the default), - block if necessary until an item is available. If *timeout* is a positive number, - it blocks at most *timeout* seconds and raises the :class:`Empty` exception - if no item was available within that time. Otherwise (*block* is false), return - an item if one is immediately available, else raise the :class:`Empty` exception - (*timeout* is ignored in that case). - """ - if self.qsize(): - if self.putters: - self._schedule_unlock() - return self._get() - elif not block and get_hub().greenlet is getcurrent(): - # special case to make get_nowait() runnable in the mainloop greenlet - # there are no items in the queue; try to fix the situation by unlocking putters - while self.putters: - putter = self.putters.pop() - if putter: - putter.switch(putter) - if self.qsize(): - return self._get() - raise Empty - elif block: - waiter = Waiter() - timeout = Timeout(timeout, Empty) - try: - self.getters.add(waiter) - if self.putters: - self._schedule_unlock() - return waiter.wait() - finally: - self.getters.discard(waiter) - timeout.cancel() - else: - raise Empty - - def get_nowait(self): - """Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the :class:`Empty` exception. - """ - return self.get(False) - - def _unlock(self): - try: - while True: - if self.qsize() and self.getters: - getter = self.getters.pop() - if getter: - try: - item = self._get() - except: - getter.throw(*sys.exc_info()) - else: - getter.switch(item) - elif self.putters and self.getters: - putter = self.putters.pop() - if putter: - getter = self.getters.pop() - if getter: - item = putter.item - # this makes greenlet calling put() not to call _put() again - putter.item = _NONE - self._put(item) - item = self._get() - getter.switch(item) - putter.switch(putter) - else: - self.putters.add(putter) - elif self.putters and (self.getters or - self.maxsize is None or - self.qsize() < self.maxsize): - putter = self.putters.pop() - putter.switch(putter) - else: - break - finally: - self._event_unlock = None # QQQ maybe it's possible to obtain this info from libevent? - # i.e. whether this event is pending _OR_ currently executing - # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute - # to avoid this, schedule unlock with timer(0, ...) once in a while - - def _schedule_unlock(self): - if self._event_unlock is None: - self._event_unlock = get_hub().schedule_call_global(0, self._unlock) - - -class ItemWaiter(Waiter): - __slots__ = ['item'] - - def __init__(self, item): - Waiter.__init__(self) - self.item = item - - -class Queue(LightQueue): - '''Create a queue object with a given maximum size. - - If *maxsize* is less than zero or ``None``, the queue size is infinite. - - ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks - until the item is delivered. (This is unlike the standard :class:`Queue`, - where 0 means infinite size). - - In all other respects, this Queue class resembled the standard library, - :class:`Queue`. - ''' - - def __init__(self, maxsize=None): - LightQueue.__init__(self, maxsize) - self.unfinished_tasks = 0 - self._cond = Event() - - def _format(self): - result = LightQueue._format(self) - if self.unfinished_tasks: - result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond) - return result - - def _put(self, item): - LightQueue._put(self, item) - self._put_bookkeeping() - - def _put_bookkeeping(self): - self.unfinished_tasks += 1 - if self._cond.ready(): - self._cond.reset() - - def task_done(self): - '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads. - For each :meth:`get ` used to fetch a task, a subsequent call to - :meth:`task_done` tells the queue that the processing on the task is complete. - - If a :meth:`join` is currently blocking, it will resume when all items have been processed - (meaning that a :meth:`task_done` call was received for every item that had been - :meth:`put ` into the queue). - - Raises a :exc:`ValueError` if called more times than there were items placed in the queue. - ''' - - if self.unfinished_tasks <= 0: - raise ValueError('task_done() called too many times') - self.unfinished_tasks -= 1 - if self.unfinished_tasks == 0: - self._cond.send(None) - - def join(self): - '''Block until all items in the queue have been gotten and processed. - - The count of unfinished tasks goes up whenever an item is added to the queue. - The count goes down whenever a consumer thread calls :meth:`task_done` to indicate - that the item was retrieved and all work on it is complete. When the count of - unfinished tasks drops to zero, :meth:`join` unblocks. - ''' - if self.unfinished_tasks > 0: - self._cond.wait() - - -class PriorityQueue(Queue): - '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first). - - Entries are typically tuples of the form: ``(priority number, data)``. - ''' - - def _init(self, maxsize): - self.queue = [] - - def _put(self, item, heappush=heapq.heappush): - heappush(self.queue, item) - self._put_bookkeeping() - - def _get(self, heappop=heapq.heappop): - return heappop(self.queue) - - -class LifoQueue(Queue): - '''A subclass of :class:`Queue` that retrieves most recently added entries first.''' - - def _init(self, maxsize): - self.queue = [] - - def _put(self, item): - self.queue.append(item) - self._put_bookkeeping() - - def _get(self): - return self.queue.pop()