Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / queue.py
1 # Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com
2 # Copyright (c) 2010 Eventlet Contributors (see AUTHORS)
3 # and licensed under the MIT license:
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 """Synchronized queues.
24
25 The :mod:`eventlet.queue` module implements multi-producer, multi-consumer
26 queues that work across greenlets, with the API similar to the classes found in
27 the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>`
28 modules.
29
30 A major difference is that queues in this module operate as channels when
31 initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
32 and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until
33 a call to :meth:`Queue.get` retrieves the item.
34
35 An interesting difference, made possible because of greenthreads, is
36 that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be
37 used as indicators of whether the subsequent :meth:`Queue.get`
38 or :meth:`Queue.put` will not block.  The new methods :meth:`Queue.getting`
39 and :meth:`Queue.putting` report on the number of greenthreads blocking
40 in :meth:`put <Queue.put>` or :meth:`get <Queue.get>` respectively.
41 """
42 from __future__ import print_function
43
44 import sys
45 import heapq
46 import collections
47 import traceback
48
49 from eventlet.event import Event
50 from eventlet.greenthread import getcurrent
51 from eventlet.hubs import get_hub
52 from eventlet.support import six
53 from eventlet.support.six.moves import queue as Stdlib_Queue
54 from eventlet.timeout import Timeout
55
56
57 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
58
59 _NONE = object()
60 Full = six.moves.queue.Full
61 Empty = six.moves.queue.Empty
62
63
64 class Waiter(object):
65     """A low level synchronization class.
66
67     Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
68
69     * switching will occur only if the waiting greenlet is executing :meth:`wait`
70       method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
71     * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
72
73     The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
74     The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
75     """
76     __slots__ = ['greenlet']
77
78     def __init__(self):
79         self.greenlet = None
80
81     def __repr__(self):
82         if self.waiting:
83             waiting = ' waiting'
84         else:
85             waiting = ''
86         return '<%s at %s%s greenlet=%r>' % (
87             type(self).__name__, hex(id(self)), waiting, self.greenlet,
88         )
89
90     def __str__(self):
91         """
92         >>> print(Waiter())
93         <Waiter greenlet=None>
94         """
95         if self.waiting:
96             waiting = ' waiting'
97         else:
98             waiting = ''
99         return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
100
101     def __nonzero__(self):
102         return self.greenlet is not None
103
104     __bool__ = __nonzero__
105
106     @property
107     def waiting(self):
108         return self.greenlet is not None
109
110     def switch(self, value=None):
111         """Wake up the greenlet that is calling wait() currently (if there is one).
112         Can only be called from Hub's greenlet.
113         """
114         assert getcurrent() is get_hub(
115         ).greenlet, "Can only use Waiter.switch method from the mainloop"
116         if self.greenlet is not None:
117             try:
118                 self.greenlet.switch(value)
119             except:
120                 traceback.print_exc()
121
122     def throw(self, *throw_args):
123         """Make greenlet calling wait() wake up (if there is a wait()).
124         Can only be called from Hub's greenlet.
125         """
126         assert getcurrent() is get_hub(
127         ).greenlet, "Can only use Waiter.switch method from the mainloop"
128         if self.greenlet is not None:
129             try:
130                 self.greenlet.throw(*throw_args)
131             except:
132                 traceback.print_exc()
133
134     # XXX should be renamed to get() ? and the whole class is called Receiver?
135     def wait(self):
136         """Wait until switch() or throw() is called.
137         """
138         assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
139         self.greenlet = getcurrent()
140         try:
141             return get_hub().switch()
142         finally:
143             self.greenlet = None
144
145
146 class LightQueue(object):
147     """
148     This is a variant of Queue that behaves mostly like the standard
149     :class:`Stdlib_Queue`.  It differs by not supporting the
150     :meth:`task_done <Stdlib_Queue.task_done>` or
151     :meth:`join <Stdlib_Queue.join>` methods, and is a little faster for
152     not having that overhead.
153     """
154
155     def __init__(self, maxsize=None):
156         if maxsize is None or maxsize < 0:  # None is not comparable in 3.x
157             self.maxsize = None
158         else:
159             self.maxsize = maxsize
160         self.getters = set()
161         self.putters = set()
162         self._event_unlock = None
163         self._init(maxsize)
164
165     # QQQ make maxsize into a property with setter that schedules unlock if necessary
166
167     def _init(self, maxsize):
168         self.queue = collections.deque()
169
170     def _get(self):
171         return self.queue.popleft()
172
173     def _put(self, item):
174         self.queue.append(item)
175
176     def __repr__(self):
177         return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
178
179     def __str__(self):
180         return '<%s %s>' % (type(self).__name__, self._format())
181
182     def _format(self):
183         result = 'maxsize=%r' % (self.maxsize, )
184         if getattr(self, 'queue', None):
185             result += ' queue=%r' % self.queue
186         if self.getters:
187             result += ' getters[%s]' % len(self.getters)
188         if self.putters:
189             result += ' putters[%s]' % len(self.putters)
190         if self._event_unlock is not None:
191             result += ' unlocking'
192         return result
193
194     def qsize(self):
195         """Return the size of the queue."""
196         return len(self.queue)
197
198     def resize(self, size):
199         """Resizes the queue's maximum size.
200
201         If the size is increased, and there are putters waiting, they may be woken up."""
202         # None is not comparable in 3.x
203         if self.maxsize is not None and (size is None or size > self.maxsize):
204             # Maybe wake some stuff up
205             self._schedule_unlock()
206         self.maxsize = size
207
208     def putting(self):
209         """Returns the number of greenthreads that are blocked waiting to put
210         items into the queue."""
211         return len(self.putters)
212
213     def getting(self):
214         """Returns the number of greenthreads that are blocked waiting on an
215         empty queue."""
216         return len(self.getters)
217
218     def empty(self):
219         """Return ``True`` if the queue is empty, ``False`` otherwise."""
220         return not self.qsize()
221
222     def full(self):
223         """Return ``True`` if the queue is full, ``False`` otherwise.
224
225         ``Queue(None)`` is never full.
226         """
227         # None is not comparable in 3.x
228         return self.maxsize is not None and self.qsize() >= self.maxsize
229
230     def put(self, item, block=True, timeout=None):
231         """Put an item into the queue.
232
233         If optional arg *block* is true and *timeout* is ``None`` (the default),
234         block if necessary until a free slot is available. If *timeout* is
235         a positive number, it blocks at most *timeout* seconds and raises
236         the :class:`Full` exception if no free slot was available within that time.
237         Otherwise (*block* is false), put an item on the queue if a free slot
238         is immediately available, else raise the :class:`Full` exception (*timeout*
239         is ignored in that case).
240         """
241         if self.maxsize is None or self.qsize() < self.maxsize:
242             # there's a free slot, put an item right away
243             self._put(item)
244             if self.getters:
245                 self._schedule_unlock()
246         elif not block and get_hub().greenlet is getcurrent():
247             # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
248             # find a getter and deliver an item to it
249             while self.getters:
250                 getter = self.getters.pop()
251                 if getter:
252                     self._put(item)
253                     item = self._get()
254                     getter.switch(item)
255                     return
256             raise Full
257         elif block:
258             waiter = ItemWaiter(item)
259             self.putters.add(waiter)
260             timeout = Timeout(timeout, Full)
261             try:
262                 if self.getters:
263                     self._schedule_unlock()
264                 result = waiter.wait()
265                 assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
266                 if waiter.item is not _NONE:
267                     self._put(item)
268             finally:
269                 timeout.cancel()
270                 self.putters.discard(waiter)
271         else:
272             raise Full
273
274     def put_nowait(self, item):
275         """Put an item into the queue without blocking.
276
277         Only enqueue the item if a free slot is immediately available.
278         Otherwise raise the :class:`Full` exception.
279         """
280         self.put(item, False)
281
282     def get(self, block=True, timeout=None):
283         """Remove and return an item from the queue.
284
285         If optional args *block* is true and *timeout* is ``None`` (the default),
286         block if necessary until an item is available. If *timeout* is a positive number,
287         it blocks at most *timeout* seconds and raises the :class:`Empty` exception
288         if no item was available within that time. Otherwise (*block* is false), return
289         an item if one is immediately available, else raise the :class:`Empty` exception
290         (*timeout* is ignored in that case).
291         """
292         if self.qsize():
293             if self.putters:
294                 self._schedule_unlock()
295             return self._get()
296         elif not block and get_hub().greenlet is getcurrent():
297             # special case to make get_nowait() runnable in the mainloop greenlet
298             # there are no items in the queue; try to fix the situation by unlocking putters
299             while self.putters:
300                 putter = self.putters.pop()
301                 if putter:
302                     putter.switch(putter)
303                     if self.qsize():
304                         return self._get()
305             raise Empty
306         elif block:
307             waiter = Waiter()
308             timeout = Timeout(timeout, Empty)
309             try:
310                 self.getters.add(waiter)
311                 if self.putters:
312                     self._schedule_unlock()
313                 return waiter.wait()
314             finally:
315                 self.getters.discard(waiter)
316                 timeout.cancel()
317         else:
318             raise Empty
319
320     def get_nowait(self):
321         """Remove and return an item from the queue without blocking.
322
323         Only get an item if one is immediately available. Otherwise
324         raise the :class:`Empty` exception.
325         """
326         return self.get(False)
327
328     def _unlock(self):
329         try:
330             while True:
331                 if self.qsize() and self.getters:
332                     getter = self.getters.pop()
333                     if getter:
334                         try:
335                             item = self._get()
336                         except:
337                             getter.throw(*sys.exc_info())
338                         else:
339                             getter.switch(item)
340                 elif self.putters and self.getters:
341                     putter = self.putters.pop()
342                     if putter:
343                         getter = self.getters.pop()
344                         if getter:
345                             item = putter.item
346                             # this makes greenlet calling put() not to call _put() again
347                             putter.item = _NONE
348                             self._put(item)
349                             item = self._get()
350                             getter.switch(item)
351                             putter.switch(putter)
352                         else:
353                             self.putters.add(putter)
354                 elif self.putters and (self.getters or
355                                        self.maxsize is None or
356                                        self.qsize() < self.maxsize):
357                     putter = self.putters.pop()
358                     putter.switch(putter)
359                 else:
360                     break
361         finally:
362             self._event_unlock = None  # QQQ maybe it's possible to obtain this info from libevent?
363             # i.e. whether this event is pending _OR_ currently executing
364         # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
365         # to avoid this, schedule unlock with timer(0, ...) once in a while
366
367     def _schedule_unlock(self):
368         if self._event_unlock is None:
369             self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
370
371
372 class ItemWaiter(Waiter):
373     __slots__ = ['item']
374
375     def __init__(self, item):
376         Waiter.__init__(self)
377         self.item = item
378
379
380 class Queue(LightQueue):
381     '''Create a queue object with a given maximum size.
382
383     If *maxsize* is less than zero or ``None``, the queue size is infinite.
384
385     ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
386     until the item is delivered. (This is unlike the standard
387     :class:`Stdlib_Queue`, where 0 means infinite size).
388
389     In all other respects, this Queue class resembles the standard library,
390     :class:`Stdlib_Queue`.
391     '''
392
393     def __init__(self, maxsize=None):
394         LightQueue.__init__(self, maxsize)
395         self.unfinished_tasks = 0
396         self._cond = Event()
397
398     def _format(self):
399         result = LightQueue._format(self)
400         if self.unfinished_tasks:
401             result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
402         return result
403
404     def _put(self, item):
405         LightQueue._put(self, item)
406         self._put_bookkeeping()
407
408     def _put_bookkeeping(self):
409         self.unfinished_tasks += 1
410         if self._cond.ready():
411             self._cond.reset()
412
413     def task_done(self):
414         '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
415         For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
416         :meth:`task_done` tells the queue that the processing on the task is complete.
417
418         If a :meth:`join` is currently blocking, it will resume when all items have been processed
419         (meaning that a :meth:`task_done` call was received for every item that had been
420         :meth:`put <Queue.put>` into the queue).
421
422         Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
423         '''
424
425         if self.unfinished_tasks <= 0:
426             raise ValueError('task_done() called too many times')
427         self.unfinished_tasks -= 1
428         if self.unfinished_tasks == 0:
429             self._cond.send(None)
430
431     def join(self):
432         '''Block until all items in the queue have been gotten and processed.
433
434         The count of unfinished tasks goes up whenever an item is added to the queue.
435         The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
436         that the item was retrieved and all work on it is complete. When the count of
437         unfinished tasks drops to zero, :meth:`join` unblocks.
438         '''
439         if self.unfinished_tasks > 0:
440             self._cond.wait()
441
442
443 class PriorityQueue(Queue):
444     '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
445
446     Entries are typically tuples of the form: ``(priority number, data)``.
447     '''
448
449     def _init(self, maxsize):
450         self.queue = []
451
452     def _put(self, item, heappush=heapq.heappush):
453         heappush(self.queue, item)
454         self._put_bookkeeping()
455
456     def _get(self, heappop=heapq.heappop):
457         return heappop(self.queue)
458
459
460 class LifoQueue(Queue):
461     '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
462
463     def _init(self, maxsize):
464         self.queue = []
465
466     def _put(self, item):
467         self.queue.append(item)
468         self._put_bookkeeping()
469
470     def _get(self):
471         return self.queue.pop()