Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / queue.py
1 # Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com
2 # Copyright (c) 2010 Eventlet Contributors (see AUTHORS)
3 # and licensed under the MIT license:
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 """Synchronized queues.
24
25 The :mod:`eventlet.queue` module implements multi-producer, multi-consumer
26 queues that work across greenlets, with the API similar to the classes found in
27 the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>`
28 modules.
29
30 A major difference is that queues in this module operate as channels when
31 initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
32 and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until
33 a call to :meth:`Queue.get` retrieves the item.
34
35 An interesting difference, made possible because of greenthreads, is
36 that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be
37 used as indicators of whether the subsequent :meth:`Queue.get`
38 or :meth:`Queue.put` will not block.  The new methods :meth:`Queue.getting`
39 and :meth:`Queue.putting` report on the number of greenthreads blocking
40 in :meth:`put <Queue.put>` or :meth:`get <Queue.get>` respectively.
41 """
42 from __future__ import print_function
43
44 import sys
45 import heapq
46 import collections
47 import traceback
48
49 from eventlet.event import Event
50 from eventlet.greenthread import getcurrent
51 from eventlet.hubs import get_hub
52 from eventlet.support import six
53 from eventlet.timeout import Timeout
54
55
56 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
57
58 _NONE = object()
59 Full = six.moves.queue.Full
60 Empty = six.moves.queue.Empty
61
62
63 class Waiter(object):
64     """A low level synchronization class.
65
66     Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
67
68     * switching will occur only if the waiting greenlet is executing :meth:`wait`
69       method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
70     * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
71
72     The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
73     The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
74     """
75     __slots__ = ['greenlet']
76
77     def __init__(self):
78         self.greenlet = None
79
80     def __repr__(self):
81         if self.waiting:
82             waiting = ' waiting'
83         else:
84             waiting = ''
85         return '<%s at %s%s greenlet=%r>' % (
86             type(self).__name__, hex(id(self)), waiting, self.greenlet,
87         )
88
89     def __str__(self):
90         """
91         >>> print(Waiter())
92         <Waiter greenlet=None>
93         """
94         if self.waiting:
95             waiting = ' waiting'
96         else:
97             waiting = ''
98         return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
99
100     def __nonzero__(self):
101         return self.greenlet is not None
102
103     __bool__ = __nonzero__
104
105     @property
106     def waiting(self):
107         return self.greenlet is not None
108
109     def switch(self, value=None):
110         """Wake up the greenlet that is calling wait() currently (if there is one).
111         Can only be called from Hub's greenlet.
112         """
113         assert getcurrent() is get_hub(
114         ).greenlet, "Can only use Waiter.switch method from the mainloop"
115         if self.greenlet is not None:
116             try:
117                 self.greenlet.switch(value)
118             except:
119                 traceback.print_exc()
120
121     def throw(self, *throw_args):
122         """Make greenlet calling wait() wake up (if there is a wait()).
123         Can only be called from Hub's greenlet.
124         """
125         assert getcurrent() is get_hub(
126         ).greenlet, "Can only use Waiter.switch method from the mainloop"
127         if self.greenlet is not None:
128             try:
129                 self.greenlet.throw(*throw_args)
130             except:
131                 traceback.print_exc()
132
133     # XXX should be renamed to get() ? and the whole class is called Receiver?
134     def wait(self):
135         """Wait until switch() or throw() is called.
136         """
137         assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
138         self.greenlet = getcurrent()
139         try:
140             return get_hub().switch()
141         finally:
142             self.greenlet = None
143
144
145 class LightQueue(object):
146     """
147     This is a variant of Queue that behaves mostly like the standard
148     :class:`Queue`.  It differs by not supporting the
149     :meth:`task_done <Queue.task_done>` or :meth:`join <Queue.join>` methods,
150     and is a little faster for not having that overhead.
151     """
152
153     def __init__(self, maxsize=None):
154         if maxsize is None or maxsize < 0:  # None is not comparable in 3.x
155             self.maxsize = None
156         else:
157             self.maxsize = maxsize
158         self.getters = set()
159         self.putters = set()
160         self._event_unlock = None
161         self._init(maxsize)
162
163     # QQQ make maxsize into a property with setter that schedules unlock if necessary
164
165     def _init(self, maxsize):
166         self.queue = collections.deque()
167
168     def _get(self):
169         return self.queue.popleft()
170
171     def _put(self, item):
172         self.queue.append(item)
173
174     def __repr__(self):
175         return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
176
177     def __str__(self):
178         return '<%s %s>' % (type(self).__name__, self._format())
179
180     def _format(self):
181         result = 'maxsize=%r' % (self.maxsize, )
182         if getattr(self, 'queue', None):
183             result += ' queue=%r' % self.queue
184         if self.getters:
185             result += ' getters[%s]' % len(self.getters)
186         if self.putters:
187             result += ' putters[%s]' % len(self.putters)
188         if self._event_unlock is not None:
189             result += ' unlocking'
190         return result
191
192     def qsize(self):
193         """Return the size of the queue."""
194         return len(self.queue)
195
196     def resize(self, size):
197         """Resizes the queue's maximum size.
198
199         If the size is increased, and there are putters waiting, they may be woken up."""
200         # None is not comparable in 3.x
201         if self.maxsize is not None and (size is None or size > self.maxsize):
202             # Maybe wake some stuff up
203             self._schedule_unlock()
204         self.maxsize = size
205
206     def putting(self):
207         """Returns the number of greenthreads that are blocked waiting to put
208         items into the queue."""
209         return len(self.putters)
210
211     def getting(self):
212         """Returns the number of greenthreads that are blocked waiting on an
213         empty queue."""
214         return len(self.getters)
215
216     def empty(self):
217         """Return ``True`` if the queue is empty, ``False`` otherwise."""
218         return not self.qsize()
219
220     def full(self):
221         """Return ``True`` if the queue is full, ``False`` otherwise.
222
223         ``Queue(None)`` is never full.
224         """
225         # None is not comparable in 3.x
226         return self.maxsize is not None and self.qsize() >= self.maxsize
227
228     def put(self, item, block=True, timeout=None):
229         """Put an item into the queue.
230
231         If optional arg *block* is true and *timeout* is ``None`` (the default),
232         block if necessary until a free slot is available. If *timeout* is
233         a positive number, it blocks at most *timeout* seconds and raises
234         the :class:`Full` exception if no free slot was available within that time.
235         Otherwise (*block* is false), put an item on the queue if a free slot
236         is immediately available, else raise the :class:`Full` exception (*timeout*
237         is ignored in that case).
238         """
239         if self.maxsize is None or self.qsize() < self.maxsize:
240             # there's a free slot, put an item right away
241             self._put(item)
242             if self.getters:
243                 self._schedule_unlock()
244         elif not block and get_hub().greenlet is getcurrent():
245             # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
246             # find a getter and deliver an item to it
247             while self.getters:
248                 getter = self.getters.pop()
249                 if getter:
250                     self._put(item)
251                     item = self._get()
252                     getter.switch(item)
253                     return
254             raise Full
255         elif block:
256             waiter = ItemWaiter(item)
257             self.putters.add(waiter)
258             timeout = Timeout(timeout, Full)
259             try:
260                 if self.getters:
261                     self._schedule_unlock()
262                 result = waiter.wait()
263                 assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
264                 if waiter.item is not _NONE:
265                     self._put(item)
266             finally:
267                 timeout.cancel()
268                 self.putters.discard(waiter)
269         else:
270             raise Full
271
272     def put_nowait(self, item):
273         """Put an item into the queue without blocking.
274
275         Only enqueue the item if a free slot is immediately available.
276         Otherwise raise the :class:`Full` exception.
277         """
278         self.put(item, False)
279
280     def get(self, block=True, timeout=None):
281         """Remove and return an item from the queue.
282
283         If optional args *block* is true and *timeout* is ``None`` (the default),
284         block if necessary until an item is available. If *timeout* is a positive number,
285         it blocks at most *timeout* seconds and raises the :class:`Empty` exception
286         if no item was available within that time. Otherwise (*block* is false), return
287         an item if one is immediately available, else raise the :class:`Empty` exception
288         (*timeout* is ignored in that case).
289         """
290         if self.qsize():
291             if self.putters:
292                 self._schedule_unlock()
293             return self._get()
294         elif not block and get_hub().greenlet is getcurrent():
295             # special case to make get_nowait() runnable in the mainloop greenlet
296             # there are no items in the queue; try to fix the situation by unlocking putters
297             while self.putters:
298                 putter = self.putters.pop()
299                 if putter:
300                     putter.switch(putter)
301                     if self.qsize():
302                         return self._get()
303             raise Empty
304         elif block:
305             waiter = Waiter()
306             timeout = Timeout(timeout, Empty)
307             try:
308                 self.getters.add(waiter)
309                 if self.putters:
310                     self._schedule_unlock()
311                 return waiter.wait()
312             finally:
313                 self.getters.discard(waiter)
314                 timeout.cancel()
315         else:
316             raise Empty
317
318     def get_nowait(self):
319         """Remove and return an item from the queue without blocking.
320
321         Only get an item if one is immediately available. Otherwise
322         raise the :class:`Empty` exception.
323         """
324         return self.get(False)
325
326     def _unlock(self):
327         try:
328             while True:
329                 if self.qsize() and self.getters:
330                     getter = self.getters.pop()
331                     if getter:
332                         try:
333                             item = self._get()
334                         except:
335                             getter.throw(*sys.exc_info())
336                         else:
337                             getter.switch(item)
338                 elif self.putters and self.getters:
339                     putter = self.putters.pop()
340                     if putter:
341                         getter = self.getters.pop()
342                         if getter:
343                             item = putter.item
344                             # this makes greenlet calling put() not to call _put() again
345                             putter.item = _NONE
346                             self._put(item)
347                             item = self._get()
348                             getter.switch(item)
349                             putter.switch(putter)
350                         else:
351                             self.putters.add(putter)
352                 elif self.putters and (self.getters or
353                                        self.maxsize is None or
354                                        self.qsize() < self.maxsize):
355                     putter = self.putters.pop()
356                     putter.switch(putter)
357                 else:
358                     break
359         finally:
360             self._event_unlock = None  # QQQ maybe it's possible to obtain this info from libevent?
361             # i.e. whether this event is pending _OR_ currently executing
362         # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
363         # to avoid this, schedule unlock with timer(0, ...) once in a while
364
365     def _schedule_unlock(self):
366         if self._event_unlock is None:
367             self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
368
369
370 class ItemWaiter(Waiter):
371     __slots__ = ['item']
372
373     def __init__(self, item):
374         Waiter.__init__(self)
375         self.item = item
376
377
378 class Queue(LightQueue):
379     '''Create a queue object with a given maximum size.
380
381     If *maxsize* is less than zero or ``None``, the queue size is infinite.
382
383     ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
384     until the item is delivered. (This is unlike the standard :class:`Queue`,
385     where 0 means infinite size).
386
387     In all other respects, this Queue class resembled the standard library,
388     :class:`Queue`.
389     '''
390
391     def __init__(self, maxsize=None):
392         LightQueue.__init__(self, maxsize)
393         self.unfinished_tasks = 0
394         self._cond = Event()
395
396     def _format(self):
397         result = LightQueue._format(self)
398         if self.unfinished_tasks:
399             result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
400         return result
401
402     def _put(self, item):
403         LightQueue._put(self, item)
404         self._put_bookkeeping()
405
406     def _put_bookkeeping(self):
407         self.unfinished_tasks += 1
408         if self._cond.ready():
409             self._cond.reset()
410
411     def task_done(self):
412         '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
413         For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
414         :meth:`task_done` tells the queue that the processing on the task is complete.
415
416         If a :meth:`join` is currently blocking, it will resume when all items have been processed
417         (meaning that a :meth:`task_done` call was received for every item that had been
418         :meth:`put <Queue.put>` into the queue).
419
420         Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
421         '''
422
423         if self.unfinished_tasks <= 0:
424             raise ValueError('task_done() called too many times')
425         self.unfinished_tasks -= 1
426         if self.unfinished_tasks == 0:
427             self._cond.send(None)
428
429     def join(self):
430         '''Block until all items in the queue have been gotten and processed.
431
432         The count of unfinished tasks goes up whenever an item is added to the queue.
433         The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
434         that the item was retrieved and all work on it is complete. When the count of
435         unfinished tasks drops to zero, :meth:`join` unblocks.
436         '''
437         if self.unfinished_tasks > 0:
438             self._cond.wait()
439
440
441 class PriorityQueue(Queue):
442     '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
443
444     Entries are typically tuples of the form: ``(priority number, data)``.
445     '''
446
447     def _init(self, maxsize):
448         self.queue = []
449
450     def _put(self, item, heappush=heapq.heappush):
451         heappush(self.queue, item)
452         self._put_bookkeeping()
453
454     def _get(self, heappop=heapq.heappop):
455         return heappop(self.queue)
456
457
458 class LifoQueue(Queue):
459     '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
460
461     def _init(self, maxsize):
462         self.queue = []
463
464     def _put(self, item):
465         self.queue.append(item)
466         self._put_bookkeeping()
467
468     def _get(self):
469         return self.queue.pop()