Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / twistedutil / protocol.py
1 """Basic twisted protocols converted to synchronous mode"""
2 import sys
3 from twisted.internet.protocol import Protocol as twistedProtocol
4 from twisted.internet.error import ConnectionDone
5 from twisted.internet.protocol import Factory, ClientFactory
6 from twisted.internet import main
7 from twisted.python import failure
8
9 from eventlet import greenthread
10 from eventlet import getcurrent
11 from eventlet.coros import Queue
12 from eventlet.event import Event as BaseEvent
13
14
15 class ValueQueue(Queue):
16     """Queue that keeps the last item forever in the queue if it's an exception.
17     Useful if you send an exception over queue only once, and once sent it must be always
18     available.
19     """
20
21     def send(self, value=None, exc=None):
22         if exc is not None or not self.has_error():
23             Queue.send(self, value, exc)
24
25     def wait(self):
26         """The difference from Queue.wait: if there is an only item in the
27         Queue and it is an exception, raise it, but keep it in the Queue, so
28         that future calls to wait() will raise it again.
29         """
30         if self.has_error() and len(self.items)==1:
31             # the last item, which is an exception, raise without emptying the Queue
32             getcurrent().throw(*self.items[0][1])
33         else:
34             return Queue.wait(self)
35
36     def has_error(self):
37         return self.items and self.items[-1][1] is not None
38
39
40 class Event(BaseEvent):
41
42     def send(self, value, exc=None):
43         if self.ready():
44             self.reset()
45         return BaseEvent.send(self, value, exc)
46
47     def send_exception(self, *throw_args):
48         if self.ready():
49             self.reset()
50         return BaseEvent.send_exception(self, *throw_args)
51
52 class Producer2Event(object):
53
54     # implements IPullProducer
55
56     def __init__(self, event):
57         self.event = event
58
59     def resumeProducing(self):
60         self.event.send(1)
61
62     def stopProducing(self):
63         del self.event
64
65
66 class GreenTransportBase(object):
67
68     transportBufferSize = None
69
70     def __init__(self, transportBufferSize=None):
71         if transportBufferSize is not None:
72             self.transportBufferSize = transportBufferSize
73         self._queue = ValueQueue()
74         self._write_event = Event()
75         self._disconnected_event = Event()
76
77     def build_protocol(self):
78         return self.protocol_class(self)
79
80     def _got_transport(self, transport):
81         self._queue.send(transport)
82
83     def _got_data(self, data):
84         self._queue.send(data)
85
86     def _connectionLost(self, reason):
87         self._disconnected_event.send(reason.value)
88         self._queue.send_exception(reason.value)
89         self._write_event.send_exception(reason.value)
90
91     def _wait(self):
92         if self.disconnecting or self._disconnected_event.ready():
93             if self._queue:
94                 return self._queue.wait()
95             else:
96                 raise self._disconnected_event.wait()
97         self.resumeProducing()
98         try:
99             return self._queue.wait()
100         finally:
101             self.pauseProducing()
102
103     def write(self, data, wait=True):
104         if self._disconnected_event.ready():
105             raise self._disconnected_event.wait()
106         if wait:
107             self._write_event.reset()
108             self.transport.write(data)
109             self._write_event.wait()
110         else:
111             self.transport.write(data)
112
113     def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
114         self.transport.unregisterProducer()
115         self.transport.loseConnection(connDone)
116         if wait:
117             self._disconnected_event.wait()
118
119     def __getattr__(self, item):
120         if item=='transport':
121             raise AttributeError(item)
122         if hasattr(self, 'transport'):
123             try:
124                 return getattr(self.transport, item)
125             except AttributeError:
126                 me = type(self).__name__
127                 trans = type(self.transport).__name__
128                 raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
129         else:
130             raise AttributeError(item)
131
132     def resumeProducing(self):
133         self.paused -= 1
134         if self.paused==0:
135             self.transport.resumeProducing()
136
137     def pauseProducing(self):
138         self.paused += 1
139         if self.paused==1:
140             self.transport.pauseProducing()
141
142     def _init_transport_producer(self):
143         self.transport.pauseProducing()
144         self.paused = 1
145
146     def _init_transport(self):
147         transport = self._queue.wait()
148         self.transport = transport
149         if self.transportBufferSize is not None:
150             transport.bufferSize = self.transportBufferSize
151         self._init_transport_producer()
152         transport.registerProducer(Producer2Event(self._write_event), False)
153
154
155 class Protocol(twistedProtocol):
156
157     def __init__(self, recepient):
158         self._recepient = recepient
159
160     def connectionMade(self):
161         self._recepient._got_transport(self.transport)
162
163     def dataReceived(self, data):
164         self._recepient._got_data(data)
165
166     def connectionLost(self, reason):
167         self._recepient._connectionLost(reason)
168
169
170 class UnbufferedTransport(GreenTransportBase):
171     """A very simple implementation of a green transport without an additional buffer"""
172
173     protocol_class = Protocol
174
175     def recv(self):
176         """Receive a single chunk of undefined size.
177
178         Return '' if connection was closed cleanly, raise the exception if it was closed
179         in a non clean fashion. After that all successive calls return ''.
180         """
181         if self._disconnected_event.ready():
182             return ''
183         try:
184             return self._wait()
185         except ConnectionDone:
186             return ''
187
188     def read(self):
189         """Read the data from the socket until the connection is closed cleanly.
190
191         If connection was closed in a non-clean fashion, the appropriate exception
192         is raised. In that case already received data is lost.
193         Next time read() is called it returns ''.
194         """
195         result = ''
196         while True:
197             recvd = self.recv()
198             if not recvd:
199                 break
200             result += recvd
201         return result
202
203     # iterator protocol:
204
205     def __iter__(self):
206         return self
207
208     def next(self):
209         result = self.recv()
210         if not result:
211             raise StopIteration
212         return result
213
214
215 class GreenTransport(GreenTransportBase):
216
217     protocol_class = Protocol
218     _buffer = ''
219     _error = None
220
221     def read(self, size=-1):
222         """Read size bytes or until EOF"""
223         if not self._disconnected_event.ready():
224             try:
225                 while len(self._buffer) < size or size < 0:
226                     self._buffer += self._wait()
227             except ConnectionDone:
228                 pass
229             except:
230                 if not self._disconnected_event.has_exception():
231                     raise
232         if size>=0:
233             result, self._buffer = self._buffer[:size], self._buffer[size:]
234         else:
235             result, self._buffer = self._buffer, ''
236         if not result and self._disconnected_event.has_exception():
237             try:
238                 self._disconnected_event.wait()
239             except ConnectionDone:
240                 pass
241         return result
242
243     def recv(self, buflen=None):
244         """Receive a single chunk of undefined size but no bigger than buflen"""
245         if not self._disconnected_event.ready():
246             self.resumeProducing()
247             try:
248                 try:
249                     recvd = self._wait()
250                     #print 'received %r' % recvd
251                     self._buffer += recvd
252                 except ConnectionDone:
253                     pass
254                 except:
255                     if not self._disconnected_event.has_exception():
256                         raise
257             finally:
258                 self.pauseProducing()
259         if buflen is None:
260             result, self._buffer = self._buffer, ''
261         else:
262             result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
263         if not result and self._disconnected_event.has_exception():
264             try:
265                 self._disconnected_event.wait()
266             except ConnectionDone:
267                 pass
268         return result
269
270     # iterator protocol:
271
272     def __iter__(self):
273         return self
274
275     def next(self):
276         res = self.recv()
277         if not res:
278             raise StopIteration
279         return res
280
281
282 class GreenInstanceFactory(ClientFactory):
283
284     def __init__(self, instance, event):
285         self.instance = instance
286         self.event = event
287
288     def buildProtocol(self, addr):
289         return self.instance
290
291     def clientConnectionFailed(self, connector, reason):
292         self.event.send_exception(reason.type, reason.value, reason.tb)
293
294
295 class GreenClientCreator(object):
296     """Connect to a remote host and return a connected green transport instance.
297     """
298
299     gtransport_class = GreenTransport
300
301     def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
302         if reactor is None:
303             from twisted.internet import reactor
304         self.reactor = reactor
305         if gtransport_class is not None:
306             self.gtransport_class = gtransport_class
307         self.args = args
308         self.kwargs = kwargs
309
310     def _make_transport_and_factory(self):
311         gtransport = self.gtransport_class(*self.args, **self.kwargs)
312         protocol = gtransport.build_protocol()
313         factory = GreenInstanceFactory(protocol, gtransport._queue)
314         return gtransport, factory
315
316     def connectTCP(self, host, port, *args, **kwargs):
317         gtransport, factory = self._make_transport_and_factory()
318         self.reactor.connectTCP(host, port, factory, *args, **kwargs)
319         gtransport._init_transport()
320         return gtransport
321
322     def connectSSL(self, host, port, *args, **kwargs):
323         gtransport, factory = self._make_transport_and_factory()
324         self.reactor.connectSSL(host, port, factory, *args, **kwargs)
325         gtransport._init_transport()
326         return gtransport
327
328     def connectTLS(self, host, port, *args, **kwargs):
329         gtransport, factory = self._make_transport_and_factory()
330         self.reactor.connectTLS(host, port, factory, *args, **kwargs)
331         gtransport._init_transport()
332         return gtransport
333
334     def connectUNIX(self, address, *args, **kwargs):
335         gtransport, factory = self._make_transport_and_factory()
336         self.reactor.connectUNIX(address, factory, *args, **kwargs)
337         gtransport._init_transport()
338         return gtransport
339
340     def connectSRV(self, service, domain, *args, **kwargs):
341         SRVConnector = kwargs.pop('ConnectorClass', None)
342         if SRVConnector is None:
343             from twisted.names.srvconnect import SRVConnector
344         gtransport, factory = self._make_transport_and_factory()
345         c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
346         c.connect()
347         gtransport._init_transport()
348         return gtransport
349
350
351 class SimpleSpawnFactory(Factory):
352     """Factory that spawns a new greenlet for each incoming connection.
353
354     For an incoming connection a new greenlet is created using the provided
355     callback as a function and a connected green transport instance as an
356     argument.
357     """
358
359     gtransport_class = GreenTransport
360
361     def __init__(self, handler, gtransport_class=None, *args, **kwargs):
362         if callable(handler):
363             self.handler = handler
364         else:
365             self.handler = handler.send
366         if hasattr(handler, 'send_exception'):
367             self.exc_handler = handler.send_exception
368         if gtransport_class is not None:
369             self.gtransport_class = gtransport_class
370         self.args = args
371         self.kwargs = kwargs
372
373     def exc_handler(self, *args):
374         pass
375
376     def buildProtocol(self, addr):
377         gtransport = self.gtransport_class(*self.args, **self.kwargs)
378         protocol = gtransport.build_protocol()
379         protocol.factory = self
380         self._do_spawn(gtransport, protocol)
381         return protocol
382
383     def _do_spawn(self, gtransport, protocol):
384         greenthread.spawn(self._run_handler, gtransport, protocol)
385
386     def _run_handler(self, gtransport, protocol):
387         try:
388             gtransport._init_transport()
389         except Exception:
390             self.exc_handler(*sys.exc_info())
391         else:
392             self.handler(gtransport)
393
394
395 class SpawnFactory(SimpleSpawnFactory):
396     """An extension to SimpleSpawnFactory that provides some control over
397     the greenlets it has spawned.
398     """
399
400     def __init__(self, handler, gtransport_class=None, *args, **kwargs):
401         self.greenlets = set()
402         SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
403
404     def _do_spawn(self, gtransport, protocol):
405         g = greenthread.spawn(self._run_handler, gtransport, protocol)
406         self.greenlets.add(g)
407         g.link(lambda *_: self.greenlets.remove(g))
408
409     def waitall(self):
410         results = []
411         for g in self.greenlets:
412             results.append(g.wait())
413         return results
414