3 from twisted.internet.base import DelayedCall as TwistedDelayedCall
4 from eventlet.support import greenlets as greenlet
5 from eventlet.hubs.hub import FdListener, READ, WRITE
# Twisted DelayedCall subclass; schedule_call_global() passes it to callLater().
8 class DelayedCall(TwistedDelayedCall):
9 "fix DelayedCall to behave like eventlet's Timer in some respects"
# NOTE(review): the enclosing method header and the body of this branch are
# not visible here; presumably this is cancel(), tolerating a repeated cancel
# or a cancel after the call has already fired -- confirm.
12 if self.cancelled or self.called:
# Otherwise defer to Twisted's own cancel().
15 return TwistedDelayedCall.cancel(self)
# DelayedCall that remembers the greenlet which created it.
18 class LocalDelayedCall(DelayedCall):
20 def __init__(self, *args, **kwargs):
# Capture the creating greenlet before the base initializer runs.
21 self.greenlet = greenlet.getcurrent()
22 DelayedCall.__init__(self, *args, **kwargs)
24 def _get_cancelled(self):
# NOTE(review): the body of this branch is not visible here; presumably the
# call reports itself as cancelled once its greenlet is gone or dead -- confirm.
25 if self.greenlet is None or self.greenlet.dead:
# Otherwise report the flag stored by _set_cancelled().
27 return self.__dict__['cancelled']
29 def _set_cancelled(self, value):
# Store the flag directly in the instance dict, where _get_cancelled() reads it.
30 self.__dict__['cancelled'] = value
# Expose `cancelled` as a property backed by the two accessors above.
32 cancelled = property(_get_cancelled, _set_cancelled)
def callLater(DelayedCallClass, reactor, _seconds, _f, *args, **kw):
    """Schedule ``_f(*args, **kw)`` on *reactor* after *_seconds* seconds.

    Same as the reactor's own ``callLater`` but the timer is built from
    *DelayedCallClass*, so the caller chooses which DelayedCall variant is
    created.

    :param DelayedCallClass: factory called with the absolute fire time, the
        callable, its positional and keyword arguments,
        ``reactor._cancelCallLater``, ``reactor._moveCallLaterSooner`` and
        ``seconds=reactor.seconds``.
    :param reactor: object providing ``seconds()``, ``_cancelCallLater``,
        ``_moveCallLaterSooner`` and the ``_newTimedCalls`` list.
    :param _seconds: non-negative delay (int, long or float).
    :param _f: callable to invoke when the delay expires.
    :returns: the object produced by *DelayedCallClass*.
    :raises TypeError: if *_seconds* is not a number.
    """
    # Wrap in a tuple so a tuple-valued _f cannot break the formatting.
    assert callable(_f), "%s is not callable" % (_f,)
    # ``long`` and ``sys.maxint`` exist only on Python 2; fall back to the
    # Python 3 spellings so the validation itself cannot raise NameError or
    # AttributeError.
    try:
        number_types = (int, long, float)
    except NameError:
        number_types = (int, float)
    if not isinstance(_seconds, number_types):
        # Format the type with %s: concatenating a str and a type object
        # raises an unrelated TypeError and hides this message.
        raise TypeError("Seconds must be int, long, or float, was %s"
                        % (type(_seconds),))
    max_seconds = getattr(sys, "maxint", sys.maxsize)
    assert max_seconds >= _seconds >= 0, \
        "%s is not greater than or equal to 0 seconds" % (_seconds,)
    delayed_call = DelayedCallClass(reactor.seconds() + _seconds, _f, args, kw,
                                    reactor._cancelCallLater,
                                    reactor._moveCallLaterSooner,
                                    seconds=reactor.seconds)
    reactor._newTimedCalls.append(delayed_call)
    # Callers (schedule_call_local / schedule_call_global) use the timer.
    return delayed_call
