Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / twistedutil / protocol.py
diff --git a/eventlet/eventlet/twistedutil/protocol.py b/eventlet/eventlet/twistedutil/protocol.py
new file mode 100644 (file)
index 0000000..60d43ad
--- /dev/null
@@ -0,0 +1,414 @@
+"""Basic twisted protocols converted to synchronous mode"""
+import sys
+from twisted.internet.protocol import Protocol as twistedProtocol
+from twisted.internet.error import ConnectionDone
+from twisted.internet.protocol import Factory, ClientFactory
+from twisted.internet import main
+from twisted.python import failure
+
+from eventlet import greenthread
+from eventlet import getcurrent
+from eventlet.coros import Queue
+from eventlet.event import Event as BaseEvent
+
+
+class ValueQueue(Queue):
+    """Queue that keeps the last item forever in the queue if it's an exception.
+    Useful if you send an exception over queue only once, and once sent it must be always
+    available.
+    """
+
+    def send(self, value=None, exc=None):
+        if exc is not None or not self.has_error():
+            Queue.send(self, value, exc)
+
+    def wait(self):
+        """The difference from Queue.wait: if there is an only item in the
+        Queue and it is an exception, raise it, but keep it in the Queue, so
+        that future calls to wait() will raise it again.
+        """
+        if self.has_error() and len(self.items)==1:
+            # the last item, which is an exception, raise without emptying the Queue
+            getcurrent().throw(*self.items[0][1])
+        else:
+            return Queue.wait(self)
+
+    def has_error(self):
+        return self.items and self.items[-1][1] is not None
+
+
+class Event(BaseEvent):
+
+    def send(self, value, exc=None):
+        if self.ready():
+            self.reset()
+        return BaseEvent.send(self, value, exc)
+
+    def send_exception(self, *throw_args):
+        if self.ready():
+            self.reset()
+        return BaseEvent.send_exception(self, *throw_args)
+
+class Producer2Event(object):
+
+    # implements IPullProducer
+
+    def __init__(self, event):
+        self.event = event
+
+    def resumeProducing(self):
+        self.event.send(1)
+
+    def stopProducing(self):
+        del self.event
+
+
+class GreenTransportBase(object):
+
+    transportBufferSize = None
+
+    def __init__(self, transportBufferSize=None):
+        if transportBufferSize is not None:
+            self.transportBufferSize = transportBufferSize
+        self._queue = ValueQueue()
+        self._write_event = Event()
+        self._disconnected_event = Event()
+
+    def build_protocol(self):
+        return self.protocol_class(self)
+
+    def _got_transport(self, transport):
+        self._queue.send(transport)
+
+    def _got_data(self, data):
+        self._queue.send(data)
+
+    def _connectionLost(self, reason):
+        self._disconnected_event.send(reason.value)
+        self._queue.send_exception(reason.value)
+        self._write_event.send_exception(reason.value)
+
+    def _wait(self):
+        if self.disconnecting or self._disconnected_event.ready():
+            if self._queue:
+                return self._queue.wait()
+            else:
+                raise self._disconnected_event.wait()
+        self.resumeProducing()
+        try:
+            return self._queue.wait()
+        finally:
+            self.pauseProducing()
+
+    def write(self, data, wait=True):
+        if self._disconnected_event.ready():
+            raise self._disconnected_event.wait()
+        if wait:
+            self._write_event.reset()
+            self.transport.write(data)
+            self._write_event.wait()
+        else:
+            self.transport.write(data)
+
+    def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
+        self.transport.unregisterProducer()
+        self.transport.loseConnection(connDone)
+        if wait:
+            self._disconnected_event.wait()
+
+    def __getattr__(self, item):
+        if item=='transport':
+            raise AttributeError(item)
+        if hasattr(self, 'transport'):
+            try:
+                return getattr(self.transport, item)
+            except AttributeError:
+                me = type(self).__name__
+                trans = type(self.transport).__name__
+                raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
+        else:
+            raise AttributeError(item)
+
+    def resumeProducing(self):
+        self.paused -= 1
+        if self.paused==0:
+            self.transport.resumeProducing()
+
+    def pauseProducing(self):
+        self.paused += 1
+        if self.paused==1:
+            self.transport.pauseProducing()
+
+    def _init_transport_producer(self):
+        self.transport.pauseProducing()
+        self.paused = 1
+
+    def _init_transport(self):
+        transport = self._queue.wait()
+        self.transport = transport
+        if self.transportBufferSize is not None:
+            transport.bufferSize = self.transportBufferSize
+        self._init_transport_producer()
+        transport.registerProducer(Producer2Event(self._write_event), False)
+
+
+class Protocol(twistedProtocol):
+
+    def __init__(self, recepient):
+        self._recepient = recepient
+
+    def connectionMade(self):
+        self._recepient._got_transport(self.transport)
+
+    def dataReceived(self, data):
+        self._recepient._got_data(data)
+
+    def connectionLost(self, reason):
+        self._recepient._connectionLost(reason)
+
+
+class UnbufferedTransport(GreenTransportBase):
+    """A very simple implementation of a green transport without an additional buffer"""
+
+    protocol_class = Protocol
+
+    def recv(self):
+        """Receive a single chunk of undefined size.
+
+        Return '' if connection was closed cleanly, raise the exception if it was closed
+        in a non clean fashion. After that all successive calls return ''.
+        """
+        if self._disconnected_event.ready():
+            return ''
+        try:
+            return self._wait()
+        except ConnectionDone:
+            return ''
+
+    def read(self):
+        """Read the data from the socket until the connection is closed cleanly.
+
+        If connection was closed in a non-clean fashion, the appropriate exception
+        is raised. In that case already received data is lost.
+        Next time read() is called it returns ''.
+        """
+        result = ''
+        while True:
+            recvd = self.recv()
+            if not recvd:
+                break
+            result += recvd
+        return result
+
+    # iterator protocol:
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        result = self.recv()
+        if not result:
+            raise StopIteration
+        return result
+
+
+class GreenTransport(GreenTransportBase):
+
+    protocol_class = Protocol
+    _buffer = ''
+    _error = None
+
+    def read(self, size=-1):
+        """Read size bytes or until EOF"""
+        if not self._disconnected_event.ready():
+            try:
+                while len(self._buffer) < size or size < 0:
+                    self._buffer += self._wait()
+            except ConnectionDone:
+                pass
+            except:
+                if not self._disconnected_event.has_exception():
+                    raise
+        if size>=0:
+            result, self._buffer = self._buffer[:size], self._buffer[size:]
+        else:
+            result, self._buffer = self._buffer, ''
+        if not result and self._disconnected_event.has_exception():
+            try:
+                self._disconnected_event.wait()
+            except ConnectionDone:
+                pass
+        return result
+
+    def recv(self, buflen=None):
+        """Receive a single chunk of undefined size but no bigger than buflen"""
+        if not self._disconnected_event.ready():
+            self.resumeProducing()
+            try:
+                try:
+                    recvd = self._wait()
+                    #print 'received %r' % recvd
+                    self._buffer += recvd
+                except ConnectionDone:
+                    pass
+                except:
+                    if not self._disconnected_event.has_exception():
+                        raise
+            finally:
+                self.pauseProducing()
+        if buflen is None:
+            result, self._buffer = self._buffer, ''
+        else:
+            result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
+        if not result and self._disconnected_event.has_exception():
+            try:
+                self._disconnected_event.wait()
+            except ConnectionDone:
+                pass
+        return result
+
+    # iterator protocol:
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        res = self.recv()
+        if not res:
+            raise StopIteration
+        return res
+
+
+class GreenInstanceFactory(ClientFactory):
+
+    def __init__(self, instance, event):
+        self.instance = instance
+        self.event = event
+
+    def buildProtocol(self, addr):
+        return self.instance
+
+    def clientConnectionFailed(self, connector, reason):
+        self.event.send_exception(reason.type, reason.value, reason.tb)
+
+
+class GreenClientCreator(object):
+    """Connect to a remote host and return a connected green transport instance.
+    """
+
+    gtransport_class = GreenTransport
+
+    def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+        if gtransport_class is not None:
+            self.gtransport_class = gtransport_class
+        self.args = args
+        self.kwargs = kwargs
+
+    def _make_transport_and_factory(self):
+        gtransport = self.gtransport_class(*self.args, **self.kwargs)
+        protocol = gtransport.build_protocol()
+        factory = GreenInstanceFactory(protocol, gtransport._queue)
+        return gtransport, factory
+
+    def connectTCP(self, host, port, *args, **kwargs):
+        gtransport, factory = self._make_transport_and_factory()
+        self.reactor.connectTCP(host, port, factory, *args, **kwargs)
+        gtransport._init_transport()
+        return gtransport
+
+    def connectSSL(self, host, port, *args, **kwargs):
+        gtransport, factory = self._make_transport_and_factory()
+        self.reactor.connectSSL(host, port, factory, *args, **kwargs)
+        gtransport._init_transport()
+        return gtransport
+
+    def connectTLS(self, host, port, *args, **kwargs):
+        gtransport, factory = self._make_transport_and_factory()
+        self.reactor.connectTLS(host, port, factory, *args, **kwargs)
+        gtransport._init_transport()
+        return gtransport
+
+    def connectUNIX(self, address, *args, **kwargs):
+        gtransport, factory = self._make_transport_and_factory()
+        self.reactor.connectUNIX(address, factory, *args, **kwargs)
+        gtransport._init_transport()
+        return gtransport
+
+    def connectSRV(self, service, domain, *args, **kwargs):
+        SRVConnector = kwargs.pop('ConnectorClass', None)
+        if SRVConnector is None:
+            from twisted.names.srvconnect import SRVConnector
+        gtransport, factory = self._make_transport_and_factory()
+        c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
+        c.connect()
+        gtransport._init_transport()
+        return gtransport
+
+
+class SimpleSpawnFactory(Factory):
+    """Factory that spawns a new greenlet for each incoming connection.
+
+    For an incoming connection a new greenlet is created using the provided
+    callback as a function and a connected green transport instance as an
+    argument.
+    """
+
+    gtransport_class = GreenTransport
+
+    def __init__(self, handler, gtransport_class=None, *args, **kwargs):
+        if callable(handler):
+            self.handler = handler
+        else:
+            self.handler = handler.send
+        if hasattr(handler, 'send_exception'):
+            self.exc_handler = handler.send_exception
+        if gtransport_class is not None:
+            self.gtransport_class = gtransport_class
+        self.args = args
+        self.kwargs = kwargs
+
+    def exc_handler(self, *args):
+        pass
+
+    def buildProtocol(self, addr):
+        gtransport = self.gtransport_class(*self.args, **self.kwargs)
+        protocol = gtransport.build_protocol()
+        protocol.factory = self
+        self._do_spawn(gtransport, protocol)
+        return protocol
+
+    def _do_spawn(self, gtransport, protocol):
+        greenthread.spawn(self._run_handler, gtransport, protocol)
+
+    def _run_handler(self, gtransport, protocol):
+        try:
+            gtransport._init_transport()
+        except Exception:
+            self.exc_handler(*sys.exc_info())
+        else:
+            self.handler(gtransport)
+
+
+class SpawnFactory(SimpleSpawnFactory):
+    """An extension to SimpleSpawnFactory that provides some control over
+    the greenlets it has spawned.
+    """
+
+    def __init__(self, handler, gtransport_class=None, *args, **kwargs):
+        self.greenlets = set()
+        SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
+
+    def _do_spawn(self, gtransport, protocol):
+        g = greenthread.spawn(self._run_handler, gtransport, protocol)
+        self.greenlets.add(g)
+        g.link(lambda *_: self.greenlets.remove(g))
+
+    def waitall(self):
+        results = []
+        for g in self.greenlets:
+            results.append(g.wait())
+        return results
+