# Choose the implementation bound to `arm_alarm`, the helper that
# block_detect_pre calls to schedule the blocking-detector alarm.
# NOTE(review): the branch headers between the three `arm_alarm = ...`
# assignments (and the import of `itimer`) are not visible in this listing;
# confirm the exact fallback order against the full file.
9 if hasattr(signal, 'setitimer'):
# Variant built on signal.setitimer: `seconds` is passed through unchanged.
10 def alarm_itimer(seconds):
11 signal.setitimer(signal.ITIMER_REAL, seconds)
12 arm_alarm = alarm_itimer
# Variant taken from the `itimer` module.
16 arm_alarm = itimer.alarm
# Variant built on signal.alarm: `seconds` is rounded up with math.ceil
# before being handed to signal.alarm.
18 def alarm_signal(seconds):
19 signal.alarm(math.ceil(seconds))
20 arm_alarm = alarm_signal
22 from eventlet import patcher
23 from eventlet.hubs import timer, IOClosed
24 from eventlet.support import greenlets as greenlet, clear_sys_exc_info
25 time = patcher.original('time')
27 g_prevent_multiple_readers = True
33 def closed_callback(fileno):
34 """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait
40 class FdListener(object):
42 def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
43 """ The following are required:
44 cb - the standard callback, which will switch into the
45 listening greenlet to indicate that the event waited upon
47 tb - a 'throwback'. This is typically greenlet.throw, used
48 to raise a signal into the target greenlet indicating that
49 an event was obsoleted by its underlying filehandle being
51 mark_as_closed - if any listener is obsoleted, this is called
52 (in the context of some other client greenlet) to alert
53 underlying filehandle-wrapping objects that they've been
56 assert (evtype is READ or evtype is WRITE)
61 self.mark_as_closed = mark_as_closed
63 self.greenlet = greenlet.getcurrent()
66 return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
71 self.cb = closed_callback
72 if self.mark_as_closed is not None:
# Placeholder READ listener on fileno 0 whose cb and tb ignore their argument
# and which has no mark_as_closed hook. remove_descriptor passes it as the
# default to dict.pop() so a missing primary listener still yields a listener
# object.
77 noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)
80 # in debug mode, track the call site that created the listener
83 class DebugListener(FdListener):
# Build a listener that also remembers where it was created.
# Takes the same five arguments as FdListener.__init__ and forwards them
# unchanged to it.
85 def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
# Formatted call stack at construction time (a list of strings).
86 self.where_called = traceback.format_stack()
# Greenlet that was current when the listener was created.
87 self.greenlet = greenlet.getcurrent()
88 super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed)
91 return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % (
98 ''.join(self.where_called))
# SIGALRM handler installed by BaseHub.block_detect_pre.
# Raises RuntimeError whose message embeds inspect.getframeinfo(frame) for the
# frame passed to the handler; `signum` is not used in the visible lines.
# NOTE(review): one line between the header and the raise is not visible in
# this listing.
102 def alarm_handler(signum, frame):
104 raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame)))
107 class BaseHub(object):
108 """ Base hub class for easing the implementation of subclasses that are
109 specific to a particular underlying event architecture. """
111 SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
# Set up the hub's listener tables, its greenlet, timer bookkeeping and
# debug switches.
# `clock` defaults to time.time of the module obtained through
# patcher.original('time').
# NOTE(review): several assignments between the visible lines (embedded
# numbers 119-121 and 124-125) are not shown in this listing.
116 def __init__(self, clock=time.time):
# Primary listener per fileno, one table per event type.
117 self.listeners = {READ: {}, WRITE: {}}
# Additional listeners per fileno (lists), one table per event type.
118 self.secondaries = {READ: {}, WRITE: {}}
# The hub's own greenlet; it executes self.run.
122 self.greenlet = greenlet.greenlet(self.run)
# Loop-exit flag tested by run() on every iteration.
123 self.stopping = False
# (scheduled_time, timer) pairs appended by add_timer.
126 self.next_timers = []
# Listener class instantiated by add(); swapped by set_debug_listeners.
127 self.lclass = FdListener
# Counter incremented by timer_canceled.
128 self.timers_canceled = 0
# When true, the squelch_*_exception helpers print tracebacks.
129 self.debug_exceptions = True
# When true, run() wraps fire_timers with block_detect_pre/post.
130 self.debug_blocking = False
# Value handed to arm_alarm by block_detect_pre.
131 self.debug_blocking_resolution = 1
# Start blocking detection: install alarm_handler for SIGALRM and arm the
# alarm.
133 def block_detect_pre(self):
134 # shortest alarm we can possibly raise is one second
# signal.signal returns the handler that was previously installed.
135 tmp = signal.signal(signal.SIGALRM, alarm_handler)
# Save the previous handler only when it is not alarm_handler itself; if
# alarm_handler was already installed, _old_signal_handler is left untouched.
136 if tmp != alarm_handler:
137 self._old_signal_handler = tmp
# Arm the alarm with the configured resolution.
139 arm_alarm(self.debug_blocking_resolution)
# End blocking detection: put back the SIGALRM handler saved by
# block_detect_pre, but only when the attribute exists and is truthy.
# NOTE(review): the lines following the restore are not visible in this
# listing (e.g. whether the pending alarm is cancelled) - confirm in the full
# file.
141 def block_detect_post(self):
142 if (hasattr(self, "_old_signal_handler") and
143 self._old_signal_handler):
144 signal.signal(signal.SIGALRM, self._old_signal_handler)
147 def add(self, evtype, fileno, cb, tb, mark_as_closed):
148 """ Signals an intent to read or write a particular file descriptor.
150 The *evtype* argument is either the constant READ or WRITE.
152 The *fileno* argument is the file number of the file of interest.
154 The *cb* argument is the callback which will be called when the file
155 is ready for reading/writing.
157 The *tb* argument is the throwback used to signal (into the greenlet)
158 that the file was closed.
160 The *mark_as_closed* is used in the context of the event hub to
161 prepare a Python object as being closed, pre-empting further
162 close operations from accidentally shutting down the wrong OS thread.
164 listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
165 bucket = self.listeners[evtype]
167 if g_prevent_multiple_readers:
169 "Second simultaneous %s on fileno %s "
170 "detected. Unless you really know what you're doing, "
171 "make sure that only one greenthread can %s any "
172 "particular socket. Consider using a pools.Pool. "
173 "If you do know what you're doing and want to disable "
175 "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; "
177 evtype, fileno, evtype, cb, bucket[fileno]))
178 # store off the second listener in another structure
179 self.secondaries[evtype].setdefault(fileno, []).append(listener)
181 bucket[fileno] = listener
184 def _obsolete(self, fileno):
185 """ We've received an indication that 'fileno' has been obsoleted.
186 Any current listeners must be defanged, and notifications to
187 their greenlets queued up to send.
190 for evtype, bucket in self.secondaries.items():
192 for listener in bucket[fileno]:
194 self.closed.append(listener)
198 # For the primary listeners, we actually need to call remove,
199 # which may modify the underlying OS polling objects.
200 for evtype, bucket in self.listeners.items():
202 listener = bucket[fileno]
204 self.closed.append(listener)
205 self.remove(listener)
210 def notify_close(self, fileno):
211 """ We might want to do something when a fileno is closed.
212 However, currently it suffices to obsolete listeners only
213 when we detect an old fileno being recycled, on open.
# Unregister `listener` as the primary listener for its (evtype, fileno) and,
# in the visible lines, promote a queued secondary listener in its place.
# NOTE(review): several lines are not visible in this listing (embedded
# numbers 218, 220-221, 228-229, 231), including the guards around the
# promotion and the deletion - confirm the exact conditions in the full file.
217 def remove(self, listener):
219 # trampoline may trigger this in its finally section.
222 fileno = listener.fileno
223 evtype = listener.evtype
# pop() with a None default: an already-absent fileno is not an error.
224 self.listeners[evtype].pop(fileno, None)
225 # migrate a secondary listener to be the primary listener
226 if fileno in self.secondaries[evtype]:
227 sec = self.secondaries[evtype].get(fileno, None)
# add() appends secondaries to the list, so pop(0) takes the earliest one.
230 self.listeners[evtype][fileno] = sec.pop(0)
232 del self.secondaries[evtype][fileno]
234 def mark_as_reopened(self, fileno):
235 """ If a file descriptor is returned by the OS as the result of some
236 open call (or equivalent), that signals that it might be being
239 Catch the case where the fd was previously in use.
241 self._obsolete(fileno)
243 def remove_descriptor(self, fileno):
244 """ Completely remove all listeners for this fileno. For internal use
247 listeners.append(self.listeners[READ].pop(fileno, noop))
248 listeners.append(self.listeners[WRITE].pop(fileno, noop))
249 listeners.extend(self.secondaries[READ].pop(fileno, ()))
250 listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
251 for listener in listeners:
255 self.squelch_generic_exception(sys.exc_info())
258 """ Triggered from the main run loop. If a listener's underlying FD was
259 closed somehow, throw an exception back to the trampoline, which should
260 be able to manage it appropriately.
262 listener = self.closed.pop()
263 if not listener.greenlet.dead:
264 # There's no point signalling a greenlet that's already dead.
265 listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
# When the hub greenlet has died, create a replacement that runs self.run and
# re-parent the dead greenlet onto it.
# NOTE(review): the statement that stores `new` back on the hub is not visible
# in this listing - confirm in the full file.
267 def ensure_greenlet(self):
268 if self.greenlet.dead:
269 # create new greenlet sharing same parent as original
270 new = greenlet.greenlet(self.run, self.greenlet.parent)
271 # need to assign as parent of old greenlet
272 # for those greenlets that are currently
273 # children of the dead hub and may subsequently
274 # exit without further switching to hub.
275 self.greenlet.parent = new
279 cur = greenlet.getcurrent()
280 assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
281 switch_out = getattr(cur, 'switch_out', None)
282 if switch_out is not None:
286 self.squelch_generic_exception(sys.exc_info())
287 self.ensure_greenlet()
289 if self.greenlet.parent is not cur:
290 cur.parent = self.greenlet
292 pass # gets raised if there is a greenlet parent cycle
294 return self.greenlet.switch()
# Report an exception tied to `fileno` and drop that descriptor from the hub.
# exc_info is a sys.exc_info()-style triple; it is unpacked into
# traceback.print_exception.
# NOTE(review): the `try:` header and the lines after the except body are not
# visible in this listing.
296 def squelch_exception(self, fileno, exc_info):
297 traceback.print_exception(*exc_info)
298 sys.stderr.write("Removing descriptor: %r\n" % (fileno,))
301 self.remove_descriptor(fileno)
# A failure while removing the descriptor is reported on stderr.
302 except Exception as e:
303 sys.stderr.write("Exception while removing descriptor! %r\n" % (e,))
# Hook that subclasses must override; the base implementation always raises
# NotImplementedError. run() calls it with the sleep time it computed.
306 def wait(self, seconds=None):
307 raise NotImplementedError("Implement this in a subclass")
309 def default_sleep(self):
312 def sleep_until(self):
318 def run(self, *a, **kw):
319 """Run the runloop until abort is called.
321 # accept and discard variable arguments because they will be
322 # supplied if other greenlets have run and exited before the
323 # hub's greenlet gets a chance to run
325 raise RuntimeError("Already running!")
328 self.stopping = False
329 while not self.stopping:
331 # We ditch all of these first.
333 self.prepare_timers()
334 if self.debug_blocking:
335 self.block_detect_pre()
336 self.fire_timers(self.clock())
337 if self.debug_blocking:
338 self.block_detect_post()
339 self.prepare_timers()
340 wakeup_when = self.sleep_until()
341 if wakeup_when is None:
342 sleep_time = self.default_sleep()
344 sleep_time = wakeup_when - self.clock()
346 self.wait(sleep_time)
350 self.timers_canceled = 0
352 del self.next_timers[:]
355 self.stopping = False
357 def abort(self, wait=False):
358 """Stop the runloop. If run is executing, it will exit after
359 completing the next runloop iteration.
361 Set *wait* to True to cause abort to switch to the hub immediately and
362 wait until it's finished processing. Waiting for the hub will only
363 work from the main greenthread; all other greenthreads will become
369 assert self.greenlet is not greenlet.getcurrent(
370 ), "Can't abort with wait from inside the hub's greenlet."
371 # schedule an immediate timer just so the hub doesn't sleep
372 self.schedule_call_global(0, lambda: None)
373 # switch to it; when done the hub will switch back to its parent,
# Print the traceback for exc_info (a sys.exc_info()-style triple), but only
# when self.debug_exceptions is true.
# NOTE(review): lines after the print call are not visible in this listing.
377 def squelch_generic_exception(self, exc_info):
378 if self.debug_exceptions:
379 traceback.print_exception(*exc_info)
# Print the traceback for an exception raised while running `timer`, but only
# when self.debug_exceptions is true. `timer` is not used in the visible lines.
# NOTE(review): lines after the print call are not visible in this listing.
383 def squelch_timer_exception(self, timer, exc_info):
384 if self.debug_exceptions:
385 traceback.print_exception(*exc_info)
# Queue `timer` to fire timer.seconds after the current self.clock() reading.
# The (scheduled_time, timer) pair is appended to self.next_timers.
# Returns the computed scheduled time.
389 def add_timer(self, timer):
390 scheduled_time = self.clock() + timer.seconds
391 self.next_timers.append((scheduled_time, timer))
392 return scheduled_time
# Record one more cancelled timer and prune both timer lists once enough
# cancellations have built up. `timer` itself is not inspected here.
394 def timer_canceled(self, timer):
395 self.timers_canceled += 1
396 len_timers = len(self.timers) + len(self.next_timers)
# Prune only when more than 1000 entries exist in total and the cancellation
# count has reached at least half of that total.
397 if len_timers > 1000 and len_timers / 2 <= self.timers_canceled:
398 self.timers_canceled = 0
# Entries are (scheduled_time, timer) pairs; keep those whose timer does not
# have `called` set.
399 self.timers = [t for t in self.timers if not t[1].called]
400 self.next_timers = [t for t in self.next_timers if not t[1].called]
# self.timers was rebuilt by filtering, so restore the heap ordering.
401 heapq.heapify(self.timers)
# Walk every entry queued in self.next_timers, then empty that list in place.
# run() calls this before fire_timers and again before sleep_until.
# NOTE(review): the per-item handling is only partly visible in this listing
# (embedded numbers 405, 407, 409-410 are missing); presumably entries are
# pushed onto self.timers with heappush and the counter is decremented for
# some of them - confirm in the full file.
403 def prepare_timers(self):
404 heappush = heapq.heappush
406 for item in self.next_timers:
408 self.timers_canceled -= 1
# Slice deletion clears the existing list object rather than rebinding it.
411 del self.next_timers[:]
413 def schedule_call_local(self, seconds, cb, *args, **kw):
414 """Schedule a callable to be called after 'seconds' seconds have
415 elapsed. Cancel the timer if greenlet has exited.
416 seconds: The number of seconds to wait.
417 cb: The callable to call after the given time.
418 *args: Arguments to pass to the callable when called.
419 **kw: Keyword arguments to pass to the callable when called.
421 t = timer.LocalTimer(seconds, cb, *args, **kw)
425 def schedule_call_global(self, seconds, cb, *args, **kw):
426 """Schedule a callable to be called after 'seconds' seconds have
427 elapsed. The timer will NOT be canceled if the current greenlet has
428 exited before the timer fires.
429 seconds: The number of seconds to wait.
430 cb: The callable to call after the given time.
431 *args: Arguments to pass to the callable when called.
432 **kw: Keyword arguments to pass to the callable when called.
434 t = timer.Timer(seconds, cb, *args, **kw)
438 def fire_timers(self, when):
440 heappop = heapq.heappop
455 self.timers_canceled -= 1
458 except self.SYSTEM_EXCEPTIONS:
461 self.squelch_timer_exception(timer, sys.exc_info())
# Return the primary READ listeners (the values of self.listeners[READ]);
# listeners held in self.secondaries are not included.
466 def get_readers(self):
467 return self.listeners[READ].values()
# Return the primary WRITE listeners (the values of self.listeners[WRITE]);
# listeners held in self.secondaries are not included.
469 def get_writers(self):
470 return self.listeners[WRITE].values()
# Return the number of entries in the timers list plus the next_timers list.
# The instance parameter is named `hub` here instead of `self`.
472 def get_timers_count(hub):
473 return len(hub.timers) + len(hub.next_timers)
# Select the listener class that add() instantiates: DebugListener or the
# plain FdListener.
# NOTE(review): the branch headers are not visible in this listing; presumably
# the choice follows the truthiness of `value` - confirm in the full file.
475 def set_debug_listeners(self, value):
477 self.lclass = DebugListener
479 self.lclass = FdListener
# Store `value` in self.debug_exceptions, the flag that both
# squelch_generic_exception and squelch_timer_exception test before printing
# a traceback.
481 def set_timer_exceptions(self, value):
482 self.debug_exceptions = value