X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Feventlet%2Fhubs%2Fhub.py;fp=eventlet%2Feventlet%2Fhubs%2Fhub.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=8dda018b50a580faeb1e0ab18d2f4192e927b36c;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/eventlet/hubs/hub.py b/eventlet/eventlet/hubs/hub.py deleted file mode 100644 index 8dda018..0000000 --- a/eventlet/eventlet/hubs/hub.py +++ /dev/null @@ -1,482 +0,0 @@ -import errno -import heapq -import math -import signal -import sys -import traceback - -arm_alarm = None -if hasattr(signal, 'setitimer'): - def alarm_itimer(seconds): - signal.setitimer(signal.ITIMER_REAL, seconds) - arm_alarm = alarm_itimer -else: - try: - import itimer - arm_alarm = itimer.alarm - except ImportError: - def alarm_signal(seconds): - signal.alarm(math.ceil(seconds)) - arm_alarm = alarm_signal - -from eventlet import patcher -from eventlet.hubs import timer, IOClosed -from eventlet.support import greenlets as greenlet, clear_sys_exc_info -time = patcher.original('time') - -g_prevent_multiple_readers = True - -READ = "read" -WRITE = "write" - - -def closed_callback(fileno): - """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait - """ - # No-op. - pass - - -class FdListener(object): - - def __init__(self, evtype, fileno, cb, tb, mark_as_closed): - """ The following are required: - cb - the standard callback, which will switch into the - listening greenlet to indicate that the event waited upon - is ready - tb - a 'throwback'. This is typically greenlet.throw, used - to raise a signal into the target greenlet indicating that - an event was obsoleted by its underlying filehandle being - repurposed. - mark_as_closed - if any listener is obsoleted, this is called - (in the context of some other client greenlet) to alert - underlying filehandle-wrapping objects that they've been - closed. - """ - assert (evtype is READ or evtype is WRITE) - self.evtype = evtype - self.fileno = fileno - self.cb = cb - self.tb = tb - self.mark_as_closed = mark_as_closed - self.spent = False - self.greenlet = greenlet.getcurrent() - - def __repr__(self): - return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, - self.cb, self.tb) - __str__ = __repr__ - - def defang(self): - self.cb = closed_callback - if self.mark_as_closed is not None: - self.mark_as_closed() - self.spent = True - - -noop = FdListener(READ, 0, lambda x: None, lambda x: None, None) - - -# in debug mode, track the call site that created the listener - - -class DebugListener(FdListener): - - def __init__(self, evtype, fileno, cb, tb, mark_as_closed): - self.where_called = traceback.format_stack() - self.greenlet = greenlet.getcurrent() - super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed) - - def __repr__(self): - return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % ( - self.evtype, - self.fileno, - self.cb, - self.tb, - self.mark_as_closed, - self.greenlet, - ''.join(self.where_called)) - __str__ = __repr__ - - -def alarm_handler(signum, frame): - import inspect - raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame))) - - -class BaseHub(object): - """ Base hub class for easing the implementation of subclasses that are - specific to a particular underlying event architecture. """ - - SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) - - READ = READ - WRITE = WRITE - - def __init__(self, clock=time.time): - self.listeners = {READ: {}, WRITE: {}} - self.secondaries = {READ: {}, WRITE: {}} - self.closed = [] - - self.clock = clock - self.greenlet = greenlet.greenlet(self.run) - self.stopping = False - self.running = False - self.timers = [] - self.next_timers = [] - self.lclass = FdListener - self.timers_canceled = 0 - self.debug_exceptions = True - self.debug_blocking = False - self.debug_blocking_resolution = 1 - - def block_detect_pre(self): - # shortest alarm we can possibly raise is one second - tmp = signal.signal(signal.SIGALRM, alarm_handler) - if tmp != alarm_handler: - self._old_signal_handler = tmp - - arm_alarm(self.debug_blocking_resolution) - - def block_detect_post(self): - if (hasattr(self, "_old_signal_handler") and - self._old_signal_handler): - signal.signal(signal.SIGALRM, self._old_signal_handler) - signal.alarm(0) - - def add(self, evtype, fileno, cb, tb, mark_as_closed): - """ Signals an intent to or write a particular file descriptor. - - The *evtype* argument is either the constant READ or WRITE. - - The *fileno* argument is the file number of the file of interest. - - The *cb* argument is the callback which will be called when the file - is ready for reading/writing. - - The *tb* argument is the throwback used to signal (into the greenlet) - that the file was closed. - - The *mark_as_closed* is used in the context of the event hub to - prepare a Python object as being closed, pre-empting further - close operations from accidentally shutting down the wrong OS thread. - """ - listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed) - bucket = self.listeners[evtype] - if fileno in bucket: - if g_prevent_multiple_readers: - raise RuntimeError( - "Second simultaneous %s on fileno %s " - "detected. Unless you really know what you're doing, " - "make sure that only one greenthread can %s any " - "particular socket. Consider using a pools.Pool. " - "If you do know what you're doing and want to disable " - "this error, call " - "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; " - "THAT THREAD=%s" % ( - evtype, fileno, evtype, cb, bucket[fileno])) - # store off the second listener in another structure - self.secondaries[evtype].setdefault(fileno, []).append(listener) - else: - bucket[fileno] = listener - return listener - - def _obsolete(self, fileno): - """ We've received an indication that 'fileno' has been obsoleted. - Any current listeners must be defanged, and notifications to - their greenlets queued up to send. - """ - found = False - for evtype, bucket in self.secondaries.items(): - if fileno in bucket: - for listener in bucket[fileno]: - found = True - self.closed.append(listener) - listener.defang() - del bucket[fileno] - - # For the primary listeners, we actually need to call remove, - # which may modify the underlying OS polling objects. - for evtype, bucket in self.listeners.items(): - if fileno in bucket: - listener = bucket[fileno] - found = True - self.closed.append(listener) - self.remove(listener) - listener.defang() - - return found - - def notify_close(self, fileno): - """ We might want to do something when a fileno is closed. - However, currently it suffices to obsolete listeners only - when we detect an old fileno being recycled, on open. - """ - pass - - def remove(self, listener): - if listener.spent: - # trampoline may trigger this in its finally section. - return - - fileno = listener.fileno - evtype = listener.evtype - self.listeners[evtype].pop(fileno, None) - # migrate a secondary listener to be the primary listener - if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, None) - if not sec: - return - self.listeners[evtype][fileno] = sec.pop(0) - if not sec: - del self.secondaries[evtype][fileno] - - def mark_as_reopened(self, fileno): - """ If a file descriptor is returned by the OS as the result of some - open call (or equivalent), that signals that it might be being - recycled. - - Catch the case where the fd was previously in use. - """ - self._obsolete(fileno) - - def remove_descriptor(self, fileno): - """ Completely remove all listeners for this fileno. For internal use - only.""" - listeners = [] - listeners.append(self.listeners[READ].pop(fileno, noop)) - listeners.append(self.listeners[WRITE].pop(fileno, noop)) - listeners.extend(self.secondaries[READ].pop(fileno, ())) - listeners.extend(self.secondaries[WRITE].pop(fileno, ())) - for listener in listeners: - try: - listener.cb(fileno) - except Exception: - self.squelch_generic_exception(sys.exc_info()) - - def close_one(self): - """ Triggered from the main run loop. If a listener's underlying FD was - closed somehow, throw an exception back to the trampoline, which should - be able to manage it appropriately. - """ - listener = self.closed.pop() - if not listener.greenlet.dead: - # There's no point signalling a greenlet that's already dead. - listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file")) - - def ensure_greenlet(self): - if self.greenlet.dead: - # create new greenlet sharing same parent as original - new = greenlet.greenlet(self.run, self.greenlet.parent) - # need to assign as parent of old greenlet - # for those greenlets that are currently - # children of the dead hub and may subsequently - # exit without further switching to hub. - self.greenlet.parent = new - self.greenlet = new - - def switch(self): - cur = greenlet.getcurrent() - assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP' - switch_out = getattr(cur, 'switch_out', None) - if switch_out is not None: - try: - switch_out() - except: - self.squelch_generic_exception(sys.exc_info()) - self.ensure_greenlet() - try: - if self.greenlet.parent is not cur: - cur.parent = self.greenlet - except ValueError: - pass # gets raised if there is a greenlet parent cycle - clear_sys_exc_info() - return self.greenlet.switch() - - def squelch_exception(self, fileno, exc_info): - traceback.print_exception(*exc_info) - sys.stderr.write("Removing descriptor: %r\n" % (fileno,)) - sys.stderr.flush() - try: - self.remove_descriptor(fileno) - except Exception as e: - sys.stderr.write("Exception while removing descriptor! %r\n" % (e,)) - sys.stderr.flush() - - def wait(self, seconds=None): - raise NotImplementedError("Implement this in a subclass") - - def default_sleep(self): - return 60.0 - - def sleep_until(self): - t = self.timers - if not t: - return None - return t[0][0] - - def run(self, *a, **kw): - """Run the runloop until abort is called. - """ - # accept and discard variable arguments because they will be - # supplied if other greenlets have run and exited before the - # hub's greenlet gets a chance to run - if self.running: - raise RuntimeError("Already running!") - try: - self.running = True - self.stopping = False - while not self.stopping: - while self.closed: - # We ditch all of these first. - self.close_one() - self.prepare_timers() - if self.debug_blocking: - self.block_detect_pre() - self.fire_timers(self.clock()) - if self.debug_blocking: - self.block_detect_post() - self.prepare_timers() - wakeup_when = self.sleep_until() - if wakeup_when is None: - sleep_time = self.default_sleep() - else: - sleep_time = wakeup_when - self.clock() - if sleep_time > 0: - self.wait(sleep_time) - else: - self.wait(0) - else: - self.timers_canceled = 0 - del self.timers[:] - del self.next_timers[:] - finally: - self.running = False - self.stopping = False - - def abort(self, wait=False): - """Stop the runloop. If run is executing, it will exit after - completing the next runloop iteration. - - Set *wait* to True to cause abort to switch to the hub immediately and - wait until it's finished processing. Waiting for the hub will only - work from the main greenthread; all other greenthreads will become - unreachable. - """ - if self.running: - self.stopping = True - if wait: - assert self.greenlet is not greenlet.getcurrent( - ), "Can't abort with wait from inside the hub's greenlet." - # schedule an immediate timer just so the hub doesn't sleep - self.schedule_call_global(0, lambda: None) - # switch to it; when done the hub will switch back to its parent, - # the main greenlet - self.switch() - - def squelch_generic_exception(self, exc_info): - if self.debug_exceptions: - traceback.print_exception(*exc_info) - sys.stderr.flush() - clear_sys_exc_info() - - def squelch_timer_exception(self, timer, exc_info): - if self.debug_exceptions: - traceback.print_exception(*exc_info) - sys.stderr.flush() - clear_sys_exc_info() - - def add_timer(self, timer): - scheduled_time = self.clock() + timer.seconds - self.next_timers.append((scheduled_time, timer)) - return scheduled_time - - def timer_canceled(self, timer): - self.timers_canceled += 1 - len_timers = len(self.timers) + len(self.next_timers) - if len_timers > 1000 and len_timers / 2 <= self.timers_canceled: - self.timers_canceled = 0 - self.timers = [t for t in self.timers if not t[1].called] - self.next_timers = [t for t in self.next_timers if not t[1].called] - heapq.heapify(self.timers) - - def prepare_timers(self): - heappush = heapq.heappush - t = self.timers - for item in self.next_timers: - if item[1].called: - self.timers_canceled -= 1 - else: - heappush(t, item) - del self.next_timers[:] - - def schedule_call_local(self, seconds, cb, *args, **kw): - """Schedule a callable to be called after 'seconds' seconds have - elapsed. Cancel the timer if greenlet has exited. - seconds: The number of seconds to wait. - cb: The callable to call after the given time. - *args: Arguments to pass to the callable when called. - **kw: Keyword arguments to pass to the callable when called. - """ - t = timer.LocalTimer(seconds, cb, *args, **kw) - self.add_timer(t) - return t - - def schedule_call_global(self, seconds, cb, *args, **kw): - """Schedule a callable to be called after 'seconds' seconds have - elapsed. The timer will NOT be canceled if the current greenlet has - exited before the timer fires. - seconds: The number of seconds to wait. - cb: The callable to call after the given time. - *args: Arguments to pass to the callable when called. - **kw: Keyword arguments to pass to the callable when called. - """ - t = timer.Timer(seconds, cb, *args, **kw) - self.add_timer(t) - return t - - def fire_timers(self, when): - t = self.timers - heappop = heapq.heappop - - while t: - next = t[0] - - exp = next[0] - timer = next[1] - - if when < exp: - break - - heappop(t) - - try: - if timer.called: - self.timers_canceled -= 1 - else: - timer() - except self.SYSTEM_EXCEPTIONS: - raise - except: - self.squelch_timer_exception(timer, sys.exc_info()) - clear_sys_exc_info() - - # for debugging: - - def get_readers(self): - return self.listeners[READ].values() - - def get_writers(self): - return self.listeners[WRITE].values() - - def get_timers_count(hub): - return len(hub.timers) + len(hub.next_timers) - - def set_debug_listeners(self, value): - if value: - self.lclass = DebugListener - else: - self.lclass = FdListener - - def set_timer_exceptions(self, value): - self.debug_exceptions = value