4 from eventlet import patcher
5 select = patcher.original('select')
6 time = patcher.original('time')
9 from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
10 from eventlet.support import get_errno, clear_sys_exc_info
# poll() event bits that report an error condition or a hang-up on a descriptor.
EXC_MASK = select.POLLERR | select.POLLHUP
# poll() event bits that report readable data (ordinary or urgent/priority).
READ_MASK = select.POLLIN | select.POLLPRI
# poll() event bit that reports the descriptor can be written to.
WRITE_MASK = select.POLLOUT
def __init__(self, clock=time.time):
    """Create the hub and the ``select.poll`` object that backs it.

    :param clock: callable forwarded unchanged to the base-class initialiser.
    """
    super(Hub, self).__init__(clock)
    self.poll = select.poll()
    # poll.modify is new to 2.6; on interpreters without it, fall back to
    # poll.register, which is called with the same (fileno, mask) arguments.
    try:
        self.modify = self.poll.modify
    except AttributeError:
        self.modify = self.poll.register
def add(self, evtype, fileno, cb, tb, mac):
    """Add a listener through the base class and register *fileno* with poll.

    All arguments are forwarded unchanged to the base-class ``add``.

    :returns: the listener object returned by the base-class ``add``.
    """
    listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
    # new=True: a freshly added descriptor goes straight to poll.register
    # instead of trying poll.modify first.
    self.register(fileno, new=True)
    return listener
def remove(self, listener):
    """Drop *listener* via the base class, then refresh poll's mask for its fd.

    :param listener: listener object exposing a ``fileno`` attribute.
    """
    super(Hub, self).remove(listener)
    # Recompute the interest mask now that this listener is gone; register()
    # unregisters the descriptor when no listeners remain for it.
    fd = listener.fileno
    self.register(fd, new=False)
def register(self, fileno, new=False):
    """Bring the poll object's interest mask for *fileno* in line with the listeners.

    The mask is built from the READ and WRITE listener tables: a read
    listener contributes ``READ_MASK | EXC_MASK`` and a write listener
    contributes ``WRITE_MASK | EXC_MASK``.  A non-empty mask is registered
    (or modified); an empty mask unregisters the descriptor.

    :param fileno: file descriptor to (re)register.
    :param new: when true, call ``poll.register`` directly rather than
        trying ``self.modify`` first.
    :raises ValueError: propagated after the descriptor has been removed
        from the hub, when poll rejects *fileno* as invalid.
    """
    # NOTE(review): the listing this block came from had lines elided; the
    # try/if/else nesting and the trailing ``raise`` are restored from the
    # surviving except clauses and comments -- confirm against upstream.
    mask = 0
    if self.listeners[READ].get(fileno):
        mask |= READ_MASK | EXC_MASK
    if self.listeners[WRITE].get(fileno):
        mask |= WRITE_MASK | EXC_MASK
    try:
        if mask:
            if new:
                self.poll.register(fileno, mask)
            else:
                try:
                    self.modify(fileno, mask)
                except (IOError, OSError):
                    # modify failed (e.g. fd not currently registered):
                    # fall back to a plain register.
                    self.poll.register(fileno, mask)
        else:
            try:
                self.poll.unregister(fileno)
            except (KeyError, IOError, OSError):
                # raised if we try to remove a fileno that was
                # already removed/invalid
                pass
    except ValueError:
        # fileno is bad, issue 74
        self.remove_descriptor(fileno)
        raise
def remove_descriptor(self, fileno):
    """Remove *fileno* from the base hub and, best-effort, from the poll object.

    :param fileno: file descriptor to forget.
    """
    super(Hub, self).remove_descriptor(fileno)
    try:
        self.poll.unregister(fileno)
    except (KeyError, ValueError, IOError, OSError):
        # raised if we try to remove a fileno that was
        # already removed/invalid
        pass
def do_poll(self, seconds):
    """Run one ``poll()`` call and return its result.

    :param seconds: timeout in seconds, or ``None`` to block until an
        event arrives.
    :returns: whatever ``self.poll.poll`` returns.
    """
    if seconds is None:
        # wait() defaults ``seconds`` to None; multiplying None would raise
        # TypeError, so hand it to poll() unchanged.
        return self.poll.poll(None)
    # poll.poll expects integral milliseconds
    return self.poll.poll(int(seconds * 1000.0))
def wait(self, seconds=None):
    """Poll once for I/O events and invoke the callbacks of ready listeners.

    :param seconds: timeout handed to ``do_poll``; when there are no
        listeners at all it is instead used as a plain sleep duration.
    :raises: errors from ``do_poll`` other than EINTR, and any exception in
        ``self.SYSTEM_EXCEPTIONS`` raised by a listener callback.
    """
    # NOTE(review): the listing this block came from had lines elided; the
    # try blocks, early returns, ``continue`` and the callback invocation
    # are restored from the surviving lines -- confirm against upstream.
    readers = self.listeners[READ]
    writers = self.listeners[WRITE]

    if not readers and not writers:
        # Nothing to poll for: just let the requested time pass.
        if seconds:
            time.sleep(seconds)
        return
    try:
        presult = self.do_poll(seconds)
    except (IOError, select.error) as e:
        if get_errno(e) == errno.EINTR:
            # Interrupted by a signal; treat as "no events this round".
            return
        raise
    SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS

    if self.debug_blocking:
        self.block_detect_pre()

    # Accumulate the listeners to call back to prior to
    # triggering any of them. This is to keep the set
    # of callbacks in sync with the events we've just
    # polled for. It prevents one handler from invalidating
    # another.
    callbacks = set()
    for fileno, event in presult:
        if event & READ_MASK:
            callbacks.add((readers.get(fileno, noop), fileno))
        if event & WRITE_MASK:
            callbacks.add((writers.get(fileno, noop), fileno))
        if event & select.POLLNVAL:
            # poll says the descriptor is invalid: drop it from the hub.
            self.remove_descriptor(fileno)
            continue
        if event & EXC_MASK:
            # Error/hang-up: wake both sides so each can observe the failure.
            callbacks.add((readers.get(fileno, noop), fileno))
            callbacks.add((writers.get(fileno, noop), fileno))

    for listener, fileno in callbacks:
        try:
            listener.cb(fileno)
        except SYSTEM_EXCEPTIONS:
            raise
        except:
            # Deliberately broad: system exceptions were re-raised above;
            # anything else from a callback is handed to squelch_exception.
            self.squelch_exception(fileno, sys.exc_info())
            clear_sys_exc_info()

    if self.debug_blocking:
        self.block_detect_post()