Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / hubs / kqueue.py
1 import os
2 import sys
3 from eventlet import patcher
4 from eventlet.support import six
5 select = patcher.original('select')
6 time = patcher.original('time')
7 sleep = time.sleep
8
9 from eventlet.support import clear_sys_exc_info
10 from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
11
12
# This hub needs select.kqueue; when the (unpatched) select module does not
# provide it, importing this module fails with ImportError.
if getattr(select, 'kqueue', None) is None:
    raise ImportError('No kqueue implementation found in select module')


# Maps the hub's READ/WRITE event types to the kqueue filter constants used
# when building kevent objects.
FILTERS = {
    READ: select.KQ_FILTER_READ,
    WRITE: select.KQ_FILTER_WRITE,
}
19
20
class Hub(BaseHub):
    """Hub implementation that waits for descriptor events via ``select.kqueue``.

    Besides the listener tables maintained by ``BaseHub``, the hub keeps its
    own record of every kevent it has registered so that the registrations can
    be deleted later, or replayed into a fresh kqueue after a fork.
    """

    # Upper bound on the number of kevents fetched by a single wait() call.
    MAX_EVENTS = 100

    def __init__(self, clock=time.time):
        """Create the hub and open its kqueue.

        :param clock: callable forwarded to ``BaseHub.__init__``.
        """
        super(Hub, self).__init__(clock)
        # fileno -> {evtype: kevent that was registered for that pair}
        self._events = {}
        self._init_kqueue()

    def _init_kqueue(self):
        """Open a new kqueue and remember which process opened it."""
        self.kqueue = select.kqueue()
        # Compared against os.getpid() in _control() to detect a fork.
        self._pid = os.getpid()

    def _reinit_kqueue(self):
        """Replace the kqueue with a fresh one and re-register every event.

        Called from _control() once the pid is found to have changed.
        """
        self.kqueue.close()
        self._init_kqueue()
        # The stored kevents were built with KQ_EV_ADD, so submitting them
        # again registers them on the new queue.
        events = [event
                  for per_fd in six.itervalues(self._events)
                  for event in six.itervalues(per_fd)]
        self.kqueue.control(events, 0, 0)

    def _control(self, events, max_events, timeout):
        """Call ``kqueue.control``, retrying once on a fresh queue after a fork.

        :param events: changelist of kevents to submit.
        :param max_events: maximum number of events to return.
        :param timeout: timeout handed to ``kqueue.control`` unchanged.
        :returns: whatever ``kqueue.control`` returns.
        :raises OSError, IOError: when the call fails and the pid is unchanged
            (or the retry on the new queue fails as well).
        """
        try:
            return self.kqueue.control(events, max_events, timeout)
        except (OSError, IOError):
            # have we forked?
            if os.getpid() != self._pid:
                self._reinit_kqueue()
                return self.kqueue.control(events, max_events, timeout)
            raise

    def add(self, evtype, fileno, cb, tb, mac):
        """Register a listener and, if needed, the matching kevent.

        All arguments are forwarded unchanged to ``BaseHub.add``.  A kevent is
        only submitted when none is recorded yet for this (fileno, evtype).

        :returns: the listener created by ``BaseHub.add``.
        :raises ValueError, OSError, IOError: re-raised from the kevent
            registration after the listener has been rolled back.
        """
        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
        events = self._events.setdefault(fileno, {})
        if evtype not in events:
            try:
                event = select.kevent(fileno, FILTERS.get(evtype), select.KQ_EV_ADD)
                self._control([event], 0, 0)
                events[evtype] = event
            except (ValueError, OSError, IOError):
                # The registration failed, so nothing will ever fire for this
                # listener: undo the base-hub bookkeeping before re-raising.
                super(Hub, self).remove(listener)
                if not events:
                    # Do not keep an empty placeholder for this descriptor.
                    self._events.pop(fileno, None)
                raise
        return listener

    def _delete_events(self, events):
        """Submit KQ_EV_DELETE changes for the given kevents.

        :param events: iterable of kevents whose ``ident``/``filter`` pairs
            should be removed from the kqueue.
        """
        del_events = [
            select.kevent(e.ident, e.filter, select.KQ_EV_DELETE)
            for e in events
        ]
        self._control(del_events, 0, 0)

    def remove(self, listener):
        """Unregister a listener; drop its kevent once nobody needs it.

        Tolerates the kevent bookkeeping being already gone (for example after
        remove_descriptor() or a repeated removal) instead of raising
        ``KeyError``.
        """
        super(Hub, self).remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        if self.listeners[evtype].get(fileno):
            # Another listener still wants this event; keep the kevent.
            return
        per_fd = self._events.get(fileno)
        if per_fd is None:
            return
        event = per_fd.pop(evtype, None)
        if not per_fd:
            # Last event for this descriptor: forget the empty record.
            self._events.pop(fileno, None)
        if event is None:
            return
        try:
            self._delete_events([event])
        except OSError:
            # Best effort: the kernel-side deletion is allowed to fail.
            pass

    def remove_descriptor(self, fileno):
        """Forget a descriptor entirely, deleting all kevents recorded for it."""
        super(Hub, self).remove_descriptor(fileno)
        events = self._events.pop(fileno, None)
        if not events:
            # Nothing was registered for this descriptor.
            return
        try:
            self._delete_events(events.values())
        except OSError:
            # Best effort: the kernel-side deletion is allowed to fail.
            pass

    def wait(self, seconds=None):
        """Wait for events and run the callbacks of the matching listeners.

        :param seconds: timeout passed to ``kqueue.control``; when there are
            no listeners at all it is used as a plain sleep duration instead
            (and a falsy value returns immediately).
        """
        readers = self.listeners[READ]
        writers = self.listeners[WRITE]

        if not readers and not writers:
            if seconds:
                sleep(seconds)
            return
        result = self._control([], self.MAX_EVENTS, seconds)
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
        for event in result:
            fileno = event.ident
            evfilt = event.filter
            try:
                # Descriptors without a listener fall back to noop.
                if evfilt == FILTERS[READ]:
                    readers.get(fileno, noop).cb(fileno)
                elif evfilt == FILTERS[WRITE]:
                    writers.get(fileno, noop).cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                # Deliberately broad: any other callback failure is handed to
                # squelch_exception() so the remaining events still get served.
                self.squelch_exception(fileno, sys.exc_info())
                clear_sys_exc_info()