1 from __future__ import print_function
8 from eventlet import event as _event
9 from eventlet import hubs
10 from eventlet import greenthread
11 from eventlet import semaphore as semaphoremod
22 warnings.warn("The Event class has been moved to the event module! "
23 "Please construct event.Event objects instead.",
24 DeprecationWarning, stacklevel=2)
25 return _event.Event(*a, **kw)
30 "The event class has been capitalized and moved! Please "
31 "construct event.Event objects instead.",
32 DeprecationWarning, stacklevel=2)
33 return _event.Event(*a, **kw)
38 "The Semaphore class has moved! Please "
39 "use semaphore.Semaphore instead.",
40 DeprecationWarning, stacklevel=2)
41 return semaphoremod.Semaphore(count)
44 def BoundedSemaphore(count):
46 "The BoundedSemaphore class has moved! Please "
47 "use semaphore.BoundedSemaphore instead.",
48 DeprecationWarning, stacklevel=2)
49 return semaphoremod.BoundedSemaphore(count)
52 def semaphore(count=0, limit=None):
54 "coros.semaphore is deprecated. Please use either "
55 "semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
56 DeprecationWarning, stacklevel=2)
58 return Semaphore(count)
60 return BoundedSemaphore(count)
63 class metaphore(object):
64 """This is sort of an inverse semaphore: a counter that starts at 0 and
65 waits only if nonzero. It's used to implement a "wait for all" scenario.
67 >>> from eventlet import api, coros
68 >>> count = coros.metaphore()
70 >>> def decrementer(count, id):
71 ... print("{0} decrementing".format(id))
74 >>> _ = eventlet.spawn(decrementer, count, 'A')
75 >>> _ = eventlet.spawn(decrementer, count, 'B')
84 self.event = _event.Event()
85 # send() right away, else we'd wait on the default 0 count!
89 """Increment our counter. If this transitions the counter from zero to
90 nonzero, make any subsequent :meth:`wait` call wait.
94 if self.counter == by:
95 # If we just incremented self.counter by 'by', and the new count
96 # equals 'by', then the old value of self.counter was 0.
97 # Transitioning from 0 to a nonzero value means wait() must
102 """Decrement our counter. If this transitions the counter from nonzero
103 to zero, a current or subsequent wait() call need no longer wait.
107 if self.counter <= 0:
108 # Don't leave self.counter < 0, that will screw things up in
111 # Transitioning from nonzero to 0 means wait() need no longer wait.
115 """Suspend the caller only if our count is nonzero. In that case,
116 resume the caller once the count decrements to zero again.
121 def execute(func, *args, **kw):
122 """ Executes an operation asynchronously in a new coroutine, returning
123 an event to retrieve the return value.
125 This has the same api as the :meth:`eventlet.coros.CoroutinePool.execute`
126 method; the only difference is that this one creates a new coroutine
127 instead of drawing from a pool.
129 >>> from eventlet import coros
130 >>> evt = coros.execute(lambda a: ('foo', a), 1)
135 "Coros.execute is deprecated. Please use eventlet.spawn "
136 "instead.", DeprecationWarning, stacklevel=2)
137 return greenthread.spawn(func, *args, **kw)
140 def CoroutinePool(*args, **kwargs):
142 "CoroutinePool is deprecated. Please use "
143 "eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2)
144 from eventlet.pool import Pool
145 return Pool(*args, **kwargs)
152 "coros.Queue is deprecated. Please use "
153 "eventlet.queue.Queue instead.",
154 DeprecationWarning, stacklevel=2)
155 self.items = collections.deque()
156 self._waiters = set()
158 def __nonzero__(self):
159 return len(self.items) > 0
161 __bool__ = __nonzero__
164 return len(self.items)
167 params = (self.__class__.__name__, hex(id(self)),
168 len(self.items), len(self._waiters))
169 return '<%s at %s items[%d] _waiters[%s]>' % params
171 def send(self, result=None, exc=None):
172 if exc is not None and not isinstance(exc, tuple):
174 self.items.append((result, exc))
176 hubs.get_hub().schedule_call_global(0, self._do_send)
178 def send_exception(self, *args):
179 # the arguments are the same as for greenlet.throw
180 return self.send(exc=args)
183 if self._waiters and self.items:
184 waiter = self._waiters.pop()
185 result, exc = self.items.popleft()
186 waiter.switch((result, exc))
190 result, exc = self.items.popleft()
194 eventlet.getcurrent().throw(*exc)
196 self._waiters.add(eventlet.getcurrent())
198 result, exc = hubs.get_hub().switch()
202 eventlet.getcurrent().throw(*exc)
204 self._waiters.discard(eventlet.getcurrent())
207 return len(self.items) > 0
210 # for consistency with Channel
214 return len(self._waiters)
223 class Channel(object):
225 def __init__(self, max_size=0):
227 "coros.Channel is deprecated. Please use "
228 "eventlet.queue.Queue(0) instead.",
229 DeprecationWarning, stacklevel=2)
230 self.max_size = max_size
231 self.items = collections.deque()
232 self._waiters = set()
233 self._senders = set()
235 def __nonzero__(self):
236 return len(self.items) > 0
238 __bool__ = __nonzero__
241 return len(self.items)
244 params = (self.__class__.__name__, hex(id(self)),
245 self.max_size, len(self.items),
246 len(self._waiters), len(self._senders))
247 return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
249 def send(self, result=None, exc=None):
250 if exc is not None and not isinstance(exc, tuple):
252 if eventlet.getcurrent() is hubs.get_hub().greenlet:
253 self.items.append((result, exc))
255 hubs.get_hub().schedule_call_global(0, self._do_switch)
257 self.items.append((result, exc))
258 # note that send() does not work well with timeouts. if your timeout fires
259 # after this point, the item will remain in the queue
261 hubs.get_hub().schedule_call_global(0, self._do_switch)
262 if len(self.items) > self.max_size:
263 self._senders.add(eventlet.getcurrent())
265 hubs.get_hub().switch()
267 self._senders.discard(eventlet.getcurrent())
269 def send_exception(self, *args):
270 # the arguments are the same as for greenlet.throw
271 return self.send(exc=args)
273 def _do_switch(self):
275 if self._waiters and self.items:
276 waiter = self._waiters.pop()
277 result, exc = self.items.popleft()
279 waiter.switch((result, exc))
281 traceback.print_exc()
282 elif self._senders and len(self.items) <= self.max_size:
283 sender = self._senders.pop()
287 traceback.print_exc()
293 result, exc = self.items.popleft()
294 if len(self.items) <= self.max_size:
295 hubs.get_hub().schedule_call_global(0, self._do_switch)
299 eventlet.getcurrent().throw(*exc)
302 hubs.get_hub().schedule_call_global(0, self._do_switch)
303 self._waiters.add(eventlet.getcurrent())
305 result, exc = hubs.get_hub().switch()
309 eventlet.getcurrent().throw(*exc)
311 self._waiters.discard(eventlet.getcurrent())
314 return len(self.items) > 0
317 return len(self.items) >= self.max_size
320 return max(0, len(self._waiters) - len(self.items))
323 def queue(max_size=None):
327 return Channel(max_size)