9 if hasattr(signal, 'setitimer'):
10 def alarm_itimer(seconds):
11 signal.setitimer(signal.ITIMER_REAL, seconds)
12 arm_alarm = alarm_itimer
16 arm_alarm = itimer.alarm
18 def alarm_signal(seconds):
19 signal.alarm(math.ceil(seconds))
20 arm_alarm = alarm_signal
22 from eventlet import patcher
23 from eventlet.hubs import timer, IOClosed
24 from eventlet.support import greenlets as greenlet, clear_sys_exc_info
25 time = patcher.original('time')
27 g_prevent_multiple_readers = True
33 def closed_callback(fileno):
34 """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait
40 class FdListener(object):
42 def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
43 """ The following are required:
44 cb - the standard callback, which will switch into the
45 listening greenlet to indicate that the event waited upon
47 tb - a 'throwback'. This is typically greenlet.throw, used
48 to raise a signal into the target greenlet indicating that
49 an event was obsoleted by its underlying filehandle being
51 mark_as_closed - if any listener is obsoleted, this is called
52 (in the context of some other client greenlet) to alert
53 underlying filehandle-wrapping objects that they've been
56 assert (evtype is READ or evtype is WRITE)
61 self.mark_as_closed = mark_as_closed
63 self.greenlet = greenlet.getcurrent()
66 return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
71 self.cb = closed_callback
72 if self.mark_as_closed is not None:
77 noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)
80 # in debug mode, track the call site that created the listener
83 class DebugListener(FdListener):
85 def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
86 self.where_called = traceback.format_stack()
87 self.greenlet = greenlet.getcurrent()
88 super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed)
91 return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % (
98 ''.join(self.where_called))
102 def alarm_handler(signum, frame):
104 raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame)))
107 class BaseHub(object):
108 """ Base hub class for easing the implementation of subclasses that are
109 specific to a particular underlying event architecture. """
111 SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
116 def __init__(self, clock=time.time):
117 self.listeners = {READ: {}, WRITE: {}}
118 self.secondaries = {READ: {}, WRITE: {}}
122 self.greenlet = greenlet.greenlet(self.run)
123 self.stopping = False
126 self.next_timers = []
127 self.lclass = FdListener
128 self.timers_canceled = 0
129 self.debug_exceptions = True
130 self.debug_blocking = False
131 self.debug_blocking_resolution = 1
133 def block_detect_pre(self):
134 # shortest alarm we can possibly raise is one second
135 tmp = signal.signal(signal.SIGALRM, alarm_handler)
136 if tmp != alarm_handler:
137 self._old_signal_handler = tmp
139 arm_alarm(self.debug_blocking_resolution)
141 def block_detect_post(self):
142 if (hasattr(self, "_old_signal_handler") and
143 self._old_signal_handler):
144 signal.signal(signal.SIGALRM, self._old_signal_handler)
147 def add(self, evtype, fileno, cb, tb, mark_as_closed):
148 """ Signals an intent to read or write a particular file descriptor.
150 The *evtype* argument is either the constant READ or WRITE.
152 The *fileno* argument is the file number of the file of interest.
154 The *cb* argument is the callback which will be called when the file
155 is ready for reading/writing.
157 The *tb* argument is the throwback used to signal (into the greenlet)
158 that the file was closed.
160 The *mark_as_closed* is used in the context of the event hub to
161 prepare a Python object as being closed, pre-empting further
162 close operations from accidentally shutting down the wrong OS thread.
164 listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
165 bucket = self.listeners[evtype]
167 if g_prevent_multiple_readers:
169 "Second simultaneous %s on fileno %s "
170 "detected. Unless you really know what you're doing, "
171 "make sure that only one greenthread can %s any "
172 "particular socket. Consider using a pools.Pool. "
173 "If you do know what you're doing and want to disable "
175 "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; THAT THREAD=%s" % (
176 evtype, fileno, evtype, cb, bucket[fileno]))
177 # store off the second listener in another structure
178 self.secondaries[evtype].setdefault(fileno, []).append(listener)
180 bucket[fileno] = listener
183 def _obsolete(self, fileno):
184 """ We've received an indication that 'fileno' has been obsoleted.
185 Any current listeners must be defanged, and notifications to
186 their greenlets queued up to send.
189 for evtype, bucket in self.secondaries.items():
191 for listener in bucket[fileno]:
193 self.closed.append(listener)
197 # For the primary listeners, we actually need to call remove,
198 # which may modify the underlying OS polling objects.
199 for evtype, bucket in self.listeners.items():
201 listener = bucket[fileno]
203 self.closed.append(listener)
204 self.remove(listener)
209 def notify_close(self, fileno):
210 """ We might want to do something when a fileno is closed.
211 However, currently it suffices to obsolete listeners only
212 when we detect an old fileno being recycled, on open.
216 def remove(self, listener):
218 # trampoline may trigger this in its finally section.
221 fileno = listener.fileno
222 evtype = listener.evtype
223 self.listeners[evtype].pop(fileno, None)
224 # migrate a secondary listener to be the primary listener
225 if fileno in self.secondaries[evtype]:
226 sec = self.secondaries[evtype].get(fileno, None)
229 self.listeners[evtype][fileno] = sec.pop(0)
231 del self.secondaries[evtype][fileno]
233 def mark_as_reopened(self, fileno):
234 """ If a file descriptor is returned by the OS as the result of some
235 open call (or equivalent), that signals that it might be being
238 Catch the case where the fd was previously in use.
240 self._obsolete(fileno)
242 def remove_descriptor(self, fileno):
243 """ Completely remove all listeners for this fileno. For internal use
246 listeners.append(self.listeners[READ].pop(fileno, noop))
247 listeners.append(self.listeners[WRITE].pop(fileno, noop))
248 listeners.extend(self.secondaries[READ].pop(fileno, ()))
249 listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
250 for listener in listeners:
254 self.squelch_generic_exception(sys.exc_info())
257 """ Triggered from the main run loop. If a listener's underlying FD was
258 closed somehow, throw an exception back to the trampoline, which should
259 be able to manage it appropriately.
261 listener = self.closed.pop()
262 if not listener.greenlet.dead:
263 # There's no point signalling a greenlet that's already dead.
264 listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
266 def ensure_greenlet(self):
267 if self.greenlet.dead:
268 # create new greenlet sharing same parent as original
269 new = greenlet.greenlet(self.run, self.greenlet.parent)
270 # need to assign as parent of old greenlet
271 # for those greenlets that are currently
272 # children of the dead hub and may subsequently
273 # exit without further switching to hub.
274 self.greenlet.parent = new
278 cur = greenlet.getcurrent()
279 assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
280 switch_out = getattr(cur, 'switch_out', None)
281 if switch_out is not None:
285 self.squelch_generic_exception(sys.exc_info())
286 self.ensure_greenlet()
288 if self.greenlet.parent is not cur:
289 cur.parent = self.greenlet
291 pass # gets raised if there is a greenlet parent cycle
293 return self.greenlet.switch()
295 def squelch_exception(self, fileno, exc_info):
296 traceback.print_exception(*exc_info)
297 sys.stderr.write("Removing descriptor: %r\n" % (fileno,))
300 self.remove_descriptor(fileno)
301 except Exception as e:
302 sys.stderr.write("Exception while removing descriptor! %r\n" % (e,))
def wait(self, seconds=None):
    """Placeholder for the subclass-specific wait step.

    This base implementation never waits; it unconditionally raises
    ``NotImplementedError`` and ignores *seconds*.
    """
    raise NotImplementedError("Implement this in a subclass")
