1 # Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com
2 # Copyright (c) 2010 Eventlet Contributors (see AUTHORS)
3 # and licensed under the MIT license:
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 """Synchronized queues.
25 The :mod:`eventlet.queue` module implements multi-producer, multi-consumer
26 queues that work across greenlets, with the API similar to the classes found in
27 the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>`
30 A major difference is that queues in this module operate as channels when
31 initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
32 and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until
33 a call to :meth:`Queue.get` retrieves the item.
35 An interesting difference, made possible because of greenthreads, is
36 that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be
37 used as indicators of whether the subsequent :meth:`Queue.get`
38 or :meth:`Queue.put` will not block. The new methods :meth:`Queue.getting`
39 and :meth:`Queue.putting` report on the number of greenthreads blocking
40 in :meth:`put <Queue.put>` or :meth:`get <Queue.get>` respectively.
42 from __future__ import print_function
49 from eventlet.event import Event
50 from eventlet.greenthread import getcurrent
51 from eventlet.hubs import get_hub
52 from eventlet.support import six
53 from eventlet.support.six.moves import queue as Stdlib_Queue
54 from eventlet.timeout import Timeout
57 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
60 Full = six.moves.queue.Full
61 Empty = six.moves.queue.Empty
65 """A low level synchronization class.
67 Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
69 * switching will occur only if the waiting greenlet is executing :meth:`wait`
70 method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
71 * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
73 The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
74 The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
76 __slots__ = ['greenlet']
86 return '<%s at %s%s greenlet=%r>' % (
87 type(self).__name__, hex(id(self)), waiting, self.greenlet,
93 <Waiter greenlet=None>
99 return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
101 def __nonzero__(self):
102 return self.greenlet is not None
104 __bool__ = __nonzero__
108 return self.greenlet is not None
110 def switch(self, value=None):
111 """Wake up the greenlet that is calling wait() currently (if there is one).
112 Can only be called from Hub's greenlet.
114 assert getcurrent() is get_hub(
115 ).greenlet, "Can only use Waiter.switch method from the mainloop"
116 if self.greenlet is not None:
118 self.greenlet.switch(value)
120 traceback.print_exc()
122 def throw(self, *throw_args):
123 """Make greenlet calling wait() wake up (if there is a wait()).
124 Can only be called from Hub's greenlet.
126 assert getcurrent() is get_hub(
127 ).greenlet, "Can only use Waiter.switch method from the mainloop"
128 if self.greenlet is not None:
130 self.greenlet.throw(*throw_args)
132 traceback.print_exc()
134 # XXX should be renamed to get() ? and the whole class is called Receiver?
136 """Wait until switch() or throw() is called.
138 assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
139 self.greenlet = getcurrent()
141 return get_hub().switch()
146 class LightQueue(object):
148 This is a variant of Queue that behaves mostly like the standard
149 :class:`Stdlib_Queue`. It differs by not supporting the
150 :meth:`task_done <Stdlib_Queue.task_done>` or
151 :meth:`join <Stdlib_Queue.join>` methods, and is a little faster for
152 not having that overhead.
155 def __init__(self, maxsize=None):
156 if maxsize is None or maxsize < 0: # None is not comparable in 3.x
159 self.maxsize = maxsize
162 self._event_unlock = None
165 # QQQ make maxsize into a property with setter that schedules unlock if necessary
167 def _init(self, maxsize):
168 self.queue = collections.deque()
171 return self.queue.popleft()
173 def _put(self, item):
174 self.queue.append(item)
177 return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
180 return '<%s %s>' % (type(self).__name__, self._format())
183 result = 'maxsize=%r' % (self.maxsize, )
184 if getattr(self, 'queue', None):
185 result += ' queue=%r' % self.queue
187 result += ' getters[%s]' % len(self.getters)
189 result += ' putters[%s]' % len(self.putters)
190 if self._event_unlock is not None:
191 result += ' unlocking'
195 """Return the size of the queue."""
196 return len(self.queue)
198 def resize(self, size):
199 """Resizes the queue's maximum size.
201 If the size is increased, and there are putters waiting, they may be woken up."""
202 # None is not comparable in 3.x
203 if self.maxsize is not None and (size is None or size > self.maxsize):
204 # Maybe wake some stuff up
205 self._schedule_unlock()
209 """Returns the number of greenthreads that are blocked waiting to put
210 items into the queue."""
211 return len(self.putters)
214 """Returns the number of greenthreads that are blocked waiting on an
216 return len(self.getters)
219 """Return ``True`` if the queue is empty, ``False`` otherwise."""
220 return not self.qsize()
223 """Return ``True`` if the queue is full, ``False`` otherwise.
225 ``Queue(None)`` is never full.
227 # None is not comparable in 3.x
228 return self.maxsize is not None and self.qsize() >= self.maxsize
230 def put(self, item, block=True, timeout=None):
231 """Put an item into the queue.
233 If optional arg *block* is true and *timeout* is ``None`` (the default),
234 block if necessary until a free slot is available. If *timeout* is
235 a positive number, it blocks at most *timeout* seconds and raises
236 the :class:`Full` exception if no free slot was available within that time.
237 Otherwise (*block* is false), put an item on the queue if a free slot
238 is immediately available, else raise the :class:`Full` exception (*timeout*
239 is ignored in that case).
241 if self.maxsize is None or self.qsize() < self.maxsize:
242 # there's a free slot, put an item right away
245 self._schedule_unlock()
246 elif not block and get_hub().greenlet is getcurrent():
247 # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
248 # find a getter and deliver an item to it
250 getter = self.getters.pop()
258 waiter = ItemWaiter(item)
259 self.putters.add(waiter)
260 timeout = Timeout(timeout, Full)
263 self._schedule_unlock()
264 result = waiter.wait()
265 assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
266 if waiter.item is not _NONE:
270 self.putters.discard(waiter)
274 def put_nowait(self, item):
275 """Put an item into the queue without blocking.
277 Only enqueue the item if a free slot is immediately available.
278 Otherwise raise the :class:`Full` exception.
280 self.put(item, False)
282 def get(self, block=True, timeout=None):
283 """Remove and return an item from the queue.
285 If optional args *block* is true and *timeout* is ``None`` (the default),
286 block if necessary until an item is available. If *timeout* is a positive number,
287 it blocks at most *timeout* seconds and raises the :class:`Empty` exception
288 if no item was available within that time. Otherwise (*block* is false), return
289 an item if one is immediately available, else raise the :class:`Empty` exception
290 (*timeout* is ignored in that case).
294 self._schedule_unlock()
296 elif not block and get_hub().greenlet is getcurrent():
297 # special case to make get_nowait() runnable in the mainloop greenlet
298 # there are no items in the queue; try to fix the situation by unlocking putters
300 putter = self.putters.pop()
302 putter.switch(putter)
308 timeout = Timeout(timeout, Empty)
310 self.getters.add(waiter)
312 self._schedule_unlock()
315 self.getters.discard(waiter)
320 def get_nowait(self):
321 """Remove and return an item from the queue without blocking.
323 Only get an item if one is immediately available. Otherwise
324 raise the :class:`Empty` exception.
326 return self.get(False)
331 if self.qsize() and self.getters:
332 getter = self.getters.pop()
337 getter.throw(*sys.exc_info())
340 elif self.putters and self.getters:
341 putter = self.putters.pop()
343 getter = self.getters.pop()
346 # this makes greenlet calling put() not to call _put() again
351 putter.switch(putter)
353 self.putters.add(putter)
354 elif self.putters and (self.getters or
355 self.maxsize is None or
356 self.qsize() < self.maxsize):
357 putter = self.putters.pop()
358 putter.switch(putter)
362 self._event_unlock = None # QQQ maybe it's possible to obtain this info from libevent?
363 # i.e. whether this event is pending _OR_ currently executing
364 # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
365 # to avoid this, schedule unlock with timer(0, ...) once in a while
367 def _schedule_unlock(self):
368 if self._event_unlock is None:
369 self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
372 class ItemWaiter(Waiter):
375 def __init__(self, item):
376 Waiter.__init__(self)
380 class Queue(LightQueue):
381 '''Create a queue object with a given maximum size.
383 If *maxsize* is less than zero or ``None``, the queue size is infinite.
385 ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
386 until the item is delivered. (This is unlike the standard
387 :class:`Stdlib_Queue`, where 0 means infinite size).
389 In all other respects, this Queue class resembles the standard library,
390 :class:`Stdlib_Queue`.
393 def __init__(self, maxsize=None):
394 LightQueue.__init__(self, maxsize)
395 self.unfinished_tasks = 0
399 result = LightQueue._format(self)
400 if self.unfinished_tasks:
401 result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
404 def _put(self, item):
405 LightQueue._put(self, item)
406 self._put_bookkeeping()
408 def _put_bookkeeping(self):
409 self.unfinished_tasks += 1
410 if self._cond.ready():
414 '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
415 For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
416 :meth:`task_done` tells the queue that the processing on the task is complete.
418 If a :meth:`join` is currently blocking, it will resume when all items have been processed
419 (meaning that a :meth:`task_done` call was received for every item that had been
420 :meth:`put <Queue.put>` into the queue).
422 Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
425 if self.unfinished_tasks <= 0:
426 raise ValueError('task_done() called too many times')
427 self.unfinished_tasks -= 1
428 if self.unfinished_tasks == 0:
429 self._cond.send(None)
432 '''Block until all items in the queue have been gotten and processed.
434 The count of unfinished tasks goes up whenever an item is added to the queue.
435 The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
436 that the item was retrieved and all work on it is complete. When the count of
437 unfinished tasks drops to zero, :meth:`join` unblocks.
439 if self.unfinished_tasks > 0:
443 class PriorityQueue(Queue):
444 '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
446 Entries are typically tuples of the form: ``(priority number, data)``.
449 def _init(self, maxsize):
452 def _put(self, item, heappush=heapq.heappush):
453 heappush(self.queue, item)
454 self._put_bookkeeping()
456 def _get(self, heappop=heapq.heappop):
457 return heappop(self.queue)
460 class LifoQueue(Queue):
461 '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
463 def _init(self, maxsize):
466 def _put(self, item):
467 self.queue.append(item)
468 self._put_bookkeeping()
471 return self.queue.pop()