Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / hubs / hub.py
1 import errno
2 import heapq
3 import math
4 import signal
5 import sys
6 import traceback
7
8 arm_alarm = None
9 if hasattr(signal, 'setitimer'):
10     def alarm_itimer(seconds):
11         signal.setitimer(signal.ITIMER_REAL, seconds)
12     arm_alarm = alarm_itimer
13 else:
14     try:
15         import itimer
16         arm_alarm = itimer.alarm
17     except ImportError:
18         def alarm_signal(seconds):
19             signal.alarm(math.ceil(seconds))
20         arm_alarm = alarm_signal
21
22 from eventlet import patcher
23 from eventlet.hubs import timer, IOClosed
24 from eventlet.support import greenlets as greenlet, clear_sys_exc_info
25 time = patcher.original('time')
26
27 g_prevent_multiple_readers = True
28
29 READ = "read"
30 WRITE = "write"
31
32
33 def closed_callback(fileno):
34     """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait
35     """
36     # No-op.
37     pass
38
39
40 class FdListener(object):
41
42     def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
43         """ The following are required:
44         cb - the standard callback, which will switch into the
45             listening greenlet to indicate that the event waited upon
46             is ready
47         tb - a 'throwback'. This is typically greenlet.throw, used
48             to raise a signal into the target greenlet indicating that
49             an event was obsoleted by its underlying filehandle being
50             repurposed.
51         mark_as_closed - if any listener is obsoleted, this is called
52             (in the context of some other client greenlet) to alert
53             underlying filehandle-wrapping objects that they've been
54             closed.
55         """
56         assert (evtype is READ or evtype is WRITE)
57         self.evtype = evtype
58         self.fileno = fileno
59         self.cb = cb
60         self.tb = tb
61         self.mark_as_closed = mark_as_closed
62         self.spent = False
63         self.greenlet = greenlet.getcurrent()
64
65     def __repr__(self):
66         return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
67                                        self.cb, self.tb)
68     __str__ = __repr__
69
70     def defang(self):
71         self.cb = closed_callback
72         if self.mark_as_closed is not None:
73             self.mark_as_closed()
74         self.spent = True
75
76
77 noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)
78
79
80 # in debug mode, track the call site that created the listener
81
82
83 class DebugListener(FdListener):
84
85     def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
86         self.where_called = traceback.format_stack()
87         self.greenlet = greenlet.getcurrent()
88         super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed)
89
90     def __repr__(self):
91         return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % (
92             self.evtype,
93             self.fileno,
94             self.cb,
95             self.tb,
96             self.mark_as_closed,
97             self.greenlet,
98             ''.join(self.where_called))
99     __str__ = __repr__
100
101
102 def alarm_handler(signum, frame):
103     import inspect
104     raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame)))
105
106
107 class BaseHub(object):
108     """ Base hub class for easing the implementation of subclasses that are
109     specific to a particular underlying event architecture. """
110
111     SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
112
113     READ = READ
114     WRITE = WRITE
115
116     def __init__(self, clock=time.time):
117         self.listeners = {READ: {}, WRITE: {}}
118         self.secondaries = {READ: {}, WRITE: {}}
119         self.closed = []
120
121         self.clock = clock
122         self.greenlet = greenlet.greenlet(self.run)
123         self.stopping = False
124         self.running = False
125         self.timers = []
126         self.next_timers = []
127         self.lclass = FdListener
128         self.timers_canceled = 0
129         self.debug_exceptions = True
130         self.debug_blocking = False
131         self.debug_blocking_resolution = 1
132
133     def block_detect_pre(self):
134         # shortest alarm we can possibly raise is one second
135         tmp = signal.signal(signal.SIGALRM, alarm_handler)
136         if tmp != alarm_handler:
137             self._old_signal_handler = tmp
138
139         arm_alarm(self.debug_blocking_resolution)
140
141     def block_detect_post(self):
142         if (hasattr(self, "_old_signal_handler") and
143                 self._old_signal_handler):
144             signal.signal(signal.SIGALRM, self._old_signal_handler)
145         signal.alarm(0)
146
147     def add(self, evtype, fileno, cb, tb, mark_as_closed):
148         """ Signals an intent to or write a particular file descriptor.
149
150         The *evtype* argument is either the constant READ or WRITE.
151
152         The *fileno* argument is the file number of the file of interest.
153
154         The *cb* argument is the callback which will be called when the file
155         is ready for reading/writing.
156
157         The *tb* argument is the throwback used to signal (into the greenlet)
158         that the file was closed.
159
160         The *mark_as_closed* is used in the context of the event hub to
161         prepare a Python object as being closed, pre-empting further
162         close operations from accidentally shutting down the wrong OS thread.
163         """
164         listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
165         bucket = self.listeners[evtype]
166         if fileno in bucket:
167             if g_prevent_multiple_readers:
168                 raise RuntimeError(
169                     "Second simultaneous %s on fileno %s "
170                     "detected.  Unless you really know what you're doing, "
171                     "make sure that only one greenthread can %s any "
172                     "particular socket.  Consider using a pools.Pool. "
173                     "If you do know what you're doing and want to disable "
174                     "this error, call "
175                     "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; "
176                     "THAT THREAD=%s" % (
177                         evtype, fileno, evtype, cb, bucket[fileno]))
178             # store off the second listener in another structure
179             self.secondaries[evtype].setdefault(fileno, []).append(listener)
180         else:
181             bucket[fileno] = listener
182         return listener
183
184     def _obsolete(self, fileno):
185         """ We've received an indication that 'fileno' has been obsoleted.
186             Any current listeners must be defanged, and notifications to
187             their greenlets queued up to send.
188         """
189         found = False
190         for evtype, bucket in self.secondaries.items():
191             if fileno in bucket:
192                 for listener in bucket[fileno]:
193                     found = True
194                     self.closed.append(listener)
195                     listener.defang()
196                 del bucket[fileno]
197
198         # For the primary listeners, we actually need to call remove,
199         # which may modify the underlying OS polling objects.
200         for evtype, bucket in self.listeners.items():
201             if fileno in bucket:
202                 listener = bucket[fileno]
203                 found = True
204                 self.closed.append(listener)
205                 self.remove(listener)
206                 listener.defang()
207
208         return found
209
210     def notify_close(self, fileno):
211         """ We might want to do something when a fileno is closed.
212             However, currently it suffices to obsolete listeners only
213             when we detect an old fileno being recycled, on open.
214         """
215         pass
216
217     def remove(self, listener):
218         if listener.spent:
219             # trampoline may trigger this in its finally section.
220             return
221
222         fileno = listener.fileno
223         evtype = listener.evtype
224         self.listeners[evtype].pop(fileno, None)
225         # migrate a secondary listener to be the primary listener
226         if fileno in self.secondaries[evtype]:
227             sec = self.secondaries[evtype].get(fileno, None)
228             if not sec:
229                 return
230             self.listeners[evtype][fileno] = sec.pop(0)
231             if not sec:
232                 del self.secondaries[evtype][fileno]
233
234     def mark_as_reopened(self, fileno):
235         """ If a file descriptor is returned by the OS as the result of some
236             open call (or equivalent), that signals that it might be being
237             recycled.
238
239             Catch the case where the fd was previously in use.
240         """
241         self._obsolete(fileno)
242
243     def remove_descriptor(self, fileno):
244         """ Completely remove all listeners for this fileno.  For internal use
245         only."""
246         listeners = []
247         listeners.append(self.listeners[READ].pop(fileno, noop))
248         listeners.append(self.listeners[WRITE].pop(fileno, noop))
249         listeners.extend(self.secondaries[READ].pop(fileno, ()))
250         listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
251         for listener in listeners:
252             try:
253                 listener.cb(fileno)
254             except Exception:
255                 self.squelch_generic_exception(sys.exc_info())
256
257     def close_one(self):
258         """ Triggered from the main run loop. If a listener's underlying FD was
259             closed somehow, throw an exception back to the trampoline, which should
260             be able to manage it appropriately.
261         """
262         listener = self.closed.pop()
263         if not listener.greenlet.dead:
264             # There's no point signalling a greenlet that's already dead.
265             listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
266
267     def ensure_greenlet(self):
268         if self.greenlet.dead:
269             # create new greenlet sharing same parent as original
270             new = greenlet.greenlet(self.run, self.greenlet.parent)
271             # need to assign as parent of old greenlet
272             # for those greenlets that are currently
273             # children of the dead hub and may subsequently
274             # exit without further switching to hub.
275             self.greenlet.parent = new
276             self.greenlet = new
277
278     def switch(self):
279         cur = greenlet.getcurrent()
280         assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
281         switch_out = getattr(cur, 'switch_out', None)
282         if switch_out is not None:
283             try:
284                 switch_out()
285             except:
286                 self.squelch_generic_exception(sys.exc_info())
287         self.ensure_greenlet()
288         try:
289             if self.greenlet.parent is not cur:
290                 cur.parent = self.greenlet
291         except ValueError:
292             pass  # gets raised if there is a greenlet parent cycle
293         clear_sys_exc_info()
294         return self.greenlet.switch()
295
296     def squelch_exception(self, fileno, exc_info):
297         traceback.print_exception(*exc_info)
298         sys.stderr.write("Removing descriptor: %r\n" % (fileno,))
299         sys.stderr.flush()
300         try:
301             self.remove_descriptor(fileno)
302         except Exception as e:
303             sys.stderr.write("Exception while removing descriptor! %r\n" % (e,))
304             sys.stderr.flush()
305
306     def wait(self, seconds=None):
307         raise NotImplementedError("Implement this in a subclass")
308
309     def default_sleep(self):
310         return 60.0
311
312     def sleep_until(self):
313         t = self.timers
314         if not t:
315             return None
316         return t[0][0]
317
318     def run(self, *a, **kw):
319         """Run the runloop until abort is called.
320         """
321         # accept and discard variable arguments because they will be
322         # supplied if other greenlets have run and exited before the
323         # hub's greenlet gets a chance to run
324         if self.running:
325             raise RuntimeError("Already running!")
326         try:
327             self.running = True
328             self.stopping = False
329             while not self.stopping:
330                 while self.closed:
331                     # We ditch all of these first.
332                     self.close_one()
333                 self.prepare_timers()
334                 if self.debug_blocking:
335                     self.block_detect_pre()
336                 self.fire_timers(self.clock())
337                 if self.debug_blocking:
338                     self.block_detect_post()
339                 self.prepare_timers()
340                 wakeup_when = self.sleep_until()
341                 if wakeup_when is None:
342                     sleep_time = self.default_sleep()
343                 else:
344                     sleep_time = wakeup_when - self.clock()
345                 if sleep_time > 0:
346                     self.wait(sleep_time)
347                 else:
348                     self.wait(0)
349             else:
350                 self.timers_canceled = 0
351                 del self.timers[:]
352                 del self.next_timers[:]
353         finally:
354             self.running = False
355             self.stopping = False
356
357     def abort(self, wait=False):
358         """Stop the runloop. If run is executing, it will exit after
359         completing the next runloop iteration.
360
361         Set *wait* to True to cause abort to switch to the hub immediately and
362         wait until it's finished processing.  Waiting for the hub will only
363         work from the main greenthread; all other greenthreads will become
364         unreachable.
365         """
366         if self.running:
367             self.stopping = True
368         if wait:
369             assert self.greenlet is not greenlet.getcurrent(
370             ), "Can't abort with wait from inside the hub's greenlet."
371             # schedule an immediate timer just so the hub doesn't sleep
372             self.schedule_call_global(0, lambda: None)
373             # switch to it; when done the hub will switch back to its parent,
374             # the main greenlet
375             self.switch()
376
377     def squelch_generic_exception(self, exc_info):
378         if self.debug_exceptions:
379             traceback.print_exception(*exc_info)
380             sys.stderr.flush()
381             clear_sys_exc_info()
382
383     def squelch_timer_exception(self, timer, exc_info):
384         if self.debug_exceptions:
385             traceback.print_exception(*exc_info)
386             sys.stderr.flush()
387             clear_sys_exc_info()
388
389     def add_timer(self, timer):
390         scheduled_time = self.clock() + timer.seconds
391         self.next_timers.append((scheduled_time, timer))
392         return scheduled_time
393
394     def timer_canceled(self, timer):
395         self.timers_canceled += 1
396         len_timers = len(self.timers) + len(self.next_timers)
397         if len_timers > 1000 and len_timers / 2 <= self.timers_canceled:
398             self.timers_canceled = 0
399             self.timers = [t for t in self.timers if not t[1].called]
400             self.next_timers = [t for t in self.next_timers if not t[1].called]
401             heapq.heapify(self.timers)
402
403     def prepare_timers(self):
404         heappush = heapq.heappush
405         t = self.timers
406         for item in self.next_timers:
407             if item[1].called:
408                 self.timers_canceled -= 1
409             else:
410                 heappush(t, item)
411         del self.next_timers[:]
412
413     def schedule_call_local(self, seconds, cb, *args, **kw):
414         """Schedule a callable to be called after 'seconds' seconds have
415         elapsed. Cancel the timer if greenlet has exited.
416             seconds: The number of seconds to wait.
417             cb: The callable to call after the given time.
418             *args: Arguments to pass to the callable when called.
419             **kw: Keyword arguments to pass to the callable when called.
420         """
421         t = timer.LocalTimer(seconds, cb, *args, **kw)
422         self.add_timer(t)
423         return t
424
425     def schedule_call_global(self, seconds, cb, *args, **kw):
426         """Schedule a callable to be called after 'seconds' seconds have
427         elapsed. The timer will NOT be canceled if the current greenlet has
428         exited before the timer fires.
429             seconds: The number of seconds to wait.
430             cb: The callable to call after the given time.
431             *args: Arguments to pass to the callable when called.
432             **kw: Keyword arguments to pass to the callable when called.
433         """
434         t = timer.Timer(seconds, cb, *args, **kw)
435         self.add_timer(t)
436         return t
437
438     def fire_timers(self, when):
439         t = self.timers
440         heappop = heapq.heappop
441
442         while t:
443             next = t[0]
444
445             exp = next[0]
446             timer = next[1]
447
448             if when < exp:
449                 break
450
451             heappop(t)
452
453             try:
454                 if timer.called:
455                     self.timers_canceled -= 1
456                 else:
457                     timer()
458             except self.SYSTEM_EXCEPTIONS:
459                 raise
460             except:
461                 self.squelch_timer_exception(timer, sys.exc_info())
462                 clear_sys_exc_info()
463
464     # for debugging:
465
466     def get_readers(self):
467         return self.listeners[READ].values()
468
469     def get_writers(self):
470         return self.listeners[WRITE].values()
471
472     def get_timers_count(hub):
473         return len(hub.timers) + len(hub.next_timers)
474
475     def set_debug_listeners(self, value):
476         if value:
477             self.lclass = DebugListener
478         else:
479             self.lclass = FdListener
480
481     def set_timer_exceptions(self, value):
482         self.debug_exceptions = value