308 def default_sleep(self):
311 def sleep_until(self):
317 def run(self, *a, **kw):
318 """Run the runloop until abort is called.
320 # accept and discard variable arguments because they will be
321 # supplied if other greenlets have run and exited before the
322 # hub's greenlet gets a chance to run
324 raise RuntimeError("Already running!")
327 self.stopping = False
328 while not self.stopping:
330 # We ditch all of these first.
332 self.prepare_timers()
333 if self.debug_blocking:
334 self.block_detect_pre()
335 self.fire_timers(self.clock())
336 if self.debug_blocking:
337 self.block_detect_post()
338 self.prepare_timers()
339 wakeup_when = self.sleep_until()
340 if wakeup_when is None:
341 sleep_time = self.default_sleep()
343 sleep_time = wakeup_when - self.clock()
345 self.wait(sleep_time)
349 self.timers_canceled = 0
351 del self.next_timers[:]
354 self.stopping = False
356 def abort(self, wait=False):
357 """Stop the runloop. If run is executing, it will exit after
358 completing the next runloop iteration.
360 Set *wait* to True to cause abort to switch to the hub immediately and
361 wait until it's finished processing. Waiting for the hub will only
362 work from the main greenthread; all other greenthreads will become
368 assert self.greenlet is not greenlet.getcurrent(), "Can't abort with wait from inside the hub's greenlet."
369 # schedule an immediate timer just so the hub doesn't sleep
370 self.schedule_call_global(0, lambda: None)
371 # switch to it; when done the hub will switch back to its parent,
375 def squelch_generic_exception(self, exc_info):
376 if self.debug_exceptions:
377 traceback.print_exception(*exc_info)
381 def squelch_timer_exception(self, timer, exc_info):
382 if self.debug_exceptions:
383 traceback.print_exception(*exc_info)
def add_timer(self, timer):
    """Queue *timer* on ``self.next_timers`` and return its due time.

    The due time is the current ``self.clock()`` reading plus
    ``timer.seconds``; the ``(due_time, timer)`` pair is appended to
    ``self.next_timers``.
    """
    now = self.clock()
    due = now + timer.seconds
    entry = (due, timer)
    self.next_timers.append(entry)
    return due
