Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / coros.py
1 from __future__ import print_function
2
3 import collections
4 import traceback
5 import warnings
6
7 import eventlet
8 from eventlet import event as _event
9 from eventlet import hubs
10 from eventlet import greenthread
11 from eventlet import semaphore as semaphoremod
12
13
14 class NOT_USED:
15     def __repr__(self):
16         return 'NOT_USED'
17
18 NOT_USED = NOT_USED()
19
20
21 def Event(*a, **kw):
22     warnings.warn("The Event class has been moved to the event module! "
23                   "Please construct event.Event objects instead.",
24                   DeprecationWarning, stacklevel=2)
25     return _event.Event(*a, **kw)
26
27
28 def event(*a, **kw):
29     warnings.warn(
30         "The event class has been capitalized and moved!  Please "
31         "construct event.Event objects instead.",
32         DeprecationWarning, stacklevel=2)
33     return _event.Event(*a, **kw)
34
35
36 def Semaphore(count):
37     warnings.warn(
38         "The Semaphore class has moved!  Please "
39         "use semaphore.Semaphore instead.",
40         DeprecationWarning, stacklevel=2)
41     return semaphoremod.Semaphore(count)
42
43
44 def BoundedSemaphore(count):
45     warnings.warn(
46         "The BoundedSemaphore class has moved!  Please "
47         "use semaphore.BoundedSemaphore instead.",
48         DeprecationWarning, stacklevel=2)
49     return semaphoremod.BoundedSemaphore(count)
50
51
52 def semaphore(count=0, limit=None):
53     warnings.warn(
54         "coros.semaphore is deprecated.  Please use either "
55         "semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
56         DeprecationWarning, stacklevel=2)
57     if limit is None:
58         return Semaphore(count)
59     else:
60         return BoundedSemaphore(count)
61
62
63 class metaphore(object):
64     """This is sort of an inverse semaphore: a counter that starts at 0 and
65     waits only if nonzero. It's used to implement a "wait for all" scenario.
66
67     >>> from eventlet import api, coros
68     >>> count = coros.metaphore()
69     >>> count.wait()
70     >>> def decrementer(count, id):
71     ...     print("{0} decrementing".format(id))
72     ...     count.dec()
73     ...
74     >>> _ = eventlet.spawn(decrementer, count, 'A')
75     >>> _ = eventlet.spawn(decrementer, count, 'B')
76     >>> count.inc(2)
77     >>> count.wait()
78     A decrementing
79     B decrementing
80     """
81
82     def __init__(self):
83         self.counter = 0
84         self.event = _event.Event()
85         # send() right away, else we'd wait on the default 0 count!
86         self.event.send()
87
88     def inc(self, by=1):
89         """Increment our counter. If this transitions the counter from zero to
90         nonzero, make any subsequent :meth:`wait` call wait.
91         """
92         assert by > 0
93         self.counter += by
94         if self.counter == by:
95             # If we just incremented self.counter by 'by', and the new count
96             # equals 'by', then the old value of self.counter was 0.
97             # Transitioning from 0 to a nonzero value means wait() must
98             # actually wait.
99             self.event.reset()
100
101     def dec(self, by=1):
102         """Decrement our counter. If this transitions the counter from nonzero
103         to zero, a current or subsequent wait() call need no longer wait.
104         """
105         assert by > 0
106         self.counter -= by
107         if self.counter <= 0:
108             # Don't leave self.counter < 0, that will screw things up in
109             # future calls.
110             self.counter = 0
111             # Transitioning from nonzero to 0 means wait() need no longer wait.
112             self.event.send()
113
114     def wait(self):
115         """Suspend the caller only if our count is nonzero. In that case,
116         resume the caller once the count decrements to zero again.
117         """
118         self.event.wait()
119
120
121 def execute(func, *args, **kw):
122     """ Executes an operation asynchronously in a new coroutine, returning
123     an event to retrieve the return value.
124
125     This has the same api as the :meth:`eventlet.coros.CoroutinePool.execute`
126     method; the only difference is that this one creates a new coroutine
127     instead of drawing from a pool.
128
129     >>> from eventlet import coros
130     >>> evt = coros.execute(lambda a: ('foo', a), 1)
131     >>> evt.wait()
132     ('foo', 1)
133     """
134     warnings.warn(
135         "Coros.execute is deprecated.  Please use eventlet.spawn "
136         "instead.", DeprecationWarning, stacklevel=2)
137     return greenthread.spawn(func, *args, **kw)
138
139
140 def CoroutinePool(*args, **kwargs):
141     warnings.warn(
142         "CoroutinePool is deprecated.  Please use "
143         "eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2)
144     from eventlet.pool import Pool
145     return Pool(*args, **kwargs)
146
147
148 class Queue(object):
149
150     def __init__(self):
151         warnings.warn(
152             "coros.Queue is deprecated.  Please use "
153             "eventlet.queue.Queue instead.",
154             DeprecationWarning, stacklevel=2)
155         self.items = collections.deque()
156         self._waiters = set()
157
158     def __nonzero__(self):
159         return len(self.items) > 0
160
161     __bool__ = __nonzero__
162
163     def __len__(self):
164         return len(self.items)
165
166     def __repr__(self):
167         params = (self.__class__.__name__, hex(id(self)),
168                   len(self.items), len(self._waiters))
169         return '<%s at %s items[%d] _waiters[%s]>' % params
170
171     def send(self, result=None, exc=None):
172         if exc is not None and not isinstance(exc, tuple):
173             exc = (exc, )
174         self.items.append((result, exc))
175         if self._waiters:
176             hubs.get_hub().schedule_call_global(0, self._do_send)
177
178     def send_exception(self, *args):
179         # the arguments are the same as for greenlet.throw
180         return self.send(exc=args)
181
182     def _do_send(self):
183         if self._waiters and self.items:
184             waiter = self._waiters.pop()
185             result, exc = self.items.popleft()
186             waiter.switch((result, exc))
187
188     def wait(self):
189         if self.items:
190             result, exc = self.items.popleft()
191             if exc is None:
192                 return result
193             else:
194                 eventlet.getcurrent().throw(*exc)
195         else:
196             self._waiters.add(eventlet.getcurrent())
197             try:
198                 result, exc = hubs.get_hub().switch()
199                 if exc is None:
200                     return result
201                 else:
202                     eventlet.getcurrent().throw(*exc)
203             finally:
204                 self._waiters.discard(eventlet.getcurrent())
205
206     def ready(self):
207         return len(self.items) > 0
208
209     def full(self):
210         # for consistency with Channel
211         return False
212
213     def waiting(self):
214         return len(self._waiters)
215
216     def __iter__(self):
217         return self
218
219     def next(self):
220         return self.wait()
221
222
223 class Channel(object):
224
225     def __init__(self, max_size=0):
226         warnings.warn(
227             "coros.Channel is deprecated.  Please use "
228             "eventlet.queue.Queue(0) instead.",
229             DeprecationWarning, stacklevel=2)
230         self.max_size = max_size
231         self.items = collections.deque()
232         self._waiters = set()
233         self._senders = set()
234
235     def __nonzero__(self):
236         return len(self.items) > 0
237
238     __bool__ = __nonzero__
239
240     def __len__(self):
241         return len(self.items)
242
243     def __repr__(self):
244         params = (self.__class__.__name__, hex(id(self)),
245                   self.max_size, len(self.items),
246                   len(self._waiters), len(self._senders))
247         return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
248
249     def send(self, result=None, exc=None):
250         if exc is not None and not isinstance(exc, tuple):
251             exc = (exc, )
252         if eventlet.getcurrent() is hubs.get_hub().greenlet:
253             self.items.append((result, exc))
254             if self._waiters:
255                 hubs.get_hub().schedule_call_global(0, self._do_switch)
256         else:
257             self.items.append((result, exc))
258             # note that send() does not work well with timeouts. if your timeout fires
259             # after this point, the item will remain in the queue
260             if self._waiters:
261                 hubs.get_hub().schedule_call_global(0, self._do_switch)
262             if len(self.items) > self.max_size:
263                 self._senders.add(eventlet.getcurrent())
264                 try:
265                     hubs.get_hub().switch()
266                 finally:
267                     self._senders.discard(eventlet.getcurrent())
268
269     def send_exception(self, *args):
270         # the arguments are the same as for greenlet.throw
271         return self.send(exc=args)
272
273     def _do_switch(self):
274         while True:
275             if self._waiters and self.items:
276                 waiter = self._waiters.pop()
277                 result, exc = self.items.popleft()
278                 try:
279                     waiter.switch((result, exc))
280                 except:
281                     traceback.print_exc()
282             elif self._senders and len(self.items) <= self.max_size:
283                 sender = self._senders.pop()
284                 try:
285                     sender.switch()
286                 except:
287                     traceback.print_exc()
288             else:
289                 break
290
291     def wait(self):
292         if self.items:
293             result, exc = self.items.popleft()
294             if len(self.items) <= self.max_size:
295                 hubs.get_hub().schedule_call_global(0, self._do_switch)
296             if exc is None:
297                 return result
298             else:
299                 eventlet.getcurrent().throw(*exc)
300         else:
301             if self._senders:
302                 hubs.get_hub().schedule_call_global(0, self._do_switch)
303             self._waiters.add(eventlet.getcurrent())
304             try:
305                 result, exc = hubs.get_hub().switch()
306                 if exc is None:
307                     return result
308                 else:
309                     eventlet.getcurrent().throw(*exc)
310             finally:
311                 self._waiters.discard(eventlet.getcurrent())
312
313     def ready(self):
314         return len(self.items) > 0
315
316     def full(self):
317         return len(self.items) >= self.max_size
318
319     def waiting(self):
320         return max(0, len(self._waiters) - len(self.items))
321
322
323 def queue(max_size=None):
324     if max_size is None:
325         return Queue()
326     else:
327         return Channel(max_size)