]> review.fuel-infra Code Review - packages/trusty/python-eventlet.git/blob - eventlet/eventlet/hubs/twistedr.py
Adjust the package revision; no actual code changes
[packages/trusty/python-eventlet.git] / eventlet / eventlet / hubs / twistedr.py
1 import sys
2 import threading
3 from twisted.internet.base import DelayedCall as TwistedDelayedCall
4 from eventlet.support import greenlets as greenlet
5 from eventlet.hubs.hub import FdListener, READ, WRITE
6
7
8 class DelayedCall(TwistedDelayedCall):
9     "fix DelayedCall to behave like eventlet's Timer in some respects"
10
11     def cancel(self):
12         if self.cancelled or self.called:
13             self.cancelled = True
14             return
15         return TwistedDelayedCall.cancel(self)
16
17
18 class LocalDelayedCall(DelayedCall):
19
20     def __init__(self, *args, **kwargs):
21         self.greenlet = greenlet.getcurrent()
22         DelayedCall.__init__(self, *args, **kwargs)
23
24     def _get_cancelled(self):
25         if self.greenlet is None or self.greenlet.dead:
26             return True
27         return self.__dict__['cancelled']
28
29     def _set_cancelled(self, value):
30         self.__dict__['cancelled'] = value
31
32     cancelled = property(_get_cancelled, _set_cancelled)
33
34
35 def callLater(DelayedCallClass, reactor, _seconds, _f, *args, **kw):
36     # the same as original but creates fixed DelayedCall instance
37     assert callable(_f), "%s is not callable" % _f
38     if not isinstance(_seconds, (int, long, float)):
39         raise TypeError("Seconds must be int, long, or float, was " + type(_seconds))
40     assert sys.maxint >= _seconds >= 0, \
41            "%s is not greater than or equal to 0 seconds" % (_seconds,)
42     tple = DelayedCallClass(reactor.seconds() + _seconds, _f, args, kw,
43                             reactor._cancelCallLater,
44                             reactor._moveCallLaterSooner,
45                             seconds=reactor.seconds)
46     reactor._newTimedCalls.append(tple)
47     return tple
48
49
50 class socket_rwdescriptor(FdListener):
51     # implements(IReadWriteDescriptor)
52     def __init__(self, evtype, fileno, cb):
53         super(socket_rwdescriptor, self).__init__(evtype, fileno, cb)
54         if not isinstance(fileno, (int, long)):
55             raise TypeError("Expected int or long, got %s" % type(fileno))
56         # Twisted expects fileno to be a callable, not an attribute
57
58         def _fileno():
59             return fileno
60         self.fileno = _fileno
61
62     # required by glib2reactor
63     disconnected = False
64
65     def doRead(self):
66         if self.evtype is READ:
67             self.cb(self)
68
69     def doWrite(self):
70         if self.evtype == WRITE:
71             self.cb(self)
72
73     def connectionLost(self, reason):
74         self.disconnected = True
75         if self.cb:
76             self.cb(reason)
77         # trampoline() will now switch into the greenlet that owns the socket
78         # leaving the mainloop unscheduled. However, when the next switch
79         # to the mainloop occurs, twisted will not re-evaluate the delayed calls
80         # because it assumes that none were scheduled since no client code was executed
81         # (it has no idea it was switched away). So, we restart the mainloop.
82         # XXX this is not enough, pollreactor prints the traceback for
83         # this and epollreactor times out. see test__hub.TestCloseSocketWhilePolling
84         raise greenlet.GreenletExit
85
86     logstr = "twistedr"
87
88     def logPrefix(self):
89         return self.logstr
90
91
92 class BaseTwistedHub(object):
93     """This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub).
94     Instead, it assumes that the mainloop is run in the main greenlet.
95
96     This makes running "green" functions in the main greenlet impossible but is useful
97     when you want to call reactor.run() yourself.
98     """
99
100     # XXX: remove me from here. make functions that depend on reactor
101     # XXX: hub's methods
102     uses_twisted_reactor = True
103
104     WRITE = WRITE
105     READ = READ
106
107     def __init__(self, mainloop_greenlet):
108         self.greenlet = mainloop_greenlet
109
110     def switch(self):
111         assert greenlet.getcurrent() is not self.greenlet, \
112                "Cannot switch from MAINLOOP to MAINLOOP"
113         try:
114            greenlet.getcurrent().parent = self.greenlet
115         except ValueError:
116            pass
117         return self.greenlet.switch()
118
119     def stop(self):
120         from twisted.internet import reactor
121         reactor.stop()
122
123     def add(self, evtype, fileno, cb):
124         from twisted.internet import reactor
125         descriptor = socket_rwdescriptor(evtype, fileno, cb)
126         if evtype is READ:
127             reactor.addReader(descriptor)
128         if evtype is WRITE:
129             reactor.addWriter(descriptor)
130         return descriptor
131
132     def remove(self, descriptor):
133         from twisted.internet import reactor
134         reactor.removeReader(descriptor)
135         reactor.removeWriter(descriptor)
136
137     def schedule_call_local(self, seconds, func, *args, **kwargs):
138         from twisted.internet import reactor
139
140         def call_if_greenlet_alive(*args1, **kwargs1):
141             if timer.greenlet.dead:
142                 return
143             return func(*args1, **kwargs1)
144         timer = callLater(LocalDelayedCall, reactor, seconds,
145                           call_if_greenlet_alive, *args, **kwargs)
146         return timer
147
148     schedule_call = schedule_call_local
149
150     def schedule_call_global(self, seconds, func, *args, **kwargs):
151         from twisted.internet import reactor
152         return callLater(DelayedCall, reactor, seconds, func, *args, **kwargs)
153
154     def abort(self):
155         from twisted.internet import reactor
156         reactor.crash()
157
158     @property
159     def running(self):
160         from twisted.internet import reactor
161         return reactor.running
162
163     # for debugging:
164
165     def get_readers(self):
166         from twisted.internet import reactor
167         readers = reactor.getReaders()
168         readers.remove(getattr(reactor, 'waker'))
169         return readers
170
171     def get_writers(self):
172         from twisted.internet import reactor
173         return reactor.getWriters()
174
175     def get_timers_count(self):
176         from twisted.internet import reactor
177         return len(reactor.getDelayedCalls())
178
179
180 class TwistedHub(BaseTwistedHub):
181     # wrapper around reactor that runs reactor's main loop in a separate greenlet.
182     # whenever you need to wait, i.e. inside a call that must appear
183     # blocking, call hub.switch() (then your blocking operation should switch back to you
184     # upon completion)
185
186     # unlike other eventlet hubs, which are created per-thread,
187     # this one cannot be instantiated more than once, because
188     # twisted doesn't allow that
189
190     # 0-not created
191     # 1-initialized but not started
192     # 2-started
193     # 3-restarted
194     state = 0
195
196     installSignalHandlers = False
197
198     def __init__(self):
199         assert Hub.state == 0, ('%s hub can only be instantiated once' % type(self).__name__,
200                               Hub.state)
201         Hub.state = 1
202         make_twisted_threadpool_daemonic() # otherwise the program
203                                         # would hang after the main
204                                         # greenlet exited
205         g = greenlet.greenlet(self.run)
206         BaseTwistedHub.__init__(self, g)
207
208     def switch(self):
209         assert greenlet.getcurrent() is not self.greenlet, \
210                "Cannot switch from MAINLOOP to MAINLOOP"
211         if self.greenlet.dead:
212             self.greenlet = greenlet.greenlet(self.run)
213         try:
214             greenlet.getcurrent().parent = self.greenlet
215         except ValueError:
216             pass
217         return self.greenlet.switch()
218
219     def run(self, installSignalHandlers=None):
220         if installSignalHandlers is None:
221             installSignalHandlers = self.installSignalHandlers
222
223         # main loop, executed in a dedicated greenlet
224         from twisted.internet import reactor
225         assert Hub.state in [1, 3], ('run function is not reentrant', Hub.state)
226
227         if Hub.state == 1:
228             reactor.startRunning(installSignalHandlers=installSignalHandlers)
229         elif not reactor.running:
230             # if we're here, then reactor was explicitly stopped with reactor.stop()
231             # restarting reactor (like we would do after an exception) in this case
232             # is not an option.
233             raise AssertionError("reactor is not running")
234
235         try:
236             self.mainLoop(reactor)
237         except:
238             # an exception in the mainLoop is a normal operation (e.g. user's
239             # signal handler could raise an exception). In this case we will re-enter
240             # the main loop at the next switch.
241             Hub.state = 3
242             raise
243
244         # clean exit here is needed for abort() method to work
245         # do not raise an exception here.
246
247     def mainLoop(self, reactor):
248         Hub.state = 2
249         # Unlike reactor's mainLoop, this function does not catch exceptions.
250         # Anything raised goes into the main greenlet (because it is always the
251         # parent of this one)
252         while reactor.running:
253             # Advance simulation time in delayed event processors.
254             reactor.runUntilCurrent()
255             t2 = reactor.timeout()
256             t = reactor.running and t2
257             reactor.doIteration(t)
258
259 Hub = TwistedHub
260
261
262 class DaemonicThread(threading.Thread):
263     def _set_daemon(self):
264         return True
265
266
267 def make_twisted_threadpool_daemonic():
268     from twisted.python.threadpool import ThreadPool
269     if ThreadPool.threadFactory != DaemonicThread:
270         ThreadPool.threadFactory = DaemonicThread