def timer_canceled(self, timer):
    """Count one more canceled timer and compact the timer lists if needed.

    ``self.timers_canceled`` is always incremented.  When more than 1000
    entries are held across ``self.timers`` and ``self.next_timers`` and
    at least half that number has been counted as canceled, the counter
    is reset, entries whose timer has a truthy ``called`` attribute are
    dropped from both lists, and ``self.timers`` is re-heapified.

    The *timer* argument itself is not inspected.
    """
    self.timers_canceled += 1
    pending = len(self.timers) + len(self.next_timers)
    # Guard clause: nothing to compact while the lists are small or
    # fewer than half of the entries have been counted as canceled.
    if pending <= 1000 or pending / 2 > self.timers_canceled:
        return
    self.timers_canceled = 0
    self.timers = [entry for entry in self.timers if not entry[1].called]
    self.next_timers = [entry for entry in self.next_timers if not entry[1].called]
    # Filtering breaks the heap invariant, so rebuild it.
    heapq.heapify(self.timers)
401 def prepare_timers(self):
402 heappush = heapq.heappush
404 for item in self.next_timers:
406 self.timers_canceled -= 1
409 del self.next_timers[:]
411 def schedule_call_local(self, seconds, cb, *args, **kw):
412 """Schedule a callable to be called after 'seconds' seconds have
413 elapsed. Cancel the timer if greenlet has exited.
414 seconds: The number of seconds to wait.
415 cb: The callable to call after the given time.
416 *args: Arguments to pass to the callable when called.
417 **kw: Keyword arguments to pass to the callable when called.
419 t = timer.LocalTimer(seconds, cb, *args, **kw)
423 def schedule_call_global(self, seconds, cb, *args, **kw):
424 """Schedule a callable to be called after 'seconds' seconds have
425 elapsed. The timer will NOT be canceled if the current greenlet has
426 exited before the timer fires.
427 seconds: The number of seconds to wait.
428 cb: The callable to call after the given time.
429 *args: Arguments to pass to the callable when called.
430 **kw: Keyword arguments to pass to the callable when called.
432 t = timer.Timer(seconds, cb, *args, **kw)
436 def fire_timers(self, when):
438 heappop = heapq.heappop
453 self.timers_canceled -= 1
456 except self.SYSTEM_EXCEPTIONS:
459 self.squelch_timer_exception(timer, sys.exc_info())
def get_readers(self):
    """Return the values of the ``READ`` listener mapping."""
    read_bucket = self.listeners[READ]
    return read_bucket.values()
def get_writers(self):
    """Return the values of the ``WRITE`` listener mapping."""
    write_bucket = self.listeners[WRITE]
    return write_bucket.values()
def get_timers_count(hub):
    """Return how many entries *hub* holds in ``timers`` and ``next_timers`` combined."""
    scheduled = len(hub.timers)
    queued = len(hub.next_timers)
    return scheduled + queued
473 def set_debug_listeners(self, value):
475 self.lclass = DebugListener
477 self.lclass = FdListener
def set_timer_exceptions(self, value):
    """Store *value* as ``self.debug_exceptions``.

    Within this class that flag gates whether the ``squelch_*`` helpers
    print the exception traceback.
    """
    self.debug_exceptions = value