1 """Basic twisted protocols converted to synchronous mode"""
3 from twisted.internet.protocol import Protocol as twistedProtocol
4 from twisted.internet.error import ConnectionDone
5 from twisted.internet.protocol import Factory, ClientFactory
6 from twisted.internet import main
7 from twisted.python import failure
9 from eventlet import greenthread
10 from eventlet import getcurrent
11 from eventlet.coros import Queue
12 from eventlet.event import Event as BaseEvent
15 class ValueQueue(Queue):
16 """Queue that keeps the last item forever in the queue if it's an exception.
17 Useful if you send an exception over queue only once, and once sent it must be always
def send(self, value=None, exc=None):
    """Enqueue *value* or *exc* unless the queue already reports an error.

    An exception (``exc`` is not None) is always forwarded to
    ``Queue.send``. A plain value is forwarded only while
    ``self.has_error()`` is falsy; otherwise the call is a no-op.
    Always returns None.
    """
    if exc is None and self.has_error():
        # A plain value must not be queued behind a stored exception.
        return
    Queue.send(self, value, exc)
26 """The difference from Queue.wait: if there is an only item in the
27 Queue and it is an exception, raise it, but keep it in the Queue, so
28 that future calls to wait() will raise it again.
30 if self.has_error() and len(self.items)==1:
31 # the last item, which is an exception, raise without emptying the Queue
32 getcurrent().throw(*self.items[0][1])
34 return Queue.wait(self)
37 return self.items and self.items[-1][1] is not None
40 class Event(BaseEvent):
42 def send(self, value, exc=None):
45 return BaseEvent.send(self, value, exc)
47 def send_exception(self, *throw_args):
50 return BaseEvent.send_exception(self, *throw_args)
52 class Producer2Event(object):
54 # implements IPullProducer
56 def __init__(self, event):
59 def resumeProducing(self):
62 def stopProducing(self):
66 class GreenTransportBase(object):
68 transportBufferSize = None
def __init__(self, transportBufferSize=None):
    """Create the queue and events this green transport works with.

    transportBufferSize -- when not None, stored on the instance so it
        overrides the class-level default of None; ``_init_transport``
        later copies a non-None value onto ``transport.bufferSize``.
    """
    # _queue receives the transport object and then the incoming data.
    self._queue = ValueQueue()
    # _write_event is reset and waited on by write();
    # _disconnected_event is sent the reason value on connection loss.
    self._write_event, self._disconnected_event = Event(), Event()
    if transportBufferSize is not None:
        self.transportBufferSize = transportBufferSize
def build_protocol(self):
    """Return a new ``self.protocol_class`` instance constructed with *self*."""
    protocol_factory = self.protocol_class
    return protocol_factory(self)
def _got_transport(self, transport):
    """Put the connected *transport* on ``self._queue``.

    ``_init_transport`` reads it back from the same queue.
    """
    inbox = self._queue
    inbox.send(transport)
def _got_data(self, data):
    """Put a received chunk of *data* on ``self._queue`` for the reader side."""
    inbox = self._queue
    inbox.send(data)
def _connectionLost(self, reason):
    """Publish the disconnect to the queue and both events.

    ``reason.value`` is sent as a plain value on ``_disconnected_event``
    and as an exception on ``_queue`` and ``_write_event``, in that order.
    """
    error = reason.value
    self._disconnected_event.send(error)
    # The queue and the write event get the same object via send_exception.
    self._queue.send_exception(error)
    self._write_event.send_exception(error)
