Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / hubs / kqueue.py
diff --git a/eventlet/eventlet/hubs/kqueue.py b/eventlet/eventlet/hubs/kqueue.py
new file mode 100644 (file)
index 0000000..9487a16
--- /dev/null
@@ -0,0 +1,114 @@
+import os
+import sys
+from eventlet import patcher
+from eventlet.support import six
+select = patcher.original('select')
+time = patcher.original('time')
+sleep = time.sleep
+
+from eventlet.support import clear_sys_exc_info
+from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
+
+
+if getattr(select, 'kqueue', None) is None:
+    raise ImportError('No kqueue implementation found in select module')
+
+
+FILTERS = {READ: select.KQ_FILTER_READ,
+           WRITE: select.KQ_FILTER_WRITE}
+
+
+class Hub(BaseHub):
+    MAX_EVENTS = 100
+
+    def __init__(self, clock=time.time):
+        super(Hub, self).__init__(clock)
+        self._events = {}
+        self._init_kqueue()
+
+    def _init_kqueue(self):
+        self.kqueue = select.kqueue()
+        self._pid = os.getpid()
+
+    def _reinit_kqueue(self):
+        self.kqueue.close()
+        self._init_kqueue()
+        kqueue = self.kqueue
+        events = [e for i in six.itervalues(self._events)
+                  for e in six.itervalues(i)]
+        kqueue.control(events, 0, 0)
+
+    def _control(self, events, max_events, timeout):
+        try:
+            return self.kqueue.control(events, max_events, timeout)
+        except (OSError, IOError):
+            # have we forked?
+            if os.getpid() != self._pid:
+                self._reinit_kqueue()
+                return self.kqueue.control(events, max_events, timeout)
+            raise
+
+    def add(self, evtype, fileno, cb, tb, mac):
+        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
+        events = self._events.setdefault(fileno, {})
+        if evtype not in events:
+            try:
+                event = select.kevent(fileno, FILTERS.get(evtype), select.KQ_EV_ADD)
+                self._control([event], 0, 0)
+                events[evtype] = event
+            except ValueError:
+                super(Hub, self).remove(listener)
+                raise
+        return listener
+
+    def _delete_events(self, events):
+        del_events = [
+            select.kevent(e.ident, e.filter, select.KQ_EV_DELETE)
+            for e in events
+        ]
+        self._control(del_events, 0, 0)
+
+    def remove(self, listener):
+        super(Hub, self).remove(listener)
+        evtype = listener.evtype
+        fileno = listener.fileno
+        if not self.listeners[evtype].get(fileno):
+            event = self._events[fileno].pop(evtype)
+            try:
+                self._delete_events([event])
+            except OSError:
+                pass
+
+    def remove_descriptor(self, fileno):
+        super(Hub, self).remove_descriptor(fileno)
+        try:
+            events = self._events.pop(fileno).values()
+            self._delete_events(events)
+        except KeyError:
+            pass
+        except OSError:
+            pass
+
+    def wait(self, seconds=None):
+        readers = self.listeners[READ]
+        writers = self.listeners[WRITE]
+
+        if not readers and not writers:
+            if seconds:
+                sleep(seconds)
+            return
+        result = self._control([], self.MAX_EVENTS, seconds)
+        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
+        for event in result:
+            fileno = event.ident
+            evfilt = event.filter
+            try:
+                if evfilt == FILTERS[READ]:
+                    readers.get(fileno, noop).cb(fileno)
+                if evfilt == FILTERS[WRITE]:
+                    writers.get(fileno, noop).cb(fileno)
+            except SYSTEM_EXCEPTIONS:
+                raise
+            except:
+                self.squelch_exception(fileno, sys.exc_info())
+                clear_sys_exc_info()