import errno
import os
-from socket import socket as _original_socket
import socket
import sys
import time
import warnings
-from eventlet.support import get_errno, six
+import eventlet
from eventlet.hubs import trampoline, notify_opened, IOClosed
+from eventlet.support import get_errno, six
__all__ = [
'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
if six.PY2:
_python2_fileobject = socket._fileobject
+_original_socket = eventlet.patcher.original('socket').socket
+
def socket_connect(descriptor, address):
"""
"makefile instead", DeprecationWarning, stacklevel=2)
return self.makefile(*args, **kw)
- def recv(self, buflen, flags=0):
+ def _read_trampoline(self):
+ self._trampoline(
+ self.fd,
+ read=True,
+ timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+
+ def _recv_loop(self, recv_meth, *args):
fd = self.fd
if self.act_non_blocking:
- return fd.recv(buflen, flags)
+ return recv_meth(*args)
+
while True:
try:
- return fd.recv(buflen, flags)
+ # recv: bufsize=0?
+ # recv_into: buffer is empty?
+ # This is needed because behind the scenes we use sockets in
+ # nonblocking mode and builtin recv* methods. Attempting to read
+ # 0 bytes from a nonblocking socket using a builtin recv* method
+ # does not raise a timeout exception. Since we're simulating
+ # a blocking socket here we need to produce a timeout exception
+ # if needed, hence the call to trampoline.
+ if not args[0]:
+ self._read_trampoline()
+ return recv_meth(*args)
except socket.error as e:
if get_errno(e) in SOCKET_BLOCKING:
pass
elif get_errno(e) in SOCKET_CLOSED:
- return ''
+ return b''
else:
raise
+
try:
- self._trampoline(
- fd,
- read=True,
- timeout=self.gettimeout(),
- timeout_exc=socket.timeout("timed out"))
+ self._read_trampoline()
except IOClosed as e:
# Perhaps we should return '' instead?
raise EOFError()
- def recvfrom(self, *args):
- if not self.act_non_blocking:
- self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
- timeout_exc=socket.timeout("timed out"))
- return self.fd.recvfrom(*args)
+ def recv(self, bufsize, flags=0):
+ return self._recv_loop(self.fd.recv, bufsize, flags)
- def recvfrom_into(self, *args):
- if not self.act_non_blocking:
- self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
- timeout_exc=socket.timeout("timed out"))
- return self.fd.recvfrom_into(*args)
+ def recvfrom(self, bufsize, flags=0):
+ return self._recv_loop(self.fd.recvfrom, bufsize, flags)
- def recv_into(self, *args):
- if not self.act_non_blocking:
- self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
- timeout_exc=socket.timeout("timed out"))
- return self.fd.recv_into(*args)
+ def recv_into(self, buffer, nbytes=0, flags=0):
+ return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags)
- def send(self, data, flags=0):
- fd = self.fd
+ def recvfrom_into(self, buffer, nbytes=0, flags=0):
+ return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags)
+
+ def _send_loop(self, send_method, data, *args):
if self.act_non_blocking:
- return fd.send(data, flags)
+ return send_method(data, *args)
- # blocking socket behavior - sends all, blocks if the buffer is full
- total_sent = 0
- len_data = len(data)
while 1:
try:
- total_sent += fd.send(data[total_sent:], flags)
+ return send_method(data, *args)
except socket.error as e:
- if get_errno(e) not in SOCKET_BLOCKING:
+ eno = get_errno(e)
+ if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
raise
- if total_sent == len_data:
- break
-
try:
self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
except IOClosed:
raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
- return total_sent
+ def send(self, data, flags=0):
+ return self._send_loop(self.fd.send, data, flags)
+
+ def sendto(self, data, *args):
+ return self._send_loop(self.fd.sendto, data, *args)
def sendall(self, data, flags=0):
tail = self.send(data, flags)
while tail < len_data:
tail += self.send(data[tail:], flags)
- def sendto(self, *args):
- self._trampoline(self.fd, write=True)
- return self.fd.sendto(*args)
-
def setblocking(self, flag):
if flag:
self.act_non_blocking = False