92 if self.disconnecting or self._disconnected_event.ready():
94 return self._queue.wait()
96 raise self._disconnected_event.wait()
97 self.resumeProducing()
99 return self._queue.wait()
101 self.pauseProducing()
103 def write(self, data, wait=True):
104 if self._disconnected_event.ready():
105 raise self._disconnected_event.wait()
107 self._write_event.reset()
108 self.transport.write(data)
109 self._write_event.wait()
111 self.transport.write(data)
113 def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
114 self.transport.unregisterProducer()
115 self.transport.loseConnection(connDone)
117 self._disconnected_event.wait()
119 def __getattr__(self, item):
120 if item=='transport':
121 raise AttributeError(item)
122 if hasattr(self, 'transport'):
124 return getattr(self.transport, item)
125 except AttributeError:
126 me = type(self).__name__
127 trans = type(self.transport).__name__
128 raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
130 raise AttributeError(item)
132 def resumeProducing(self):
135 self.transport.resumeProducing()
137 def pauseProducing(self):
140 self.transport.pauseProducing()
142 def _init_transport_producer(self):
143 self.transport.pauseProducing()
def _init_transport(self):
    """Take the transport from the queue and finish setting it up.

    The item obtained from ``self._queue.wait()`` becomes
    ``self.transport``. A non-None ``transportBufferSize`` is copied onto
    its ``bufferSize``, ``_init_transport_producer()`` is run, and a
    ``Producer2Event`` wrapping ``_write_event`` is registered on it.
    """
    self.transport = connected = self._queue.wait()
    buffer_size = self.transportBufferSize
    if buffer_size is not None:
        connected.bufferSize = buffer_size
    self._init_transport_producer()
    # The second argument to registerProducer is False, as in the original.
    write_signal = Producer2Event(self._write_event)
    connected.registerProducer(write_signal, False)
class Protocol(twistedProtocol):
    """Twisted protocol that relays connection events to another object.

    Every callback is passed straight on to the object given at
    construction time (kept as ``_recepient``), which must provide
    ``_got_transport``, ``_got_data`` and ``_connectionLost``.
    """

    def __init__(self, recepient):
        # The parameter's spelling is part of the existing signature.
        self._recepient = recepient

    def connectionMade(self):
        """Pass ``self.transport`` to the recipient."""
        sink = self._recepient
        sink._got_transport(self.transport)

    def dataReceived(self, data):
        """Pass a received chunk of *data* to the recipient."""
        sink = self._recepient
        sink._got_data(data)

    def connectionLost(self, reason):
        """Pass the disconnect *reason* to the recipient."""
        sink = self._recepient
        sink._connectionLost(reason)
170 class UnbufferedTransport(GreenTransportBase):
171 """A very simple implementation of a green transport without an additional buffer"""
173 protocol_class = Protocol
176 """Receive a single chunk of undefined size.
178 Return '' if connection was closed cleanly, raise the exception if it was closed
179 in a non clean fashion. After that all successive calls return ''.
181 if self._disconnected_event.ready():
185 except ConnectionDone:
189 """Read the data from the socket until the connection is closed cleanly.
191 If connection was closed in a non-clean fashion, the appropriate exception
192 is raised. In that case already received data is lost.
193 Next time read() is called it returns ''.
215 class GreenTransport(GreenTransportBase):
217 protocol_class = Protocol
221 def read(self, size=-1):
222 """Read size bytes or until EOF"""
223 if not self._disconnected_event.ready():
225 while len(self._buffer) < size or size < 0:
226 self._buffer += self._wait()
227 except ConnectionDone:
230 if not self._disconnected_event.has_exception():
233 result, self._buffer = self._buffer[:size], self._buffer[size:]
235 result, self._buffer = self._buffer, ''
236 if not result and self._disconnected_event.has_exception():
238 self._disconnected_event.wait()
239 except ConnectionDone:
243 def recv(self, buflen=None):
244 """Receive a single chunk of undefined size but no bigger than buflen"""
245 if not self._disconnected_event.ready():
246 self.resumeProducing()
250 #print 'received %r' % recvd
251 self._buffer += recvd
252 except ConnectionDone:
255 if not self._disconnected_event.has_exception():
258 self.pauseProducing()
260 result, self._buffer = self._buffer, ''
262 result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
263 if not result and self._disconnected_event.has_exception():
265 self._disconnected_event.wait()
266 except ConnectionDone:
282 class GreenInstanceFactory(ClientFactory):
284 def __init__(self, instance, event):
285 self.instance = instance
288 def buildProtocol(self, addr):
def clientConnectionFailed(self, connector, reason):
    """Report a failed connection attempt through ``self.event``.

    The failure is forwarded as ``send_exception(type, value, tb)`` taken
    from *reason*. *connector* is not used.
    """
    throw_args = (reason.type, reason.value, reason.tb)
    self.event.send_exception(*throw_args)
