+++ /dev/null
-import os
-import sys
-from eventlet import patcher
-from eventlet.support import six
-select = patcher.original('select')
-time = patcher.original('time')
-sleep = time.sleep
-
-from eventlet.support import clear_sys_exc_info
-from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
-
-
-if getattr(select, 'kqueue', None) is None:
- raise ImportError('No kqueue implementation found in select module')
-
-
-FILTERS = {READ: select.KQ_FILTER_READ,
- WRITE: select.KQ_FILTER_WRITE}
-
-
-class Hub(BaseHub):
- MAX_EVENTS = 100
-
- def __init__(self, clock=time.time):
- super(Hub, self).__init__(clock)
- self._events = {}
- self._init_kqueue()
-
- def _init_kqueue(self):
- self.kqueue = select.kqueue()
- self._pid = os.getpid()
-
- def _reinit_kqueue(self):
- self.kqueue.close()
- self._init_kqueue()
- kqueue = self.kqueue
- events = [e for i in six.itervalues(self._events)
- for e in six.itervalues(i)]
- kqueue.control(events, 0, 0)
-
- def _control(self, events, max_events, timeout):
- try:
- return self.kqueue.control(events, max_events, timeout)
- except (OSError, IOError):
- # have we forked?
- if os.getpid() != self._pid:
- self._reinit_kqueue()
- return self.kqueue.control(events, max_events, timeout)
- raise
-
- def add(self, evtype, fileno, cb, tb, mac):
- listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
- events = self._events.setdefault(fileno, {})
- if evtype not in events:
- try:
- event = select.kevent(fileno, FILTERS.get(evtype), select.KQ_EV_ADD)
- self._control([event], 0, 0)
- events[evtype] = event
- except ValueError:
- super(Hub, self).remove(listener)
- raise
- return listener
-
- def _delete_events(self, events):
- del_events = [
- select.kevent(e.ident, e.filter, select.KQ_EV_DELETE)
- for e in events
- ]
- self._control(del_events, 0, 0)
-
- def remove(self, listener):
- super(Hub, self).remove(listener)
- evtype = listener.evtype
- fileno = listener.fileno
- if not self.listeners[evtype].get(fileno):
- event = self._events[fileno].pop(evtype)
- try:
- self._delete_events([event])
- except OSError:
- pass
-
- def remove_descriptor(self, fileno):
- super(Hub, self).remove_descriptor(fileno)
- try:
- events = self._events.pop(fileno).values()
- self._delete_events(events)
- except KeyError:
- pass
- except OSError:
- pass
-
- def wait(self, seconds=None):
- readers = self.listeners[READ]
- writers = self.listeners[WRITE]
-
- if not readers and not writers:
- if seconds:
- sleep(seconds)
- return
- result = self._control([], self.MAX_EVENTS, seconds)
- SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
- for event in result:
- fileno = event.ident
- evfilt = event.filter
- try:
- if evfilt == FILTERS[READ]:
- readers.get(fileno, noop).cb(fileno)
- if evfilt == FILTERS[WRITE]:
- writers.get(fileno, noop).cb(fileno)
- except SYSTEM_EXCEPTIONS:
- raise
- except:
- self.squelch_exception(fileno, sys.exc_info())
- clear_sys_exc_info()