# FdListener subclass that BaseTwistedHub.add() hands to the Twisted reactor
# as a reader/writer descriptor.
50 class socket_rwdescriptor(FdListener):
51 # implements(IReadWriteDescriptor)
# Initialise the listener through FdListener.__init__, then reject a fileno
# that is not an integer.
52 def __init__(self, evtype, fileno, cb):
53 super(socket_rwdescriptor, self).__init__(evtype, fileno, cb)
54 if not isinstance(fileno, (int, long)):
55 raise TypeError("Expected int or long, got %s" % type(fileno))
56 # Twisted expects fileno to be a callable, not an attribute
62 # required by glib2reactor
# NOTE(review): READ is compared by identity here but WRITE by equality on
# the following line; the enclosing method headers and branch bodies are not
# visible here -- confirm the two checks are meant to differ.
66 if self.evtype is READ:
70 if self.evtype == WRITE:
# Flag the descriptor as disconnected, then raise GreenletExit for the reason
# explained in the comment below.
# NOTE(review): lines between the flag assignment and that comment are not
# visible here.
73 def connectionLost(self, reason):
74 self.disconnected = True
77 # trampoline() will now switch into the greenlet that owns the socket
78 # leaving the mainloop unscheduled. However, when the next switch
79 # to the mainloop occurs, twisted will not re-evaluate the delayed calls
80 # because it assumes that none were scheduled since no client code was executed
81 # (it has no idea it was switched away). So, we restart the mainloop.
82 # XXX this is not enough, pollreactor prints the traceback for
83 # this and epollreactor times out. see test__hub.TestCloseSocketWhilePolling
84 raise greenlet.GreenletExit
92 class BaseTwistedHub(object):
93 """This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub).
94 Instead, it assumes that the mainloop is run in the main greenlet.
96 This makes running "green" functions in the main greenlet impossible but is useful
97 when you want to call reactor.run() yourself.
100 # XXX: remove me from here. make functions that depend on reactor
102 uses_twisted_reactor = True
# Remember the greenlet passed in as the main-loop greenlet; it is stored on
# self.greenlet.
107 def __init__(self, mainloop_greenlet):
108 self.greenlet = mainloop_greenlet
111 assert greenlet.getcurrent() is not self.greenlet, \
112 "Cannot switch from MAINLOOP to MAINLOOP"
114 greenlet.getcurrent().parent = self.greenlet
117 return self.greenlet.switch()
120 from twisted.internet import reactor
# Wrap (evtype, fileno, cb) in a socket_rwdescriptor and register it with the
# Twisted reactor.
# NOTE(review): the conditions selecting addReader vs. addWriter and any
# return statement are not visible here; presumably the choice follows evtype
# (READ/WRITE) -- confirm.
123 def add(self, evtype, fileno, cb):
124 from twisted.internet import reactor
125 descriptor = socket_rwdescriptor(evtype, fileno, cb)
127 reactor.addReader(descriptor)
129 reactor.addWriter(descriptor)
def remove(self, descriptor):
    """Detach *descriptor* from the Twisted reactor.

    The descriptor is removed as a reader first and then as a writer,
    whichever event type it was registered for.
    """
    from twisted.internet import reactor as twisted_reactor
    twisted_reactor.removeReader(descriptor)
    twisted_reactor.removeWriter(descriptor)
# Schedule func through callLater() using LocalDelayedCall, routing the call
# through a wrapper that first checks whether the timer's greenlet is dead.
# NOTE(review): the body of the `dead` branch and any return statement of
# this method are not visible here; presumably a dead greenlet skips func and
# the timer is returned to the caller -- confirm.
137 def schedule_call_local(self, seconds, func, *args, **kwargs):
138 from twisted.internet import reactor
140 def call_if_greenlet_alive(*args1, **kwargs1):
141 if timer.greenlet.dead:
143 return func(*args1, **kwargs1)
# `timer` is bound after the wrapper is defined; the wrapper looks it up only
# when it is eventually called.
144 timer = callLater(LocalDelayedCall, reactor, seconds,
145 call_if_greenlet_alive, *args, **kwargs)
# schedule_call is an alias for the greenlet-local variant.
148 schedule_call = schedule_call_local
def schedule_call_global(self, seconds, func, *args, **kwargs):
    """Schedule ``func(*args, **kwargs)`` to run after *seconds* seconds.

    The timer is created through ``callLater`` with the plain ``DelayedCall``
    class; whatever ``callLater`` returns is passed back to the caller.
    """
    from twisted.internet import reactor as twisted_reactor
    timer = callLater(DelayedCall, twisted_reactor, seconds, func,
                      *args, **kwargs)
    return timer
155 from twisted.internet import reactor
160 from twisted.internet import reactor
161 return reactor.running
def get_readers(self):
    """Return the reactor's read descriptors, excluding ``reactor.waker``.

    :returns: the list obtained from ``reactor.getReaders()`` with the
        reactor's ``waker`` attribute removed.
    :raises ValueError: if the waker is not among the readers
        (propagated from ``list.remove``).
    """
    from twisted.internet import reactor
    readers = reactor.getReaders()
    # Plain attribute access; getattr() with a constant name adds nothing.
    readers.remove(reactor.waker)
    # Return the filtered list, matching get_writers().
    return readers
def get_writers(self):
    """Return whatever ``reactor.getWriters()`` reports."""
    from twisted.internet import reactor as twisted_reactor
    writers = twisted_reactor.getWriters()
    return writers
def get_timers_count(self):
    """Return the number of entries in ``reactor.getDelayedCalls()``."""
    from twisted.internet import reactor as twisted_reactor
    pending_calls = twisted_reactor.getDelayedCalls()
    return len(pending_calls)
180 class TwistedHub(BaseTwistedHub):
181 # wrapper around reactor that runs reactor's main loop in a separate greenlet.
182 # whenever you need to wait, i.e. inside a call that must appear
183 # blocking, call hub.switch() (then your blocking operation should switch back to you
186 # unlike other eventlet hubs, which are created per-thread,
187 # this one cannot be instantiated more than once, because
188 # twisted doesn't allow that
191 # 1-initialized but not started
196 installSignalHandlers = False
199 assert Hub.state == 0, ('%s hub can only be instantiated once' % type(self).__name__,
202 make_twisted_threadpool_daemonic() # otherwise the program
203 # would hang after the main
205 g = greenlet.greenlet(self.run)
206 BaseTwistedHub.__init__(self, g)
209 assert greenlet.getcurrent() is not self.greenlet, \
210 "Cannot switch from MAINLOOP to MAINLOOP"
211 if self.greenlet.dead:
212 self.greenlet = greenlet.greenlet(self.run)
214 greenlet.getcurrent().parent = self.greenlet
217 return self.greenlet.switch()
# Body of the hub greenlet (the greenlet is created with self.run as its
# target): hands the reactor to mainLoop().
# installSignalHandlers falls back to the class-level attribute when None.
# NOTE(review): several lines are not visible here (the branch opened before
# `elif`, and presumably a try/except around mainLoop) -- confirm against the
# full file before relying on this summary.
219 def run(self, installSignalHandlers=None):
220 if installSignalHandlers is None:
221 installSignalHandlers = self.installSignalHandlers
223 # main loop, executed in a dedicated greenlet
224 from twisted.internet import reactor
# Hub.state must be 1 or 3 on entry; any other value trips the assertion.
225 assert Hub.state in [1, 3], ('run function is not reentrant', Hub.state)
228 reactor.startRunning(installSignalHandlers=installSignalHandlers)
229 elif not reactor.running:
230 # if we're here, then reactor was explicitly stopped with reactor.stop()
231 # restarting reactor (like we would do after an exception) in this case
233 raise AssertionError("reactor is not running")
236 self.mainLoop(reactor)
238 # an exception in the mainLoop is a normal operation (e.g. user's
239 # signal handler could raise an exception). In this case we will re-enter
240 # the main loop at the next switch.
244 # clean exit here is needed for abort() method to work
245 # do not raise an exception here.
# Drive the reactor for as long as reactor.running is true.
# NOTE(review): one line between the header and the first comment is not
# visible here.
247 def mainLoop(self, reactor):
249 # Unlike reactor's mainLoop, this function does not catch exceptions.
250 # Anything raised goes into the main greenlet (because it is always the
251 # parent of this one)
252 while reactor.running:
253 # Advance simulation time in delayed event processors.
254 reactor.runUntilCurrent()
255 t2 = reactor.timeout()
# If `running` turned falsy in the meantime, t becomes that falsy value;
# otherwise t is the timeout computed above.
256 t = reactor.running and t2
257 reactor.doIteration(t)
# threading.Thread subclass installed as ThreadPool.threadFactory by
# make_twisted_threadpool_daemonic().
262 class DaemonicThread(threading.Thread):
# NOTE(review): the body of this override is not visible here; presumably it
# makes every such thread daemonic -- confirm.
263 def _set_daemon(self):
def make_twisted_threadpool_daemonic():
    """Install ``DaemonicThread`` as the thread factory of Twisted's ThreadPool.

    Does nothing when the factory is already ``DaemonicThread``.
    """
    from twisted.python import threadpool

    pool_class = threadpool.ThreadPool
    if pool_class.threadFactory == DaemonicThread:
        return
    pool_class.threadFactory = DaemonicThread