295 class GreenClientCreator(object):
296 """Connect to a remote host and return a connected green transport instance.
299 gtransport_class = GreenTransport
301 def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
303 from twisted.internet import reactor
304 self.reactor = reactor
305 if gtransport_class is not None:
306 self.gtransport_class = gtransport_class
def _make_transport_and_factory(self):
    """Build a green transport and the client factory that feeds it.

    The transport is ``self.gtransport_class(*self.args, **self.kwargs)``.
    The factory is a ``GreenInstanceFactory`` given the transport's
    protocol and its ``_queue``.

    Returns the tuple ``(gtransport, factory)``.
    """
    green = self.gtransport_class(*self.args, **self.kwargs)
    return green, GreenInstanceFactory(green.build_protocol(), green._queue)
316 def connectTCP(self, host, port, *args, **kwargs):
317 gtransport, factory = self._make_transport_and_factory()
318 self.reactor.connectTCP(host, port, factory, *args, **kwargs)
319 gtransport._init_transport()
322 def connectSSL(self, host, port, *args, **kwargs):
323 gtransport, factory = self._make_transport_and_factory()
324 self.reactor.connectSSL(host, port, factory, *args, **kwargs)
325 gtransport._init_transport()
328 def connectTLS(self, host, port, *args, **kwargs):
329 gtransport, factory = self._make_transport_and_factory()
330 self.reactor.connectTLS(host, port, factory, *args, **kwargs)
331 gtransport._init_transport()
334 def connectUNIX(self, address, *args, **kwargs):
335 gtransport, factory = self._make_transport_and_factory()
336 self.reactor.connectUNIX(address, factory, *args, **kwargs)
337 gtransport._init_transport()
340 def connectSRV(self, service, domain, *args, **kwargs):
341 SRVConnector = kwargs.pop('ConnectorClass', None)
342 if SRVConnector is None:
343 from twisted.names.srvconnect import SRVConnector
344 gtransport, factory = self._make_transport_and_factory()
345 c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
347 gtransport._init_transport()
351 class SimpleSpawnFactory(Factory):
352 """Factory that spawns a new greenlet for each incoming connection.
354 For an incoming connection a new greenlet is created using the provided
355 callback as a function and a connected green transport instance as an
359 gtransport_class = GreenTransport
361 def __init__(self, handler, gtransport_class=None, *args, **kwargs):
362 if callable(handler):
363 self.handler = handler
365 self.handler = handler.send
366 if hasattr(handler, 'send_exception'):
367 self.exc_handler = handler.send_exception
368 if gtransport_class is not None:
369 self.gtransport_class = gtransport_class
373 def exc_handler(self, *args):
376 def buildProtocol(self, addr):
377 gtransport = self.gtransport_class(*self.args, **self.kwargs)
378 protocol = gtransport.build_protocol()
379 protocol.factory = self
380 self._do_spawn(gtransport, protocol)
def _do_spawn(self, gtransport, protocol):
    """Start ``self._run_handler(gtransport, protocol)`` via ``greenthread.spawn``.

    The object returned by ``greenthread.spawn`` is discarded.
    """
    runner = self._run_handler
    greenthread.spawn(runner, gtransport, protocol)
386 def _run_handler(self, gtransport, protocol):
388 gtransport._init_transport()
390 self.exc_handler(*sys.exc_info())
392 self.handler(gtransport)
395 class SpawnFactory(SimpleSpawnFactory):
396 """An extension to SimpleSpawnFactory that provides some control over
397 the greenlets it has spawned.
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
    """Set up greenlet tracking, then defer to ``SimpleSpawnFactory.__init__``.

    All arguments are passed through unchanged to the base initializer.
    """
    # Create the set before the base initializer runs, as the original does.
    tracked = set()
    self.greenlets = tracked
    SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
def _do_spawn(self, gtransport, protocol):
    """Spawn the handler and track it in ``self.greenlets``.

    The spawned object is added to the set and linked to a callback that
    removes it again.
    """
    worker = greenthread.spawn(self._run_handler, gtransport, protocol)
    self.greenlets.add(worker)

    def _forget(*_ignored):
        # self.greenlets is looked up at call time, like the original lambda.
        self.greenlets.remove(worker)

    worker.link(_forget)
411 for g in self.greenlets:
412 results.append(g.wait())