X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=python-eventlet%2Feventlet%2Fgreenio%2Fbase.py;h=e1c4a4552d46aae884ed05e13c59dd2d08737c5d;hb=refs%2Fheads%2Fmaster;hp=8da51caa88f155062297e0c7290cd193dd27df9f;hpb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/python-eventlet/eventlet/greenio/base.py b/python-eventlet/eventlet/greenio/base.py index 8da51ca..e1c4a45 100644 --- a/python-eventlet/eventlet/greenio/base.py +++ b/python-eventlet/eventlet/greenio/base.py @@ -1,13 +1,13 @@ import errno import os -from socket import socket as _original_socket import socket import sys import time import warnings -from eventlet.support import get_errno, six +import eventlet from eventlet.hubs import trampoline, notify_opened, IOClosed +from eventlet.support import get_errno, six __all__ = [ 'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking', @@ -24,6 +24,8 @@ if sys.platform[:3] == "win": if six.PY2: _python2_fileobject = socket._fileobject +_original_socket = eventlet.patcher.original('socket').socket + def socket_connect(descriptor, address): """ @@ -304,73 +306,80 @@ class GreenSocket(object): "makefile instead", DeprecationWarning, stacklevel=2) return self.makefile(*args, **kw) - def recv(self, buflen, flags=0): + def _read_trampoline(self): + self._trampoline( + self.fd, + read=True, + timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) + + def _recv_loop(self, recv_meth, *args): fd = self.fd if self.act_non_blocking: - return fd.recv(buflen, flags) + return recv_meth(*args) + while True: try: - return fd.recv(buflen, flags) + # recv: bufsize=0? + # recv_into: buffer is empty? + # This is needed because behind the scenes we use sockets in + # nonblocking mode and builtin recv* methods. Attempting to read + # 0 bytes from a nonblocking socket using a builtin recv* method + # does not raise a timeout exception. Since we're simulating + # a blocking socket here we need to produce a timeout exception + # if needed, hence the call to trampoline. + if not args[0]: + self._read_trampoline() + return recv_meth(*args) except socket.error as e: if get_errno(e) in SOCKET_BLOCKING: pass elif get_errno(e) in SOCKET_CLOSED: - return '' + return b'' else: raise + try: - self._trampoline( - fd, - read=True, - timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) + self._read_trampoline() except IOClosed as e: # Perhaps we should return '' instead? raise EOFError() - def recvfrom(self, *args): - if not self.act_non_blocking: - self._trampoline(self.fd, read=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) - return self.fd.recvfrom(*args) + def recv(self, bufsize, flags=0): + return self._recv_loop(self.fd.recv, bufsize, flags) - def recvfrom_into(self, *args): - if not self.act_non_blocking: - self._trampoline(self.fd, read=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) - return self.fd.recvfrom_into(*args) + def recvfrom(self, bufsize, flags=0): + return self._recv_loop(self.fd.recvfrom, bufsize, flags) - def recv_into(self, *args): - if not self.act_non_blocking: - self._trampoline(self.fd, read=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) - return self.fd.recv_into(*args) + def recv_into(self, buffer, nbytes=0, flags=0): + return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags) - def send(self, data, flags=0): - fd = self.fd + def recvfrom_into(self, buffer, nbytes=0, flags=0): + return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags) + + def _send_loop(self, send_method, data, *args): if self.act_non_blocking: - return fd.send(data, flags) + return send_method(data, *args) - # blocking socket behavior - sends all, blocks if the buffer is full - total_sent = 0 - len_data = len(data) while 1: try: - total_sent += fd.send(data[total_sent:], flags) + return send_method(data, *args) except socket.error as e: - if get_errno(e) not in SOCKET_BLOCKING: + eno = get_errno(e) + if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING: raise - if total_sent == len_data: - break - try: self._trampoline(self.fd, write=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) except IOClosed: raise socket.error(errno.ECONNRESET, 'Connection closed by another thread') - return total_sent + def send(self, data, flags=0): + return self._send_loop(self.fd.send, data, flags) + + def sendto(self, data, *args): + return self._send_loop(self.fd.sendto, data, *args) def sendall(self, data, flags=0): tail = self.send(data, flags) @@ -378,10 +387,6 @@ class GreenSocket(object): while tail < len_data: tail += self.send(data[tail:], flags) - def sendto(self, *args): - self._trampoline(self.fd, write=True) - return self.fd.sendto(*args) - def setblocking(self, flag): if flag: self.act_non_blocking = False