2 This module provides means to spawn, kill and link coroutines. Linking means
3 subscribing to the coroutine's result, either in form of return value or
6 To create a linkable coroutine use spawn function provided by this module:
8 >>> def demofunc(x, y):
10 >>> p = spawn(demofunc, 6, 2)
12 The return value of :func:`spawn` is an instance of :class:`Proc` class that
15 * ``p.link(obj)`` - notify *obj* when the coroutine is finished
17 What "notify" means here depends on the type of *obj*: a callable is simply
18 called, an :class:`~eventlet.coros.Event` or a :class:`~eventlet.coros.queue`
19 is notified using ``send``/``send_exception`` methods and if *obj* is another
20 greenlet it's killed with :class:`LinkedExited` exception.
24 >>> event = coros.Event()
29 Now, even though *p* is finished it's still possible to link it. In this
30 case the notification is performed immediately:
34 ... except LinkedCompleted:
35 ... print('LinkedCompleted')
38 (Without an argument, the link is created to the current greenlet)
40 There are also :meth:`~eventlet.proc.Source.link_value` and
41 :meth:`~eventlet.proc.Source.link_exception` methods that only deliver a return value and an
42 unhandled exception respectively (plain :meth:`~eventlet.proc.Source.link`
43 delivers both). Suppose we want to spawn a greenlet to do an important part of
44 the task; if it fails then there's no way to complete the task so the parent
45 must fail as well; :meth:`~eventlet.proc.Source.link_exception` is useful here:
47 >>> p = spawn(demofunc, 1, 0)
48 >>> _ = p.link_exception()
51 ... except LinkedFailed:
52 ... print('LinkedFailed')
55 One application of linking is :func:`waitall` function: link to a bunch of
56 coroutines and wait for all of them to complete. Such a function is provided by
61 from eventlet import api, coros, hubs
62 from eventlet.support import six
66 "The proc module is deprecated! Please use the greenthread "
67 "module, or any of the many other Eventlet cross-coroutine "
68 "primitives, instead.",
69 DeprecationWarning, stacklevel=2)
71 __all__ = ['LinkedExited',
84 'spawn_link_exception']
87 class LinkedExited(Exception):
88 """Raised when a linked proc exits"""
91 def __init__(self, name=None, msg=None):
94 msg = self.msg % self.name
95 Exception.__init__(self, msg)
98 class LinkedCompleted(LinkedExited):
99 """Raised when a linked proc finishes the execution cleanly"""
101 msg = "%r completed successfully"
104 class LinkedFailed(LinkedExited):
105 """Raised when a linked proc dies because of unhandled exception"""
106 msg = "%r failed with %s"
108 def __init__(self, name, typ, value=None, tb=None):
109 msg = self.msg % (name, typ.__name__)
110 LinkedExited.__init__(self, name, msg)
113 class LinkedKilled(LinkedFailed):
114 """Raised when a linked proc dies because of unhandled GreenletExit
117 msg = """%r was killed with %s"""
def getLinkedFailed(name, typ, value=None, tb=None):
    """Build the exception describing a linked proc that raised *typ*.

    Returns a :class:`LinkedKilled` when *typ* is a subclass of
    ``api.GreenletExit`` and a :class:`LinkedFailed` otherwise. *name*,
    *typ*, *value* and *tb* are handed to the chosen class unchanged.
    """
    exc_class = LinkedKilled if issubclass(typ, api.GreenletExit) else LinkedFailed
    return exc_class(name, typ, value, tb)
126 class ProcExit(api.GreenletExit):
127 """Raised when this proc is killed."""
132 A link to a greenlet, triggered when the greenlet exits.
135 def __init__(self, listener):
136 self.listener = listener
144 def __exit__(self, *args):
148 class LinkToEvent(Link):
150 def __call__(self, source):
151 if self.listener is None:
153 if source.has_value():
154 self.listener.send(source.value)
156 self.listener.send_exception(*source.exc_info())
159 class LinkToGreenlet(Link):
161 def __call__(self, source):
162 if source.has_value():
163 self.listener.throw(LinkedCompleted(source.name))
165 self.listener.throw(getLinkedFailed(source.name, *source.exc_info()))
168 class LinkToCallable(Link):
170 def __call__(self, source):
171 self.listener(source)
174 def waitall(lst, trap_errors=False, queue=None):
176 queue = coros.queue()
178 for (index, linkable) in enumerate(lst):
179 linkable.link(decorate_send(queue, index))
181 results = [None] * len
185 index, value = queue.wait()
190 results[index] = value
195 class decorate_send(object):
197 def __init__(self, event, tag):
202 params = (type(self).__name__, self._tag, self._event)
203 return '<%s tag=%r event=%r>' % params
205 def __getattr__(self, name):
206 assert name != '_event'
207 return getattr(self._event, name)
209 def send(self, value):
210 self._event.send((self._tag, value))
213 def killall(procs, *throw_args, **kwargs):
215 throw_args = (ProcExit, )
216 wait = kwargs.pop('wait', False)
218 raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys()))
221 hubs.get_hub().schedule_call_global(0, g.throw, *throw_args)
222 if wait and api.getcurrent() is not hubs.get_hub().greenlet:
226 class NotUsed(object):
229 return '<Source instance does not hold a value or an exception>'
233 _NOT_USED = NotUsed()
236 def spawn_greenlet(function, *args):
237 """Create a new greenlet that will run ``function(*args)``.
238 The current greenlet won't be unscheduled. Keyword arguments aren't
239 supported (limitation of greenlet), use :func:`spawn` to work around that.
241 g = api.Greenlet(function)
242 g.parent = hubs.get_hub().greenlet
243 hubs.get_hub().schedule_call_global(0, g.switch, *args)
247 class Source(object):
248 """Maintain a set of links to the listeners. Delegate the sent value or
249 the exception to all of them.
251 To set up a link, use :meth:`link_value`, :meth:`link_exception` or
252 :meth:`link` method. The latter establishes both "value" and "exception"
253 link. It is possible to link to events, queues, greenlets and callables.
255 >>> source = Source()
256 >>> event = coros.Event()
257 >>> _ = source.link(event)
259 Once source's :meth:`send` or :meth:`send_exception` method is called, all
260 the listeners with the right type of link will be notified ("right type"
261 means that exceptions won't be delivered to "value" links and values won't
262 be delivered to "exception" links). Once a link has been fired it is removed.
264 Notifying listeners is performed in the **mainloop** greenlet. Under the
265 hood notifying a link means executing a callback, see :class:`Link` class
266 for details. Notification *must not* attempt to switch to the hub, i.e.
267 call any blocking functions.
269 >>> source.send('hello')
273 Any error that happens while sending will be logged as a regular unhandled
274 exception. This won't prevent other links from being fired.
276 There are 3 kinds of listeners supported:
278 1. If *listener* is a greenlet (regardless if it's a raw greenlet or an
279 extension like :class:`Proc`), a subclass of :class:`LinkedExited`
280 exception is raised in it.
282 2. If *listener* is something with send/send_exception methods (event,
283 queue, :class:`Source` but not :class:`Proc`) the relevant method is
286 3. If *listener* is a callable, it is called with 1 argument (the result)
287 for "value" links and with 3 arguments ``(typ, value, tb)`` for
291 def __init__(self, name=None):
293 self._value_links = {}
294 self._exception_links = {}
295 self.value = _NOT_USED
298 def _repr_helper(self):
300 result.append(repr(self.name))
301 if self.value is not _NOT_USED:
302 if self._exc is None:
303 res = repr(self.value)
305 res = res[:50] + '...'
306 result.append('result=%s' % res)
308 result.append('raised=%s' % (self._exc, ))
309 result.append('{%s:%s}' % (len(self._value_links), len(self._exception_links)))
313 klass = type(self).__name__
314 return '<%s at %s %s>' % (klass, hex(id(self)), ' '.join(self._repr_helper()))
317 return self.value is not _NOT_USED
320 return self.value is not _NOT_USED and self._exc is None
322 def has_exception(self):
323 return self.value is not _NOT_USED and self._exc is not None
327 return (None, None, None)
328 elif len(self._exc) == 3:
330 elif len(self._exc) == 1:
331 if isinstance(self._exc[0], type):
332 return self._exc[0], None, None
334 return self._exc[0].__class__, self._exc[0], None
335 elif len(self._exc) == 2:
336 return self._exc[0], self._exc[1], None
340 def link_value(self, listener=None, link=None):
341 if self.ready() and self._exc is not None:
344 listener = api.getcurrent()
346 link = self.getLink(listener)
347 if self.ready() and listener is api.getcurrent():
350 self._value_links[listener] = link
351 if self.value is not _NOT_USED:
355 def link_exception(self, listener=None, link=None):
356 if self.value is not _NOT_USED and self._exc is None:
359 listener = api.getcurrent()
361 link = self.getLink(listener)
362 if self.ready() and listener is api.getcurrent():
365 self._exception_links[listener] = link
366 if self.value is not _NOT_USED:
367 self._start_send_exception()
370 def link(self, listener=None, link=None):
372 listener = api.getcurrent()
374 link = self.getLink(listener)
375 if self.ready() and listener is api.getcurrent():
376 if self._exc is None:
381 self._value_links[listener] = link
382 self._exception_links[listener] = link
383 if self.value is not _NOT_USED:
384 if self._exc is None:
387 self._start_send_exception()
390 def unlink(self, listener=None):
392 listener = api.getcurrent()
393 self._value_links.pop(listener, None)
394 self._exception_links.pop(listener, None)
397 def getLink(listener):
398 if hasattr(listener, 'throw'):
399 return LinkToGreenlet(listener)
400 if hasattr(listener, 'send'):
401 return LinkToEvent(listener)
402 elif hasattr(listener, '__call__'):
403 return LinkToCallable(listener)
405 raise TypeError("Don't know how to link to %r" % (listener, ))
407 def send(self, value):
408 assert not self.ready(), "%s has been fired already" % self
413 def _start_send(self):
414 links_items = list(six.iteritems(self._value_links))
415 hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._value_links)
417 def send_exception(self, *throw_args):
418 assert not self.ready(), "%s has been fired already" % self
420 self._exc = throw_args
421 self._start_send_exception()
423 def _start_send_exception(self):
424 links_items = list(six.iteritems(self._exception_links))
425 hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._exception_links)
427 def _do_send(self, links, consult):
429 listener, link = links.pop()
431 if listener in consult:
435 consult.pop(listener, None)
437 hubs.get_hub().schedule_call_global(0, self._do_send, links, consult)
440 def wait(self, timeout=None, *throw_args):
441 """Wait until :meth:`send` or :meth:`send_exception` is called or
442 *timeout* has expired. Return the argument of :meth:`send` or raise the
443 argument of :meth:`send_exception`. If *timeout* has expired, ``None``
446 The arguments, when provided, specify how many seconds to wait and what
447 to do when *timeout* has expired. They are treated the same way as
448 :func:`~eventlet.api.timeout` treats them.
450 if self.value is not _NOT_USED:
451 if self._exc is None:
454 api.getcurrent().throw(*self._exc)
455 if timeout is not None:
456 timer = api.timeout(timeout, *throw_args)
459 if timer.__exit__(None, None, None):
463 api.getcurrent().throw(*timer.throw_args)
465 if not timer.__exit__(*sys.exc_info()):
479 if timeout is None or not timer.__exit__(*sys.exc_info()):
482 if timeout is not None and EXC:
483 timer.__exit__(None, None, None)
486 class Waiter(object):
491 def send(self, value):
492 """Wake up the greenlet that is calling wait() currently (if there is one).
493 Can only be called from get_hub().greenlet.
495 assert api.getcurrent() is hubs.get_hub().greenlet
496 if self.greenlet is not None:
497 self.greenlet.switch(value)
499 def send_exception(self, *throw_args):
500 """Make greenlet calling wait() wake up (if there is a wait()).
501 Can only be called from get_hub().greenlet.
503 assert api.getcurrent() is hubs.get_hub().greenlet
504 if self.greenlet is not None:
505 self.greenlet.throw(*throw_args)
508 """Wait until send or send_exception is called. Return value passed
509 into send() or raise exception passed into send_exception().
511 assert self.greenlet is None
512 current = api.getcurrent()
513 assert current is not hubs.get_hub().greenlet
514 self.greenlet = current
516 return hubs.get_hub().switch()
522 """A linkable coroutine based on Source.
523 Upon completion, delivers coroutine's result to the listeners.
526 def __init__(self, name=None):
528 Source.__init__(self, name)
530 def _repr_helper(self):
531 if self.greenlet is not None and self.greenlet.dead:
535 return ['%r%s' % (self.greenlet, dead)] + Source._repr_helper(self)
538 klass = type(self).__name__
539 return '<%s %s>' % (klass, ' '.join(self._repr_helper()))
541 def __nonzero__(self):
543 # with current _run this does not make any difference
544 # still, let's keep it there
546 # otherwise bool(proc) is the same as bool(greenlet)
547 if self.greenlet is not None:
548 return bool(self.greenlet)
550 __bool__ = __nonzero__
554 return self.ready() or self.greenlet.dead
557 def spawn(cls, function, *args, **kwargs):
558 """Return a new :class:`Proc` instance that is scheduled to execute
559 ``function(*args, **kwargs)`` upon the next hub iteration.
562 proc.run(function, *args, **kwargs)
565 def run(self, function, *args, **kwargs):
566 """Create a new greenlet to execute ``function(*args, **kwargs)``.
567 The created greenlet is scheduled to run upon the next hub iteration.
569 assert self.greenlet is None, "'run' can only be called once per instance"
570 if self.name is None:
571 self.name = str(function)
572 self.greenlet = spawn_greenlet(self._run, function, args, kwargs)
574 def _run(self, function, args, kwargs):
575 """Internal top level function.
576 Execute *function* and send its result to the listeners.
579 result = function(*args, **kwargs)
581 self.send_exception(*sys.exc_info())
582 raise # let mainloop log the exception
586 def throw(self, *throw_args):
587 """Used internally to raise the exception.
589 Behaves exactly like greenlet's 'throw' with the exception that
590 :class:`ProcExit` is raised by default. Do not use this function as it
591 leaves the current greenlet unscheduled forever. Use :meth:`kill`
596 throw_args = (ProcExit, )
597 self.greenlet.throw(*throw_args)
599 def kill(self, *throw_args):
601 Raise an exception in the greenlet. Unschedule the current greenlet so
602 that this :class:`Proc` can handle the exception (or die).
604 The exception can be specified with *throw_args*. By default,
605 :class:`ProcExit` is raised.
609 throw_args = (ProcExit, )
610 hubs.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args)
611 if api.getcurrent() is not hubs.get_hub().greenlet:
614 # QQQ maybe Proc should not inherit from Source (because its send() and send_exception()
615 # QQQ methods are for internal use only)
621 def spawn_link(function, *args, **kwargs):
622 p = spawn(function, *args, **kwargs)
627 def spawn_link_value(function, *args, **kwargs):
628 p = spawn(function, *args, **kwargs)
633 def spawn_link_exception(function, *args, **kwargs):
634 p = spawn(function, *args, **kwargs)
639 class wrap_errors(object):
640 """Helper to make function return an exception, rather than raise it.
642 Because every exception that is unhandled by greenlet will be logged by the hub,
643 it is desirable to prevent non-error exceptions from leaving a greenlet.
644 This can be done with a simple try/except construct:
646 def func1(*args, **kwargs):
648 return func(*args, **kwargs)
649 except (A, B, C) as ex:
652 wrap_errors provides a shortcut to write that in one line:
654 func1 = wrap_errors((A, B, C), func)
656 It also preserves __str__ and __repr__ of the original function.
659 def __init__(self, errors, func):
660 """Make a new function from `func', such that it catches `errors' (an
661 Exception subclass, or a tuple of Exception subclasses) and return
667 def __call__(self, *args, **kwargs):
669 return self.func(*args, **kwargs)
670 except self.errors as ex:
674 return str(self.func)
677 return repr(self.func)
679 def __getattr__(self, item):
680 return getattr(self.func, item)
683 class RunningProcSet(object):
685 Maintain a set of :class:`Proc` s that are still running, that is,
686 automatically remove a proc when it's finished. Provide a way to wait/kill
690 def __init__(self, *args):
691 self.procs = set(*args)
693 for p in self.args[0]:
694 p.link(lambda p: self.procs.discard(p))
697 return len(self.procs)
699 def __contains__(self, item):
700 if isinstance(item, api.Greenlet):
701 # special case for "api.getcurrent() in running_proc_set" to work
703 if x.greenlet == item:
706 return item in self.procs
709 return iter(self.procs)
713 p.link(lambda p: self.procs.discard(p))
715 def spawn(self, func, *args, **kwargs):
716 p = spawn(func, *args, **kwargs)
720 def waitall(self, trap_errors=False):
722 waitall(self.procs, trap_errors=trap_errors)
724 def killall(self, *throw_args, **kwargs):
725 return killall(self.procs, *throw_args, **kwargs)
730 linkable_class = Proc
732 def __init__(self, limit):
733 self.semaphore = coros.Semaphore(limit)
736 self.semaphore.acquire()
737 g = self.linkable_class()
738 g.link(lambda *_args: self.semaphore.release())