Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / proc.py
1 """
2 This module provides means to spawn, kill and link coroutines. Linking means
3 subscribing to the coroutine's result, either in form of return value or
4 unhandled exception.
5
6 To create a linkable coroutine use spawn function provided by this module:
7
8     >>> def demofunc(x, y):
9     ...    return x / y
10     >>> p = spawn(demofunc, 6, 2)
11
12 The return value of :func:`spawn` is an instance of :class:`Proc` class that
13 you can "link":
14
15  * ``p.link(obj)`` - notify *obj* when the coroutine is finished
16
17 What "notify" means here depends on the type of *obj*: a callable is simply
18 called, an :class:`~eventlet.coros.Event` or a :class:`~eventlet.coros.queue`
19 is notified using ``send``/``send_exception`` methods and if *obj* is another
20 greenlet it's killed with :class:`LinkedExited` exception.
21
22 Here's an example:
23
24 >>> event = coros.Event()
25 >>> _ = p.link(event)
26 >>> event.wait()
27 3
28
29 Now, even though *p* is finished it's still possible to link it. In this
30 case the notification is performed immediatelly:
31
32 >>> try:
33 ...     p.link()
34 ... except LinkedCompleted:
35 ...     print('LinkedCompleted')
36 LinkedCompleted
37
38 (Without an argument, the link is created to the current greenlet)
39
40 There are also :meth:`~eventlet.proc.Source.link_value` and
41 :func:`link_exception` methods that only deliver a return value and an
42 unhandled exception respectively (plain :meth:`~eventlet.proc.Source.link`
43 delivers both).  Suppose we want to spawn a greenlet to do an important part of
44 the task; if it fails then there's no way to complete the task so the parent
45 must fail as well; :meth:`~eventlet.proc.Source.link_exception` is useful here:
46
47 >>> p = spawn(demofunc, 1, 0)
48 >>> _ = p.link_exception()
49 >>> try:
50 ...     api.sleep(1)
51 ... except LinkedFailed:
52 ...     print('LinkedFailed')
53 LinkedFailed
54
55 One application of linking is :func:`waitall` function: link to a bunch of
56 coroutines and wait for all them to complete. Such a function is provided by
57 this module.
58 """
59 import sys
60
61 from eventlet import api, coros, hubs
62 from eventlet.support import six
63
64 import warnings
65 warnings.warn(
66     "The proc module is deprecated!  Please use the greenthread "
67     "module, or any of the many other Eventlet cross-coroutine "
68     "primitives, instead.",
69     DeprecationWarning, stacklevel=2)
70
71 __all__ = ['LinkedExited',
72            'LinkedFailed',
73            'LinkedCompleted',
74            'LinkedKilled',
75            'ProcExit',
76            'Link',
77            'waitall',
78            'killall',
79            'Source',
80            'Proc',
81            'spawn',
82            'spawn_link',
83            'spawn_link_value',
84            'spawn_link_exception']
85
86
87 class LinkedExited(Exception):
88     """Raised when a linked proc exits"""
89     msg = "%r exited"
90
91     def __init__(self, name=None, msg=None):
92         self.name = name
93         if msg is None:
94             msg = self.msg % self.name
95         Exception.__init__(self, msg)
96
97
98 class LinkedCompleted(LinkedExited):
99     """Raised when a linked proc finishes the execution cleanly"""
100
101     msg = "%r completed successfully"
102
103
104 class LinkedFailed(LinkedExited):
105     """Raised when a linked proc dies because of unhandled exception"""
106     msg = "%r failed with %s"
107
108     def __init__(self, name, typ, value=None, tb=None):
109         msg = self.msg % (name, typ.__name__)
110         LinkedExited.__init__(self, name, msg)
111
112
113 class LinkedKilled(LinkedFailed):
114     """Raised when a linked proc dies because of unhandled GreenletExit
115     (i.e. it was killed)
116     """
117     msg = """%r was killed with %s"""
118
119
120 def getLinkedFailed(name, typ, value=None, tb=None):
121     if issubclass(typ, api.GreenletExit):
122         return LinkedKilled(name, typ, value, tb)
123     return LinkedFailed(name, typ, value, tb)
124
125
126 class ProcExit(api.GreenletExit):
127     """Raised when this proc is killed."""
128
129
130 class Link(object):
131     """
132     A link to a greenlet, triggered when the greenlet exits.
133     """
134
135     def __init__(self, listener):
136         self.listener = listener
137
138     def cancel(self):
139         self.listener = None
140
141     def __enter__(self):
142         pass
143
144     def __exit__(self, *args):
145         self.cancel()
146
147
148 class LinkToEvent(Link):
149
150     def __call__(self, source):
151         if self.listener is None:
152             return
153         if source.has_value():
154             self.listener.send(source.value)
155         else:
156             self.listener.send_exception(*source.exc_info())
157
158
159 class LinkToGreenlet(Link):
160
161     def __call__(self, source):
162         if source.has_value():
163             self.listener.throw(LinkedCompleted(source.name))
164         else:
165             self.listener.throw(getLinkedFailed(source.name, *source.exc_info()))
166
167
168 class LinkToCallable(Link):
169
170     def __call__(self, source):
171         self.listener(source)
172
173
174 def waitall(lst, trap_errors=False, queue=None):
175     if queue is None:
176         queue = coros.queue()
177     index = -1
178     for (index, linkable) in enumerate(lst):
179         linkable.link(decorate_send(queue, index))
180     len = index + 1
181     results = [None] * len
182     count = 0
183     while count < len:
184         try:
185             index, value = queue.wait()
186         except Exception:
187             if not trap_errors:
188                 raise
189         else:
190             results[index] = value
191         count += 1
192     return results
193
194
195 class decorate_send(object):
196
197     def __init__(self, event, tag):
198         self._event = event
199         self._tag = tag
200
201     def __repr__(self):
202         params = (type(self).__name__, self._tag, self._event)
203         return '<%s tag=%r event=%r>' % params
204
205     def __getattr__(self, name):
206         assert name != '_event'
207         return getattr(self._event, name)
208
209     def send(self, value):
210         self._event.send((self._tag, value))
211
212
213 def killall(procs, *throw_args, **kwargs):
214     if not throw_args:
215         throw_args = (ProcExit, )
216     wait = kwargs.pop('wait', False)
217     if kwargs:
218         raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys()))
219     for g in procs:
220         if not g.dead:
221             hubs.get_hub().schedule_call_global(0, g.throw, *throw_args)
222     if wait and api.getcurrent() is not hubs.get_hub().greenlet:
223         api.sleep(0)
224
225
226 class NotUsed(object):
227
228     def __str__(self):
229         return '<Source instance does not hold a value or an exception>'
230
231     __repr__ = __str__
232
233 _NOT_USED = NotUsed()
234
235
236 def spawn_greenlet(function, *args):
237     """Create a new greenlet that will run ``function(*args)``.
238     The current greenlet won't be unscheduled. Keyword arguments aren't
239     supported (limitation of greenlet), use :func:`spawn` to work around that.
240     """
241     g = api.Greenlet(function)
242     g.parent = hubs.get_hub().greenlet
243     hubs.get_hub().schedule_call_global(0, g.switch, *args)
244     return g
245
246
247 class Source(object):
248     """Maintain a set of links to the listeners. Delegate the sent value or
249     the exception to all of them.
250
251     To set up a link, use :meth:`link_value`, :meth:`link_exception` or
252     :meth:`link` method. The latter establishes both "value" and "exception"
253     link. It is possible to link to events, queues, greenlets and callables.
254
255     >>> source = Source()
256     >>> event = coros.Event()
257     >>> _ = source.link(event)
258
259     Once source's :meth:`send` or :meth:`send_exception` method is called, all
260     the listeners with the right type of link will be notified ("right type"
261     means that exceptions won't be delivered to "value" links and values won't
262     be delivered to "exception" links). Once link has been fired it is removed.
263
264     Notifying listeners is performed in the **mainloop** greenlet. Under the
265     hood notifying a link means executing a callback, see :class:`Link` class
266     for details. Notification *must not* attempt to switch to the hub, i.e.
267     call any blocking functions.
268
269     >>> source.send('hello')
270     >>> event.wait()
271     'hello'
272
273     Any error happened while sending will be logged as a regular unhandled
274     exception. This won't prevent other links from being fired.
275
276     There 3 kinds of listeners supported:
277
278      1. If *listener* is a greenlet (regardless if it's a raw greenlet or an
279         extension like :class:`Proc`), a subclass of :class:`LinkedExited`
280         exception is raised in it.
281
282      2. If *listener* is something with send/send_exception methods (event,
283         queue, :class:`Source` but not :class:`Proc`) the relevant method is
284         called.
285
286      3. If *listener* is a callable, it is called with 1 argument (the result)
287         for "value" links and with 3 arguments ``(typ, value, tb)`` for
288         "exception" links.
289     """
290
291     def __init__(self, name=None):
292         self.name = name
293         self._value_links = {}
294         self._exception_links = {}
295         self.value = _NOT_USED
296         self._exc = None
297
298     def _repr_helper(self):
299         result = []
300         result.append(repr(self.name))
301         if self.value is not _NOT_USED:
302             if self._exc is None:
303                 res = repr(self.value)
304                 if len(res) > 50:
305                     res = res[:50] + '...'
306                 result.append('result=%s' % res)
307             else:
308                 result.append('raised=%s' % (self._exc, ))
309         result.append('{%s:%s}' % (len(self._value_links), len(self._exception_links)))
310         return result
311
312     def __repr__(self):
313         klass = type(self).__name__
314         return '<%s at %s %s>' % (klass, hex(id(self)), ' '.join(self._repr_helper()))
315
316     def ready(self):
317         return self.value is not _NOT_USED
318
319     def has_value(self):
320         return self.value is not _NOT_USED and self._exc is None
321
322     def has_exception(self):
323         return self.value is not _NOT_USED and self._exc is not None
324
325     def exc_info(self):
326         if not self._exc:
327             return (None, None, None)
328         elif len(self._exc) == 3:
329             return self._exc
330         elif len(self._exc) == 1:
331             if isinstance(self._exc[0], type):
332                 return self._exc[0], None, None
333             else:
334                 return self._exc[0].__class__, self._exc[0], None
335         elif len(self._exc) == 2:
336             return self._exc[0], self._exc[1], None
337         else:
338             return self._exc
339
340     def link_value(self, listener=None, link=None):
341         if self.ready() and self._exc is not None:
342             return
343         if listener is None:
344             listener = api.getcurrent()
345         if link is None:
346             link = self.getLink(listener)
347         if self.ready() and listener is api.getcurrent():
348             link(self)
349         else:
350             self._value_links[listener] = link
351             if self.value is not _NOT_USED:
352                 self._start_send()
353         return link
354
355     def link_exception(self, listener=None, link=None):
356         if self.value is not _NOT_USED and self._exc is None:
357             return
358         if listener is None:
359             listener = api.getcurrent()
360         if link is None:
361             link = self.getLink(listener)
362         if self.ready() and listener is api.getcurrent():
363             link(self)
364         else:
365             self._exception_links[listener] = link
366             if self.value is not _NOT_USED:
367                 self._start_send_exception()
368         return link
369
370     def link(self, listener=None, link=None):
371         if listener is None:
372             listener = api.getcurrent()
373         if link is None:
374             link = self.getLink(listener)
375         if self.ready() and listener is api.getcurrent():
376             if self._exc is None:
377                 link(self)
378             else:
379                 link(self)
380         else:
381             self._value_links[listener] = link
382             self._exception_links[listener] = link
383             if self.value is not _NOT_USED:
384                 if self._exc is None:
385                     self._start_send()
386                 else:
387                     self._start_send_exception()
388         return link
389
390     def unlink(self, listener=None):
391         if listener is None:
392             listener = api.getcurrent()
393         self._value_links.pop(listener, None)
394         self._exception_links.pop(listener, None)
395
396     @staticmethod
397     def getLink(listener):
398         if hasattr(listener, 'throw'):
399             return LinkToGreenlet(listener)
400         if hasattr(listener, 'send'):
401             return LinkToEvent(listener)
402         elif hasattr(listener, '__call__'):
403             return LinkToCallable(listener)
404         else:
405             raise TypeError("Don't know how to link to %r" % (listener, ))
406
407     def send(self, value):
408         assert not self.ready(), "%s has been fired already" % self
409         self.value = value
410         self._exc = None
411         self._start_send()
412
413     def _start_send(self):
414         links_items = list(six.iteritems(self._value_links))
415         hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._value_links)
416
417     def send_exception(self, *throw_args):
418         assert not self.ready(), "%s has been fired already" % self
419         self.value = None
420         self._exc = throw_args
421         self._start_send_exception()
422
423     def _start_send_exception(self):
424         links_items = list(six.iteritems(self._exception_links))
425         hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._exception_links)
426
427     def _do_send(self, links, consult):
428         while links:
429             listener, link = links.pop()
430             try:
431                 if listener in consult:
432                     try:
433                         link(self)
434                     finally:
435                         consult.pop(listener, None)
436             except:
437                 hubs.get_hub().schedule_call_global(0, self._do_send, links, consult)
438                 raise
439
440     def wait(self, timeout=None, *throw_args):
441         """Wait until :meth:`send` or :meth:`send_exception` is called or
442         *timeout* has expired. Return the argument of :meth:`send` or raise the
443         argument of :meth:`send_exception`. If *timeout* has expired, ``None``
444         is returned.
445
446         The arguments, when provided, specify how many seconds to wait and what
447         to do when *timeout* has expired. They are treated the same way as
448         :func:`~eventlet.api.timeout` treats them.
449         """
450         if self.value is not _NOT_USED:
451             if self._exc is None:
452                 return self.value
453             else:
454                 api.getcurrent().throw(*self._exc)
455         if timeout is not None:
456             timer = api.timeout(timeout, *throw_args)
457             timer.__enter__()
458             if timeout == 0:
459                 if timer.__exit__(None, None, None):
460                     return
461                 else:
462                     try:
463                         api.getcurrent().throw(*timer.throw_args)
464                     except:
465                         if not timer.__exit__(*sys.exc_info()):
466                             raise
467                     return
468             EXC = True
469         try:
470             try:
471                 waiter = Waiter()
472                 self.link(waiter)
473                 try:
474                     return waiter.wait()
475                 finally:
476                     self.unlink(waiter)
477             except:
478                 EXC = False
479                 if timeout is None or not timer.__exit__(*sys.exc_info()):
480                     raise
481         finally:
482             if timeout is not None and EXC:
483                 timer.__exit__(None, None, None)
484
485
486 class Waiter(object):
487
488     def __init__(self):
489         self.greenlet = None
490
491     def send(self, value):
492         """Wake up the greenlet that is calling wait() currently (if there is one).
493         Can only be called from get_hub().greenlet.
494         """
495         assert api.getcurrent() is hubs.get_hub().greenlet
496         if self.greenlet is not None:
497             self.greenlet.switch(value)
498
499     def send_exception(self, *throw_args):
500         """Make greenlet calling wait() wake up (if there is a wait()).
501         Can only be called from get_hub().greenlet.
502         """
503         assert api.getcurrent() is hubs.get_hub().greenlet
504         if self.greenlet is not None:
505             self.greenlet.throw(*throw_args)
506
507     def wait(self):
508         """Wait until send or send_exception is called. Return value passed
509         into send() or raise exception passed into send_exception().
510         """
511         assert self.greenlet is None
512         current = api.getcurrent()
513         assert current is not hubs.get_hub().greenlet
514         self.greenlet = current
515         try:
516             return hubs.get_hub().switch()
517         finally:
518             self.greenlet = None
519
520
521 class Proc(Source):
522     """A linkable coroutine based on Source.
523     Upon completion, delivers coroutine's result to the listeners.
524     """
525
526     def __init__(self, name=None):
527         self.greenlet = None
528         Source.__init__(self, name)
529
530     def _repr_helper(self):
531         if self.greenlet is not None and self.greenlet.dead:
532             dead = '(dead)'
533         else:
534             dead = ''
535         return ['%r%s' % (self.greenlet, dead)] + Source._repr_helper(self)
536
537     def __repr__(self):
538         klass = type(self).__name__
539         return '<%s %s>' % (klass, ' '.join(self._repr_helper()))
540
541     def __nonzero__(self):
542         if self.ready():
543             # with current _run this does not makes any difference
544             # still, let keep it there
545             return False
546         # otherwise bool(proc) is the same as bool(greenlet)
547         if self.greenlet is not None:
548             return bool(self.greenlet)
549
550     __bool__ = __nonzero__
551
552     @property
553     def dead(self):
554         return self.ready() or self.greenlet.dead
555
556     @classmethod
557     def spawn(cls, function, *args, **kwargs):
558         """Return a new :class:`Proc` instance that is scheduled to execute
559         ``function(*args, **kwargs)`` upon the next hub iteration.
560         """
561         proc = cls()
562         proc.run(function, *args, **kwargs)
563         return proc
564
565     def run(self, function, *args, **kwargs):
566         """Create a new greenlet to execute ``function(*args, **kwargs)``.
567         The created greenlet is scheduled to run upon the next hub iteration.
568         """
569         assert self.greenlet is None, "'run' can only be called once per instance"
570         if self.name is None:
571             self.name = str(function)
572         self.greenlet = spawn_greenlet(self._run, function, args, kwargs)
573
574     def _run(self, function, args, kwargs):
575         """Internal top level function.
576         Execute *function* and send its result to the listeners.
577         """
578         try:
579             result = function(*args, **kwargs)
580         except:
581             self.send_exception(*sys.exc_info())
582             raise  # let mainloop log the exception
583         else:
584             self.send(result)
585
586     def throw(self, *throw_args):
587         """Used internally to raise the exception.
588
589         Behaves exactly like greenlet's 'throw' with the exception that
590         :class:`ProcExit` is raised by default. Do not use this function as it
591         leaves the current greenlet unscheduled forever. Use :meth:`kill`
592         method instead.
593         """
594         if not self.dead:
595             if not throw_args:
596                 throw_args = (ProcExit, )
597             self.greenlet.throw(*throw_args)
598
599     def kill(self, *throw_args):
600         """
601         Raise an exception in the greenlet. Unschedule the current greenlet so
602         that this :class:`Proc` can handle the exception (or die).
603
604         The exception can be specified with *throw_args*. By default,
605         :class:`ProcExit` is raised.
606         """
607         if not self.dead:
608             if not throw_args:
609                 throw_args = (ProcExit, )
610             hubs.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args)
611             if api.getcurrent() is not hubs.get_hub().greenlet:
612                 api.sleep(0)
613
614     # QQQ maybe Proc should not inherit from Source (because its send() and send_exception()
615     # QQQ methods are for internal use only)
616
617
618 spawn = Proc.spawn
619
620
621 def spawn_link(function, *args, **kwargs):
622     p = spawn(function, *args, **kwargs)
623     p.link()
624     return p
625
626
627 def spawn_link_value(function, *args, **kwargs):
628     p = spawn(function, *args, **kwargs)
629     p.link_value()
630     return p
631
632
633 def spawn_link_exception(function, *args, **kwargs):
634     p = spawn(function, *args, **kwargs)
635     p.link_exception()
636     return p
637
638
639 class wrap_errors(object):
640     """Helper to make function return an exception, rather than raise it.
641
642     Because every exception that is unhandled by greenlet will be logged by the hub,
643     it is desirable to prevent non-error exceptions from leaving a greenlet.
644     This can done with simple try/except construct:
645
646     def func1(*args, **kwargs):
647         try:
648             return func(*args, **kwargs)
649         except (A, B, C) as ex:
650             return ex
651
652     wrap_errors provides a shortcut to write that in one line:
653
654     func1 = wrap_errors((A, B, C), func)
655
656     It also preserves __str__ and __repr__ of the original function.
657     """
658
659     def __init__(self, errors, func):
660         """Make a new function from `func', such that it catches `errors' (an
661         Exception subclass, or a tuple of Exception subclasses) and return
662         it as a value.
663         """
664         self.errors = errors
665         self.func = func
666
667     def __call__(self, *args, **kwargs):
668         try:
669             return self.func(*args, **kwargs)
670         except self.errors as ex:
671             return ex
672
673     def __str__(self):
674         return str(self.func)
675
676     def __repr__(self):
677         return repr(self.func)
678
679     def __getattr__(self, item):
680         return getattr(self.func, item)
681
682
683 class RunningProcSet(object):
684     """
685     Maintain a set of :class:`Proc` s that are still running, that is,
686     automatically remove a proc when it's finished. Provide a way to wait/kill
687     all of them
688     """
689
690     def __init__(self, *args):
691         self.procs = set(*args)
692         if args:
693             for p in self.args[0]:
694                 p.link(lambda p: self.procs.discard(p))
695
696     def __len__(self):
697         return len(self.procs)
698
699     def __contains__(self, item):
700         if isinstance(item, api.Greenlet):
701             # special case for "api.getcurrent() in running_proc_set" to work
702             for x in self.procs:
703                 if x.greenlet == item:
704                     return True
705         else:
706             return item in self.procs
707
708     def __iter__(self):
709         return iter(self.procs)
710
711     def add(self, p):
712         self.procs.add(p)
713         p.link(lambda p: self.procs.discard(p))
714
715     def spawn(self, func, *args, **kwargs):
716         p = spawn(func, *args, **kwargs)
717         self.add(p)
718         return p
719
720     def waitall(self, trap_errors=False):
721         while self.procs:
722             waitall(self.procs, trap_errors=trap_errors)
723
724     def killall(self, *throw_args, **kwargs):
725         return killall(self.procs, *throw_args, **kwargs)
726
727
728 class Pool(object):
729
730     linkable_class = Proc
731
732     def __init__(self, limit):
733         self.semaphore = coros.Semaphore(limit)
734
735     def allocate(self):
736         self.semaphore.acquire()
737         g = self.linkable_class()
738         g.link(lambda *_args: self.semaphore.release())
739         return g