c4906c5f4207d99fd33dfd23f82789024458ad5b
[packages/trusty/python-eventlet.git] / eventlet / eventlet / hubs / hub.py
1 import errno
2 import heapq
3 import math
4 import signal
5 import sys
6 import traceback
7
8 arm_alarm = None
9 if hasattr(signal, 'setitimer'):
10     def alarm_itimer(seconds):
11         signal.setitimer(signal.ITIMER_REAL, seconds)
12     arm_alarm = alarm_itimer
13 else:
14     try:
15         import itimer
16         arm_alarm = itimer.alarm
17     except ImportError:
18         def alarm_signal(seconds):
19             signal.alarm(math.ceil(seconds))
20         arm_alarm = alarm_signal
21
22 from eventlet import patcher
23 from eventlet.hubs import timer, IOClosed
24 from eventlet.support import greenlets as greenlet, clear_sys_exc_info
25 time = patcher.original('time')
26
27 g_prevent_multiple_readers = True
28
29 READ = "read"
30 WRITE = "write"
31
32
33 def closed_callback(fileno):
34     """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait
35     """
36     # No-op.
37     pass
38
39
40 class FdListener(object):
41
42     def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
43         """ The following are required:
44         cb - the standard callback, which will switch into the
45             listening greenlet to indicate that the event waited upon
46             is ready
47         tb - a 'throwback'. This is typically greenlet.throw, used
48             to raise a signal into the target greenlet indicating that
49             an event was obsoleted by its underlying filehandle being
50             repurposed.
51         mark_as_closed - if any listener is obsoleted, this is called
52             (in the context of some other client greenlet) to alert
53             underlying filehandle-wrapping objects that they've been
54             closed.
55         """
56         assert (evtype is READ or evtype is WRITE)
57         self.evtype = evtype
58         self.fileno = fileno
59         self.cb = cb
60         self.tb = tb
61         self.mark_as_closed = mark_as_closed
62         self.spent = False
63         self.greenlet = greenlet.getcurrent()
64
65     def __repr__(self):
66         return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
67                                        self.cb, self.tb)
68     __str__ = __repr__
69
70     def defang(self):
71         self.cb = closed_callback
72         if self.mark_as_closed is not None:
73             self.mark_as_closed()
74         self.spent = True
75
76
77 noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)
78
79
80 # in debug mode, track the call site that created the listener
81
82
83 class DebugListener(FdListener):
84
85     def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
86         self.where_called = traceback.format_stack()
87         self.greenlet = greenlet.getcurrent()
88         super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed)
89
90     def __repr__(self):
91         return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % (
92             self.evtype,
93             self.fileno,
94             self.cb,
95             self.tb,
96             self.mark_as_closed,
97             self.greenlet,
98             ''.join(self.where_called))
99     __str__ = __repr__
100
101
102 def alarm_handler(signum, frame):
103     import inspect
104     raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame)))
105
106
107 class BaseHub(object):
108     """ Base hub class for easing the implementation of subclasses that are
109     specific to a particular underlying event architecture. """
110
111     SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
112
113     READ = READ
114     WRITE = WRITE
115
116     def __init__(self, clock=time.time):
117         self.listeners = {READ: {}, WRITE: {}}
118         self.secondaries = {READ: {}, WRITE: {}}
119         self.closed = []
120
121         self.clock = clock
122         self.greenlet = greenlet.greenlet(self.run)
123         self.stopping = False
124         self.running = False
125         self.timers = []
126         self.next_timers = []
127         self.lclass = FdListener
128         self.timers_canceled = 0
129         self.debug_exceptions = True
130         self.debug_blocking = False
131         self.debug_blocking_resolution = 1
132
133     def block_detect_pre(self):
134         # shortest alarm we can possibly raise is one second
135         tmp = signal.signal(signal.SIGALRM, alarm_handler)
136         if tmp != alarm_handler:
137             self._old_signal_handler = tmp
138
139         arm_alarm(self.debug_blocking_resolution)
140
141     def block_detect_post(self):
142         if (hasattr(self, "_old_signal_handler") and
143                 self._old_signal_handler):
144             signal.signal(signal.SIGALRM, self._old_signal_handler)
145         signal.alarm(0)
146
147     def add(self, evtype, fileno, cb, tb, mark_as_closed):
148         """ Signals an intent to or write a particular file descriptor.
149
150         The *evtype* argument is either the constant READ or WRITE.
151
152         The *fileno* argument is the file number of the file of interest.
153
154         The *cb* argument is the callback which will be called when the file
155         is ready for reading/writing.
156
157         The *tb* argument is the throwback used to signal (into the greenlet)
158         that the file was closed.
159
160         The *mark_as_closed* is used in the context of the event hub to
161         prepare a Python object as being closed, pre-empting further
162         close operations from accidentally shutting down the wrong OS thread.
163         """
164         listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
165         bucket = self.listeners[evtype]
166         if fileno in bucket:
167             if g_prevent_multiple_readers:
168                 raise RuntimeError(
169                     "Second simultaneous %s on fileno %s "
170                     "detected.  Unless you really know what you're doing, "
171                     "make sure that only one greenthread can %s any "
172                     "particular socket.  Consider using a pools.Pool. "
173                     "If you do know what you're doing and want to disable "
174                     "this error, call "
175                     "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; THAT THREAD=%s" % (
176                     evtype, fileno, evtype, cb, bucket[fileno]))
177             # store off the second listener in another structure
178             self.secondaries[evtype].setdefault(fileno, []).append(listener)
179         else:
180             bucket[fileno] = listener
181         return listener
182
183     def _obsolete(self, fileno):
184         """ We've received an indication that 'fileno' has been obsoleted.
185             Any current listeners must be defanged, and notifications to
186             their greenlets queued up to send.
187         """
188         found = False
189         for evtype, bucket in self.secondaries.items():
190             if fileno in bucket:
191                 for listener in bucket[fileno]:
192                     found = True
193                     self.closed.append(listener)
194                     listener.defang()
195                 del bucket[fileno]
196
197         # For the primary listeners, we actually need to call remove,
198         # which may modify the underlying OS polling objects.
199         for evtype, bucket in self.listeners.items():
200             if fileno in bucket:
201                 listener = bucket[fileno]
202                 found = True
203                 self.closed.append(listener)
204                 self.remove(listener)
205                 listener.defang()
206
207         return found
208
209     def notify_close(self, fileno):
210         """ We might want to do something when a fileno is closed.
211             However, currently it suffices to obsolete listeners only
212             when we detect an old fileno being recycled, on open.
213         """
214         pass
215
216     def remove(self, listener):
217         if listener.spent:
218             # trampoline may trigger this in its finally section.
219             return
220
221         fileno = listener.fileno
222         evtype = listener.evtype
223         self.listeners[evtype].pop(fileno, None)
224         # migrate a secondary listener to be the primary listener
225         if fileno in self.secondaries[evtype]:
226             sec = self.secondaries[evtype].get(fileno, None)
227             if not sec:
228                 return
229             self.listeners[evtype][fileno] = sec.pop(0)
230             if not sec:
231                 del self.secondaries[evtype][fileno]
232
233     def mark_as_reopened(self, fileno):
234         """ If a file descriptor is returned by the OS as the result of some
235             open call (or equivalent), that signals that it might be being
236             recycled.
237
238             Catch the case where the fd was previously in use.
239         """
240         self._obsolete(fileno)
241
242     def remove_descriptor(self, fileno):
243         """ Completely remove all listeners for this fileno.  For internal use
244         only."""
245         listeners = []
246         listeners.append(self.listeners[READ].pop(fileno, noop))
247         listeners.append(self.listeners[WRITE].pop(fileno, noop))
248         listeners.extend(self.secondaries[READ].pop(fileno, ()))
249         listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
250         for listener in listeners:
251             try:
252                 listener.cb(fileno)
253             except Exception:
254                 self.squelch_generic_exception(sys.exc_info())
255
256     def close_one(self):
257         """ Triggered from the main run loop. If a listener's underlying FD was
258             closed somehow, throw an exception back to the trampoline, which should
259             be able to manage it appropriately.
260         """
261         listener = self.closed.pop()
262         if not listener.greenlet.dead:
263             # There's no point signalling a greenlet that's already dead.
264             listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
265
266     def ensure_greenlet(self):
267         if self.greenlet.dead:
268             # create new greenlet sharing same parent as original
269             new = greenlet.greenlet(self.run, self.greenlet.parent)
270             # need to assign as parent of old greenlet
271             # for those greenlets that are currently
272             # children of the dead hub and may subsequently
273             # exit without further switching to hub.
274             self.greenlet.parent = new
275             self.greenlet = new
276
277     def switch(self):
278         cur = greenlet.getcurrent()
279         assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
280         switch_out = getattr(cur, 'switch_out', None)
281         if switch_out is not None:
282             try:
283                 switch_out()
284             except:
285                 self.squelch_generic_exception(sys.exc_info())
286         self.ensure_greenlet()
287         try:
288             if self.greenlet.parent is not cur:
289                 cur.parent = self.greenlet
290         except ValueError:
291             pass  # gets raised if there is a greenlet parent cycle
292         clear_sys_exc_info()
293         return self.greenlet.switch()
294
295     def squelch_exception(self, fileno, exc_info):
296         traceback.print_exception(*exc_info)
297         sys.stderr.write("Removing descriptor: %r\n" % (fileno,))
298         sys.stderr.flush()
299         try:
300             self.remove_descriptor(fileno)
301         except Exception as e:
302             sys.stderr.write("Exception while removing descriptor! %r\n" % (e,))
303             sys.stderr.flush()
304
305     def wait(self, seconds=None):
306         raise NotImplementedError("Implement this in a subclass")
307
308     def default_sleep(self):
309         return 60.0
310
311     def sleep_until(self):
312         t = self.timers
313         if not t:
314             return None
315         return t[0][0]
316
317     def run(self, *a, **kw):
318         """Run the runloop until abort is called.
319         """
320         # accept and discard variable arguments because they will be
321         # supplied if other greenlets have run and exited before the
322         # hub's greenlet gets a chance to run
323         if self.running:
324             raise RuntimeError("Already running!")
325         try:
326             self.running = True
327             self.stopping = False
328             while not self.stopping:
329                 while self.closed:
330                     # We ditch all of these first.
331                     self.close_one()
332                 self.prepare_timers()
333                 if self.debug_blocking:
334                     self.block_detect_pre()
335                 self.fire_timers(self.clock())
336                 if self.debug_blocking:
337                     self.block_detect_post()
338                 self.prepare_timers()
339                 wakeup_when = self.sleep_until()
340                 if wakeup_when is None:
341                     sleep_time = self.default_sleep()
342                 else:
343                     sleep_time = wakeup_when - self.clock()
344                 if sleep_time > 0:
345                     self.wait(sleep_time)
346                 else:
347                     self.wait(0)
348             else:
349                 self.timers_canceled = 0
350                 del self.timers[:]
351                 del self.next_timers[:]
352         finally:
353             self.running = False
354             self.stopping = False
355
356     def abort(self, wait=False):
357         """Stop the runloop. If run is executing, it will exit after
358         completing the next runloop iteration.
359
360         Set *wait* to True to cause abort to switch to the hub immediately and
361         wait until it's finished processing.  Waiting for the hub will only
362         work from the main greenthread; all other greenthreads will become
363         unreachable.
364         """
365         if self.running:
366             self.stopping = True
367         if wait:
368             assert self.greenlet is not greenlet.getcurrent(), "Can't abort with wait from inside the hub's greenlet."
369             # schedule an immediate timer just so the hub doesn't sleep
370             self.schedule_call_global(0, lambda: None)
371             # switch to it; when done the hub will switch back to its parent,
372             # the main greenlet
373             self.switch()
374
375     def squelch_generic_exception(self, exc_info):
376         if self.debug_exceptions:
377             traceback.print_exception(*exc_info)
378             sys.stderr.flush()
379             clear_sys_exc_info()
380
381     def squelch_timer_exception(self, timer, exc_info):
382         if self.debug_exceptions:
383             traceback.print_exception(*exc_info)
384             sys.stderr.flush()
385             clear_sys_exc_info()
386
387     def add_timer(self, timer):
388         scheduled_time = self.clock() + timer.seconds
389         self.next_timers.append((scheduled_time, timer))
390         return scheduled_time
391
392     def timer_canceled(self, timer):
393         self.timers_canceled += 1
394         len_timers = len(self.timers) + len(self.next_timers)
395         if len_timers > 1000 and len_timers / 2 <= self.timers_canceled:
396             self.timers_canceled = 0
397             self.timers = [t for t in self.timers if not t[1].called]
398             self.next_timers = [t for t in self.next_timers if not t[1].called]
399             heapq.heapify(self.timers)
400
401     def prepare_timers(self):
402         heappush = heapq.heappush
403         t = self.timers
404         for item in self.next_timers:
405             if item[1].called:
406                 self.timers_canceled -= 1
407             else:
408                 heappush(t, item)
409         del self.next_timers[:]
410
411     def schedule_call_local(self, seconds, cb, *args, **kw):
412         """Schedule a callable to be called after 'seconds' seconds have
413         elapsed. Cancel the timer if greenlet has exited.
414             seconds: The number of seconds to wait.
415             cb: The callable to call after the given time.
416             *args: Arguments to pass to the callable when called.
417             **kw: Keyword arguments to pass to the callable when called.
418         """
419         t = timer.LocalTimer(seconds, cb, *args, **kw)
420         self.add_timer(t)
421         return t
422
423     def schedule_call_global(self, seconds, cb, *args, **kw):
424         """Schedule a callable to be called after 'seconds' seconds have
425         elapsed. The timer will NOT be canceled if the current greenlet has
426         exited before the timer fires.
427             seconds: The number of seconds to wait.
428             cb: The callable to call after the given time.
429             *args: Arguments to pass to the callable when called.
430             **kw: Keyword arguments to pass to the callable when called.
431         """
432         t = timer.Timer(seconds, cb, *args, **kw)
433         self.add_timer(t)
434         return t
435
436     def fire_timers(self, when):
437         t = self.timers
438         heappop = heapq.heappop
439
440         while t:
441             next = t[0]
442
443             exp = next[0]
444             timer = next[1]
445
446             if when < exp:
447                 break
448
449             heappop(t)
450
451             try:
452                 if timer.called:
453                     self.timers_canceled -= 1
454                 else:
455                     timer()
456             except self.SYSTEM_EXCEPTIONS:
457                 raise
458             except:
459                 self.squelch_timer_exception(timer, sys.exc_info())
460                 clear_sys_exc_info()
461
462     # for debugging:
463
464     def get_readers(self):
465         return self.listeners[READ].values()
466
467     def get_writers(self):
468         return self.listeners[WRITE].values()
469
470     def get_timers_count(hub):
471         return len(hub.timers) + len(hub.next_timers)
472
473     def set_debug_listeners(self, value):
474         if value:
475             self.lclass = DebugListener
476         else:
477             self.lclass = FdListener
478
479     def set_timer_exceptions(self, value):
480         self.debug_exceptions = value