X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;ds=inline;f=eventlet%2Feventlet%2Ftwistedutil%2Fprotocol.py;fp=eventlet%2Feventlet%2Ftwistedutil%2Fprotocol.py;h=60d43ad6089a1c09fd4332ba8975d29aaac0a87e;hb=a7790d9c6e32b6ce02cf489d91c232e7b4d31161;hp=0000000000000000000000000000000000000000;hpb=70992db4bef26426213a8eae488be377cdd655ae;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/eventlet/twistedutil/protocol.py b/eventlet/eventlet/twistedutil/protocol.py new file mode 100644 index 0000000..60d43ad --- /dev/null +++ b/eventlet/eventlet/twistedutil/protocol.py @@ -0,0 +1,414 @@ +"""Basic twisted protocols converted to synchronous mode""" +import sys +from twisted.internet.protocol import Protocol as twistedProtocol +from twisted.internet.error import ConnectionDone +from twisted.internet.protocol import Factory, ClientFactory +from twisted.internet import main +from twisted.python import failure + +from eventlet import greenthread +from eventlet import getcurrent +from eventlet.coros import Queue +from eventlet.event import Event as BaseEvent + + +class ValueQueue(Queue): + """Queue that keeps the last item forever in the queue if it's an exception. + Useful if you send an exception over queue only once, and once sent it must be always + available. + """ + + def send(self, value=None, exc=None): + if exc is not None or not self.has_error(): + Queue.send(self, value, exc) + + def wait(self): + """The difference from Queue.wait: if there is an only item in the + Queue and it is an exception, raise it, but keep it in the Queue, so + that future calls to wait() will raise it again. + """ + if self.has_error() and len(self.items)==1: + # the last item, which is an exception, raise without emptying the Queue + getcurrent().throw(*self.items[0][1]) + else: + return Queue.wait(self) + + def has_error(self): + return self.items and self.items[-1][1] is not None + + +class Event(BaseEvent): + + def send(self, value, exc=None): + if self.ready(): + self.reset() + return BaseEvent.send(self, value, exc) + + def send_exception(self, *throw_args): + if self.ready(): + self.reset() + return BaseEvent.send_exception(self, *throw_args) + +class Producer2Event(object): + + # implements IPullProducer + + def __init__(self, event): + self.event = event + + def resumeProducing(self): + self.event.send(1) + + def stopProducing(self): + del self.event + + +class GreenTransportBase(object): + + transportBufferSize = None + + def __init__(self, transportBufferSize=None): + if transportBufferSize is not None: + self.transportBufferSize = transportBufferSize + self._queue = ValueQueue() + self._write_event = Event() + self._disconnected_event = Event() + + def build_protocol(self): + return self.protocol_class(self) + + def _got_transport(self, transport): + self._queue.send(transport) + + def _got_data(self, data): + self._queue.send(data) + + def _connectionLost(self, reason): + self._disconnected_event.send(reason.value) + self._queue.send_exception(reason.value) + self._write_event.send_exception(reason.value) + + def _wait(self): + if self.disconnecting or self._disconnected_event.ready(): + if self._queue: + return self._queue.wait() + else: + raise self._disconnected_event.wait() + self.resumeProducing() + try: + return self._queue.wait() + finally: + self.pauseProducing() + + def write(self, data, wait=True): + if self._disconnected_event.ready(): + raise self._disconnected_event.wait() + if wait: + self._write_event.reset() + self.transport.write(data) + self._write_event.wait() + else: + self.transport.write(data) + + def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True): + self.transport.unregisterProducer() + self.transport.loseConnection(connDone) + if wait: + self._disconnected_event.wait() + + def __getattr__(self, item): + if item=='transport': + raise AttributeError(item) + if hasattr(self, 'transport'): + try: + return getattr(self.transport, item) + except AttributeError: + me = type(self).__name__ + trans = type(self.transport).__name__ + raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item)) + else: + raise AttributeError(item) + + def resumeProducing(self): + self.paused -= 1 + if self.paused==0: + self.transport.resumeProducing() + + def pauseProducing(self): + self.paused += 1 + if self.paused==1: + self.transport.pauseProducing() + + def _init_transport_producer(self): + self.transport.pauseProducing() + self.paused = 1 + + def _init_transport(self): + transport = self._queue.wait() + self.transport = transport + if self.transportBufferSize is not None: + transport.bufferSize = self.transportBufferSize + self._init_transport_producer() + transport.registerProducer(Producer2Event(self._write_event), False) + + +class Protocol(twistedProtocol): + + def __init__(self, recepient): + self._recepient = recepient + + def connectionMade(self): + self._recepient._got_transport(self.transport) + + def dataReceived(self, data): + self._recepient._got_data(data) + + def connectionLost(self, reason): + self._recepient._connectionLost(reason) + + +class UnbufferedTransport(GreenTransportBase): + """A very simple implementation of a green transport without an additional buffer""" + + protocol_class = Protocol + + def recv(self): + """Receive a single chunk of undefined size. + + Return '' if connection was closed cleanly, raise the exception if it was closed + in a non clean fashion. After that all successive calls return ''. + """ + if self._disconnected_event.ready(): + return '' + try: + return self._wait() + except ConnectionDone: + return '' + + def read(self): + """Read the data from the socket until the connection is closed cleanly. + + If connection was closed in a non-clean fashion, the appropriate exception + is raised. In that case already received data is lost. + Next time read() is called it returns ''. + """ + result = '' + while True: + recvd = self.recv() + if not recvd: + break + result += recvd + return result + + # iterator protocol: + + def __iter__(self): + return self + + def next(self): + result = self.recv() + if not result: + raise StopIteration + return result + + +class GreenTransport(GreenTransportBase): + + protocol_class = Protocol + _buffer = '' + _error = None + + def read(self, size=-1): + """Read size bytes or until EOF""" + if not self._disconnected_event.ready(): + try: + while len(self._buffer) < size or size < 0: + self._buffer += self._wait() + except ConnectionDone: + pass + except: + if not self._disconnected_event.has_exception(): + raise + if size>=0: + result, self._buffer = self._buffer[:size], self._buffer[size:] + else: + result, self._buffer = self._buffer, '' + if not result and self._disconnected_event.has_exception(): + try: + self._disconnected_event.wait() + except ConnectionDone: + pass + return result + + def recv(self, buflen=None): + """Receive a single chunk of undefined size but no bigger than buflen""" + if not self._disconnected_event.ready(): + self.resumeProducing() + try: + try: + recvd = self._wait() + #print 'received %r' % recvd + self._buffer += recvd + except ConnectionDone: + pass + except: + if not self._disconnected_event.has_exception(): + raise + finally: + self.pauseProducing() + if buflen is None: + result, self._buffer = self._buffer, '' + else: + result, self._buffer = self._buffer[:buflen], self._buffer[buflen:] + if not result and self._disconnected_event.has_exception(): + try: + self._disconnected_event.wait() + except ConnectionDone: + pass + return result + + # iterator protocol: + + def __iter__(self): + return self + + def next(self): + res = self.recv() + if not res: + raise StopIteration + return res + + +class GreenInstanceFactory(ClientFactory): + + def __init__(self, instance, event): + self.instance = instance + self.event = event + + def buildProtocol(self, addr): + return self.instance + + def clientConnectionFailed(self, connector, reason): + self.event.send_exception(reason.type, reason.value, reason.tb) + + +class GreenClientCreator(object): + """Connect to a remote host and return a connected green transport instance. + """ + + gtransport_class = GreenTransport + + def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs): + if reactor is None: + from twisted.internet import reactor + self.reactor = reactor + if gtransport_class is not None: + self.gtransport_class = gtransport_class + self.args = args + self.kwargs = kwargs + + def _make_transport_and_factory(self): + gtransport = self.gtransport_class(*self.args, **self.kwargs) + protocol = gtransport.build_protocol() + factory = GreenInstanceFactory(protocol, gtransport._queue) + return gtransport, factory + + def connectTCP(self, host, port, *args, **kwargs): + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectTCP(host, port, factory, *args, **kwargs) + gtransport._init_transport() + return gtransport + + def connectSSL(self, host, port, *args, **kwargs): + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectSSL(host, port, factory, *args, **kwargs) + gtransport._init_transport() + return gtransport + + def connectTLS(self, host, port, *args, **kwargs): + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectTLS(host, port, factory, *args, **kwargs) + gtransport._init_transport() + return gtransport + + def connectUNIX(self, address, *args, **kwargs): + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectUNIX(address, factory, *args, **kwargs) + gtransport._init_transport() + return gtransport + + def connectSRV(self, service, domain, *args, **kwargs): + SRVConnector = kwargs.pop('ConnectorClass', None) + if SRVConnector is None: + from twisted.names.srvconnect import SRVConnector + gtransport, factory = self._make_transport_and_factory() + c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs) + c.connect() + gtransport._init_transport() + return gtransport + + +class SimpleSpawnFactory(Factory): + """Factory that spawns a new greenlet for each incoming connection. + + For an incoming connection a new greenlet is created using the provided + callback as a function and a connected green transport instance as an + argument. + """ + + gtransport_class = GreenTransport + + def __init__(self, handler, gtransport_class=None, *args, **kwargs): + if callable(handler): + self.handler = handler + else: + self.handler = handler.send + if hasattr(handler, 'send_exception'): + self.exc_handler = handler.send_exception + if gtransport_class is not None: + self.gtransport_class = gtransport_class + self.args = args + self.kwargs = kwargs + + def exc_handler(self, *args): + pass + + def buildProtocol(self, addr): + gtransport = self.gtransport_class(*self.args, **self.kwargs) + protocol = gtransport.build_protocol() + protocol.factory = self + self._do_spawn(gtransport, protocol) + return protocol + + def _do_spawn(self, gtransport, protocol): + greenthread.spawn(self._run_handler, gtransport, protocol) + + def _run_handler(self, gtransport, protocol): + try: + gtransport._init_transport() + except Exception: + self.exc_handler(*sys.exc_info()) + else: + self.handler(gtransport) + + +class SpawnFactory(SimpleSpawnFactory): + """An extension to SimpleSpawnFactory that provides some control over + the greenlets it has spawned. + """ + + def __init__(self, handler, gtransport_class=None, *args, **kwargs): + self.greenlets = set() + SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs) + + def _do_spawn(self, gtransport, protocol): + g = greenthread.spawn(self._run_handler, gtransport, protocol) + self.greenlets.add(g) + g.link(lambda *_: self.greenlets.remove(g)) + + def waitall(self): + results = [] + for g in self.greenlets: + results.append(g.wait()) + return results +