1 # Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com
2 # Copyright (c) 2010 Eventlet Contributors (see AUTHORS)
3 # and licensed under the MIT license:
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 """Synchronized queues.
25 The :mod:`eventlet.queue` module implements multi-producer, multi-consumer
26 queues that work across greenlets, with the API similar to the classes found in
27 the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>`
30 A major difference is that queues in this module operate as channels when
31 initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
32 and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until
33 a call to :meth:`Queue.get` retrieves the item.
35 An interesting difference, made possible because of greenthreads, is
36 that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be
37 used as indicators of whether the subsequent :meth:`Queue.get`
38 or :meth:`Queue.put` will not block. The new methods :meth:`Queue.getting`
39 and :meth:`Queue.putting` report on the number of greenthreads blocking
40 in :meth:`put <Queue.put>` or :meth:`get <Queue.get>` respectively.
42 from __future__ import print_function
49 from eventlet.event import Event
50 from eventlet.greenthread import getcurrent
51 from eventlet.hubs import get_hub
52 from eventlet.support import six
53 from eventlet.timeout import Timeout
56 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
59 Full = six.moves.queue.Full
60 Empty = six.moves.queue.Empty
64 """A low level synchronization class.
66 Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
68 * switching will occur only if the waiting greenlet is executing :meth:`wait`
69 method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
70 * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
72 The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
73 The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
75 __slots__ = ['greenlet']
85 return '<%s at %s%s greenlet=%r>' % (
86 type(self).__name__, hex(id(self)), waiting, self.greenlet,
92 <Waiter greenlet=None>
98 return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
100 def __nonzero__(self):
101 return self.greenlet is not None
103 __bool__ = __nonzero__
107 return self.greenlet is not None
109 def switch(self, value=None):
110 """Wake up the greenlet that is calling wait() currently (if there is one).
111 Can only be called from Hub's greenlet.
113 assert getcurrent() is get_hub(
114 ).greenlet, "Can only use Waiter.switch method from the mainloop"
115 if self.greenlet is not None:
117 self.greenlet.switch(value)
119 traceback.print_exc()
121 def throw(self, *throw_args):
122 """Make greenlet calling wait() wake up (if there is a wait()).
123 Can only be called from Hub's greenlet.
125 assert getcurrent() is get_hub(
126 ).greenlet, "Can only use Waiter.switch method from the mainloop"
127 if self.greenlet is not None:
129 self.greenlet.throw(*throw_args)
131 traceback.print_exc()
133 # XXX should be renamed to get() ? and the whole class is called Receiver?
135 """Wait until switch() or throw() is called.
137 assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
138 self.greenlet = getcurrent()
140 return get_hub().switch()
145 class LightQueue(object):
147 This is a variant of Queue that behaves mostly like the standard
148 :class:`Queue`. It differs by not supporting the
149 :meth:`task_done <Queue.task_done>` or :meth:`join <Queue.join>` methods,
150 and is a little faster for not having that overhead.
153 def __init__(self, maxsize=None):
154 if maxsize is None or maxsize < 0: # None is not comparable in 3.x
157 self.maxsize = maxsize
160 self._event_unlock = None
163 # QQQ make maxsize into a property with setter that schedules unlock if necessary
165 def _init(self, maxsize):
166 self.queue = collections.deque()
169 return self.queue.popleft()
171 def _put(self, item):
172 self.queue.append(item)
175 return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
178 return '<%s %s>' % (type(self).__name__, self._format())
181 result = 'maxsize=%r' % (self.maxsize, )
182 if getattr(self, 'queue', None):
183 result += ' queue=%r' % self.queue
185 result += ' getters[%s]' % len(self.getters)
187 result += ' putters[%s]' % len(self.putters)
188 if self._event_unlock is not None:
189 result += ' unlocking'
193 """Return the size of the queue."""
194 return len(self.queue)
196 def resize(self, size):
197 """Resizes the queue's maximum size.
199 If the size is increased, and there are putters waiting, they may be woken up."""
200 # None is not comparable in 3.x
201 if self.maxsize is not None and (size is None or size > self.maxsize):
202 # Maybe wake some stuff up
203 self._schedule_unlock()
207 """Returns the number of greenthreads that are blocked waiting to put
208 items into the queue."""
209 return len(self.putters)
212 """Returns the number of greenthreads that are blocked waiting on an
214 return len(self.getters)
217 """Return ``True`` if the queue is empty, ``False`` otherwise."""
218 return not self.qsize()
221 """Return ``True`` if the queue is full, ``False`` otherwise.
223 ``Queue(None)`` is never full.
225 # None is not comparable in 3.x
226 return self.maxsize is not None and self.qsize() >= self.maxsize
228 def put(self, item, block=True, timeout=None):
229 """Put an item into the queue.
231 If optional arg *block* is true and *timeout* is ``None`` (the default),
232 block if necessary until a free slot is available. If *timeout* is
233 a positive number, it blocks at most *timeout* seconds and raises
234 the :class:`Full` exception if no free slot was available within that time.
235 Otherwise (*block* is false), put an item on the queue if a free slot
236 is immediately available, else raise the :class:`Full` exception (*timeout*
237 is ignored in that case).
239 if self.maxsize is None or self.qsize() < self.maxsize:
240 # there's a free slot, put an item right away
243 self._schedule_unlock()
244 elif not block and get_hub().greenlet is getcurrent():
245 # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
246 # find a getter and deliver an item to it
248 getter = self.getters.pop()
256 waiter = ItemWaiter(item)
257 self.putters.add(waiter)
258 timeout = Timeout(timeout, Full)
261 self._schedule_unlock()
262 result = waiter.wait()
263 assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
264 if waiter.item is not _NONE:
268 self.putters.discard(waiter)
272 def put_nowait(self, item):
273 """Put an item into the queue without blocking.
275 Only enqueue the item if a free slot is immediately available.
276 Otherwise raise the :class:`Full` exception.
278 self.put(item, False)
280 def get(self, block=True, timeout=None):
281 """Remove and return an item from the queue.
283 If optional args *block* is true and *timeout* is ``None`` (the default),
284 block if necessary until an item is available. If *timeout* is a positive number,
285 it blocks at most *timeout* seconds and raises the :class:`Empty` exception
286 if no item was available within that time. Otherwise (*block* is false), return
287 an item if one is immediately available, else raise the :class:`Empty` exception
288 (*timeout* is ignored in that case).
292 self._schedule_unlock()
294 elif not block and get_hub().greenlet is getcurrent():
295 # special case to make get_nowait() runnable in the mainloop greenlet
296 # there are no items in the queue; try to fix the situation by unlocking putters
298 putter = self.putters.pop()
300 putter.switch(putter)
306 timeout = Timeout(timeout, Empty)
308 self.getters.add(waiter)
310 self._schedule_unlock()
313 self.getters.discard(waiter)
318 def get_nowait(self):
319 """Remove and return an item from the queue without blocking.
321 Only get an item if one is immediately available. Otherwise
322 raise the :class:`Empty` exception.
324 return self.get(False)
329 if self.qsize() and self.getters:
330 getter = self.getters.pop()
335 getter.throw(*sys.exc_info())
338 elif self.putters and self.getters:
339 putter = self.putters.pop()
341 getter = self.getters.pop()
344 # this makes greenlet calling put() not to call _put() again
349 putter.switch(putter)
351 self.putters.add(putter)
352 elif self.putters and (self.getters or
353 self.maxsize is None or
354 self.qsize() < self.maxsize):
355 putter = self.putters.pop()
356 putter.switch(putter)
360 self._event_unlock = None # QQQ maybe it's possible to obtain this info from libevent?
361 # i.e. whether this event is pending _OR_ currently executing
362 # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
363 # to avoid this, schedule unlock with timer(0, ...) once in a while
365 def _schedule_unlock(self):
366 if self._event_unlock is None:
367 self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
370 class ItemWaiter(Waiter):
373 def __init__(self, item):
374 Waiter.__init__(self)
378 class Queue(LightQueue):
379 '''Create a queue object with a given maximum size.
381 If *maxsize* is less than zero or ``None``, the queue size is infinite.
383 ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
384 until the item is delivered. (This is unlike the standard :class:`Queue`,
385 where 0 means infinite size).
387 In all other respects, this Queue class resembled the standard library,
391 def __init__(self, maxsize=None):
392 LightQueue.__init__(self, maxsize)
393 self.unfinished_tasks = 0
397 result = LightQueue._format(self)
398 if self.unfinished_tasks:
399 result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
402 def _put(self, item):
403 LightQueue._put(self, item)
404 self._put_bookkeeping()
406 def _put_bookkeeping(self):
407 self.unfinished_tasks += 1
408 if self._cond.ready():
412 '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
413 For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
414 :meth:`task_done` tells the queue that the processing on the task is complete.
416 If a :meth:`join` is currently blocking, it will resume when all items have been processed
417 (meaning that a :meth:`task_done` call was received for every item that had been
418 :meth:`put <Queue.put>` into the queue).
420 Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
423 if self.unfinished_tasks <= 0:
424 raise ValueError('task_done() called too many times')
425 self.unfinished_tasks -= 1
426 if self.unfinished_tasks == 0:
427 self._cond.send(None)
430 '''Block until all items in the queue have been gotten and processed.
432 The count of unfinished tasks goes up whenever an item is added to the queue.
433 The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
434 that the item was retrieved and all work on it is complete. When the count of
435 unfinished tasks drops to zero, :meth:`join` unblocks.
437 if self.unfinished_tasks > 0:
441 class PriorityQueue(Queue):
442 '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
444 Entries are typically tuples of the form: ``(priority number, data)``.
447 def _init(self, maxsize):
450 def _put(self, item, heappush=heapq.heappush):
451 heappush(self.queue, item)
452 self._put_bookkeeping()
454 def _get(self, heappop=heapq.heappop):
455 return heappop(self.queue)
458 class LifoQueue(Queue):
459 '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
461 def _init(self, maxsize):
464 def _put(self, item):
465 self.queue.append(item)
466 self._put_bookkeeping()
469 return self.queue.pop()