Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / queue.py
1 # Copyright (c) 2009 Denis Bilenko, denis.bilenko at gmail com
2 # Copyright (c) 2010 Eventlet Contributors (see AUTHORS)
3 # and licensed under the MIT license:
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 """Synchronized queues.
24
25 The :mod:`eventlet.queue` module implements multi-producer, multi-consumer
26 queues that work across greenlets, with the API similar to the classes found in
27 the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>`
28 modules.
29
30 A major difference is that queues in this module operate as channels when
31 initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
32 and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until
33 a call to :meth:`Queue.get` retrieves the item.
34
35 An interesting difference, made possible because of greenthreads, is
36 that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be
37 used as indicators of whether the subsequent :meth:`Queue.get`
38 or :meth:`Queue.put` will not block.  The new methods :meth:`Queue.getting`
39 and :meth:`Queue.putting` report on the number of greenthreads blocking
40 in :meth:`put <Queue.put>` or :meth:`get <Queue.get>` respectively.
41 """
42 from __future__ import print_function
43
44 import sys
45 import heapq
46 import collections
47 import traceback
48
49 from eventlet.event import Event
50 from eventlet.greenthread import getcurrent
51 from eventlet.hubs import get_hub
52 from eventlet.support import six
53 from eventlet.timeout import Timeout
54
55
56 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
57
58 _NONE = object()
59 Full = six.moves.queue.Full
60 Empty = six.moves.queue.Empty
61
62
63 class Waiter(object):
64     """A low level synchronization class.
65
66     Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
67
68     * switching will occur only if the waiting greenlet is executing :meth:`wait`
69       method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
70     * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
71
72     The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
73     The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
74     """
75     __slots__ = ['greenlet']
76
77     def __init__(self):
78         self.greenlet = None
79
80     def __repr__(self):
81         if self.waiting:
82             waiting = ' waiting'
83         else:
84             waiting = ''
85         return '<%s at %s%s greenlet=%r>' % (type(self).__name__, hex(id(self)), waiting, self.greenlet)
86
87     def __str__(self):
88         """
89         >>> print(Waiter())
90         <Waiter greenlet=None>
91         """
92         if self.waiting:
93             waiting = ' waiting'
94         else:
95             waiting = ''
96         return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
97
98     def __nonzero__(self):
99         return self.greenlet is not None
100
101     __bool__ = __nonzero__
102
103     @property
104     def waiting(self):
105         return self.greenlet is not None
106
107     def switch(self, value=None):
108         """Wake up the greenlet that is calling wait() currently (if there is one).
109         Can only be called from Hub's greenlet.
110         """
111         assert getcurrent() is get_hub().greenlet, "Can only use Waiter.switch method from the mainloop"
112         if self.greenlet is not None:
113             try:
114                 self.greenlet.switch(value)
115             except:
116                 traceback.print_exc()
117
118     def throw(self, *throw_args):
119         """Make greenlet calling wait() wake up (if there is a wait()).
120         Can only be called from Hub's greenlet.
121         """
122         assert getcurrent() is get_hub().greenlet, "Can only use Waiter.switch method from the mainloop"
123         if self.greenlet is not None:
124             try:
125                 self.greenlet.throw(*throw_args)
126             except:
127                 traceback.print_exc()
128
129     # XXX should be renamed to get() ? and the whole class is called Receiver?
130     def wait(self):
131         """Wait until switch() or throw() is called.
132         """
133         assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
134         self.greenlet = getcurrent()
135         try:
136             return get_hub().switch()
137         finally:
138             self.greenlet = None
139
140
141 class LightQueue(object):
142     """
143     This is a variant of Queue that behaves mostly like the standard
144     :class:`Queue`.  It differs by not supporting the
145     :meth:`task_done <Queue.task_done>` or :meth:`join <Queue.join>` methods,
146     and is a little faster for not having that overhead.
147     """
148
149     def __init__(self, maxsize=None):
150         if maxsize is None or maxsize < 0:  # None is not comparable in 3.x
151             self.maxsize = None
152         else:
153             self.maxsize = maxsize
154         self.getters = set()
155         self.putters = set()
156         self._event_unlock = None
157         self._init(maxsize)
158
159     # QQQ make maxsize into a property with setter that schedules unlock if necessary
160
161     def _init(self, maxsize):
162         self.queue = collections.deque()
163
164     def _get(self):
165         return self.queue.popleft()
166
167     def _put(self, item):
168         self.queue.append(item)
169
170     def __repr__(self):
171         return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
172
173     def __str__(self):
174         return '<%s %s>' % (type(self).__name__, self._format())
175
176     def _format(self):
177         result = 'maxsize=%r' % (self.maxsize, )
178         if getattr(self, 'queue', None):
179             result += ' queue=%r' % self.queue
180         if self.getters:
181             result += ' getters[%s]' % len(self.getters)
182         if self.putters:
183             result += ' putters[%s]' % len(self.putters)
184         if self._event_unlock is not None:
185             result += ' unlocking'
186         return result
187
188     def qsize(self):
189         """Return the size of the queue."""
190         return len(self.queue)
191
192     def resize(self, size):
193         """Resizes the queue's maximum size.
194
195         If the size is increased, and there are putters waiting, they may be woken up."""
196         if self.maxsize is not None and (size is None or size > self.maxsize):  # None is not comparable in 3.x
197             # Maybe wake some stuff up
198             self._schedule_unlock()
199         self.maxsize = size
200
201     def putting(self):
202         """Returns the number of greenthreads that are blocked waiting to put
203         items into the queue."""
204         return len(self.putters)
205
206     def getting(self):
207         """Returns the number of greenthreads that are blocked waiting on an
208         empty queue."""
209         return len(self.getters)
210
211     def empty(self):
212         """Return ``True`` if the queue is empty, ``False`` otherwise."""
213         return not self.qsize()
214
215     def full(self):
216         """Return ``True`` if the queue is full, ``False`` otherwise.
217
218         ``Queue(None)`` is never full.
219         """
220         return self.maxsize is not None and self.qsize() >= self.maxsize  # None is not comparable in 3.x
221
222     def put(self, item, block=True, timeout=None):
223         """Put an item into the queue.
224
225         If optional arg *block* is true and *timeout* is ``None`` (the default),
226         block if necessary until a free slot is available. If *timeout* is
227         a positive number, it blocks at most *timeout* seconds and raises
228         the :class:`Full` exception if no free slot was available within that time.
229         Otherwise (*block* is false), put an item on the queue if a free slot
230         is immediately available, else raise the :class:`Full` exception (*timeout*
231         is ignored in that case).
232         """
233         if self.maxsize is None or self.qsize() < self.maxsize:
234             # there's a free slot, put an item right away
235             self._put(item)
236             if self.getters:
237                 self._schedule_unlock()
238         elif not block and get_hub().greenlet is getcurrent():
239             # we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
240             # find a getter and deliver an item to it
241             while self.getters:
242                 getter = self.getters.pop()
243                 if getter:
244                     self._put(item)
245                     item = self._get()
246                     getter.switch(item)
247                     return
248             raise Full
249         elif block:
250             waiter = ItemWaiter(item)
251             self.putters.add(waiter)
252             timeout = Timeout(timeout, Full)
253             try:
254                 if self.getters:
255                     self._schedule_unlock()
256                 result = waiter.wait()
257                 assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
258                 if waiter.item is not _NONE:
259                     self._put(item)
260             finally:
261                 timeout.cancel()
262                 self.putters.discard(waiter)
263         else:
264             raise Full
265
266     def put_nowait(self, item):
267         """Put an item into the queue without blocking.
268
269         Only enqueue the item if a free slot is immediately available.
270         Otherwise raise the :class:`Full` exception.
271         """
272         self.put(item, False)
273
274     def get(self, block=True, timeout=None):
275         """Remove and return an item from the queue.
276
277         If optional args *block* is true and *timeout* is ``None`` (the default),
278         block if necessary until an item is available. If *timeout* is a positive number,
279         it blocks at most *timeout* seconds and raises the :class:`Empty` exception
280         if no item was available within that time. Otherwise (*block* is false), return
281         an item if one is immediately available, else raise the :class:`Empty` exception
282         (*timeout* is ignored in that case).
283         """
284         if self.qsize():
285             if self.putters:
286                 self._schedule_unlock()
287             return self._get()
288         elif not block and get_hub().greenlet is getcurrent():
289             # special case to make get_nowait() runnable in the mainloop greenlet
290             # there are no items in the queue; try to fix the situation by unlocking putters
291             while self.putters:
292                 putter = self.putters.pop()
293                 if putter:
294                     putter.switch(putter)
295                     if self.qsize():
296                         return self._get()
297             raise Empty
298         elif block:
299             waiter = Waiter()
300             timeout = Timeout(timeout, Empty)
301             try:
302                 self.getters.add(waiter)
303                 if self.putters:
304                     self._schedule_unlock()
305                 return waiter.wait()
306             finally:
307                 self.getters.discard(waiter)
308                 timeout.cancel()
309         else:
310             raise Empty
311
312     def get_nowait(self):
313         """Remove and return an item from the queue without blocking.
314
315         Only get an item if one is immediately available. Otherwise
316         raise the :class:`Empty` exception.
317         """
318         return self.get(False)
319
320     def _unlock(self):
321         try:
322             while True:
323                 if self.qsize() and self.getters:
324                     getter = self.getters.pop()
325                     if getter:
326                         try:
327                             item = self._get()
328                         except:
329                             getter.throw(*sys.exc_info())
330                         else:
331                             getter.switch(item)
332                 elif self.putters and self.getters:
333                     putter = self.putters.pop()
334                     if putter:
335                         getter = self.getters.pop()
336                         if getter:
337                             item = putter.item
338                             putter.item = _NONE  # this makes greenlet calling put() not to call _put() again
339                             self._put(item)
340                             item = self._get()
341                             getter.switch(item)
342                             putter.switch(putter)
343                         else:
344                             self.putters.add(putter)
345                 elif self.putters and (self.getters or self.maxsize is None or self.qsize() < self.maxsize):
346                     putter = self.putters.pop()
347                     putter.switch(putter)
348                 else:
349                     break
350         finally:
351             self._event_unlock = None  # QQQ maybe it's possible to obtain this info from libevent?
352             # i.e. whether this event is pending _OR_ currently executing
353         # testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
354         # to avoid this, schedule unlock with timer(0, ...) once in a while
355
356     def _schedule_unlock(self):
357         if self._event_unlock is None:
358             self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
359
360
361 class ItemWaiter(Waiter):
362     __slots__ = ['item']
363
364     def __init__(self, item):
365         Waiter.__init__(self)
366         self.item = item
367
368
369 class Queue(LightQueue):
370     '''Create a queue object with a given maximum size.
371
372     If *maxsize* is less than zero or ``None``, the queue size is infinite.
373
374     ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
375     until the item is delivered. (This is unlike the standard :class:`Queue`,
376     where 0 means infinite size).
377
378     In all other respects, this Queue class resembled the standard library,
379     :class:`Queue`.
380     '''
381
382     def __init__(self, maxsize=None):
383         LightQueue.__init__(self, maxsize)
384         self.unfinished_tasks = 0
385         self._cond = Event()
386
387     def _format(self):
388         result = LightQueue._format(self)
389         if self.unfinished_tasks:
390             result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
391         return result
392
393     def _put(self, item):
394         LightQueue._put(self, item)
395         self._put_bookkeeping()
396
397     def _put_bookkeeping(self):
398         self.unfinished_tasks += 1
399         if self._cond.ready():
400             self._cond.reset()
401
402     def task_done(self):
403         '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
404         For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
405         that the processing on the task is complete.
406
407         If a :meth:`join` is currently blocking, it will resume when all items have been processed
408         (meaning that a :meth:`task_done` call was received for every item that had been
409         :meth:`put <Queue.put>` into the queue).
410
411         Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
412         '''
413
414         if self.unfinished_tasks <= 0:
415             raise ValueError('task_done() called too many times')
416         self.unfinished_tasks -= 1
417         if self.unfinished_tasks == 0:
418             self._cond.send(None)
419
420     def join(self):
421         '''Block until all items in the queue have been gotten and processed.
422
423         The count of unfinished tasks goes up whenever an item is added to the queue.
424         The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
425         that the item was retrieved and all work on it is complete. When the count of
426         unfinished tasks drops to zero, :meth:`join` unblocks.
427         '''
428         if self.unfinished_tasks > 0:
429             self._cond.wait()
430
431
432 class PriorityQueue(Queue):
433     '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
434
435     Entries are typically tuples of the form: ``(priority number, data)``.
436     '''
437
438     def _init(self, maxsize):
439         self.queue = []
440
441     def _put(self, item, heappush=heapq.heappush):
442         heappush(self.queue, item)
443         self._put_bookkeeping()
444
445     def _get(self, heappop=heapq.heappop):
446         return heappop(self.queue)
447
448
449 class LifoQueue(Queue):
450     '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
451
452     def _init(self, maxsize):
453         self.queue = []
454
455     def _put(self, item):
456         self.queue.append(item)
457         self._put_bookkeeping()
458
459     def _get(self):
460         return self.queue.pop()