--- /dev/null
+import errno
+import os
+from socket import socket as _original_socket
+import socket
+import sys
+import time
+import warnings
+
+from eventlet.support import get_errno, six
+from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
+
+__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
+
+BUFFER_SIZE = 4096
+CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
+CONNECT_SUCCESS = set((0, errno.EISCONN))
+if sys.platform[:3] == "win":
+ CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
+
+if six.PY3:
+ from io import IOBase as file
+ _fileobject = socket.SocketIO
+elif six.PY2:
+ _fileobject = socket._fileobject
+
+
+def socket_connect(descriptor, address):
+ """
+ Attempts to connect to the address, returns the descriptor if it succeeds,
+ returns None if it needs to trampoline, and raises any exceptions.
+ """
+ err = descriptor.connect_ex(address)
+ if err in CONNECT_ERR:
+ return None
+ if err not in CONNECT_SUCCESS:
+ raise socket.error(err, errno.errorcode[err])
+ return descriptor
+
+
+def socket_checkerr(descriptor):
+ err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err not in CONNECT_SUCCESS:
+ raise socket.error(err, errno.errorcode[err])
+
+
+def socket_accept(descriptor):
+ """
+ Attempts to accept() on the descriptor, returns a client,address tuple
+ if it succeeds; returns None if it needs to trampoline, and raises
+ any exceptions.
+ """
+ try:
+ return descriptor.accept()
+ except socket.error as e:
+ if get_errno(e) == errno.EWOULDBLOCK:
+ return None
+ raise
+
+
+if sys.platform[:3] == "win":
+ # winsock sometimes throws ENOTCONN
+ SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
+ SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
+else:
+ # oddly, on linux/darwin, an unconnected socket is expected to block,
+ # so we treat ENOTCONN the same as EWOULDBLOCK
+ SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
+ SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
+
+
+def set_nonblocking(fd):
+ """
+ Sets the descriptor to be nonblocking. Works on many file-like
+ objects as well as sockets. Only sockets can be nonblocking on
+ Windows, however.
+ """
+ try:
+ setblocking = fd.setblocking
+ except AttributeError:
+ # fd has no setblocking() method. It could be that this version of
+ # Python predates socket.setblocking(). In that case, we can still set
+ # the flag "by hand" on the underlying OS fileno using the fcntl
+ # module.
+ try:
+ import fcntl
+ except ImportError:
+ # Whoops, Windows has no fcntl module. This might not be a socket
+ # at all, but rather a file-like object with no setblocking()
+ # method. In particular, on Windows, pipes don't support
+ # non-blocking I/O and therefore don't have that method. Which
+ # means fcntl wouldn't help even if we could load it.
+ raise NotImplementedError("set_nonblocking() on a file object "
+ "with no setblocking() method "
+ "(Windows pipes don't support non-blocking I/O)")
+ # We managed to import fcntl.
+ fileno = fd.fileno()
+ orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
+ new_flags = orig_flags | os.O_NONBLOCK
+ if new_flags != orig_flags:
+ fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
+ else:
+ # socket supports setblocking()
+ setblocking(0)
+
+
+try:
+ from socket import _GLOBAL_DEFAULT_TIMEOUT
+except ImportError:
+ _GLOBAL_DEFAULT_TIMEOUT = object()
+
+
+class GreenSocket(object):
+ """
+ Green version of socket.socket class, that is intended to be 100%
+ API-compatible.
+
+ It also recognizes the keyword parameter, 'set_nonblocking=True'.
+ Pass False to indicate that socket is already in non-blocking mode
+ to save syscalls.
+ """
+
+ # This placeholder is to prevent __getattr__ from creating an infinite call loop
+ fd = None
+
+ def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
+ should_set_nonblocking = kwargs.pop('set_nonblocking', True)
+ if isinstance(family_or_realsock, six.integer_types):
+ fd = _original_socket(family_or_realsock, *args, **kwargs)
+ # Notify the hub that this is a newly-opened socket.
+ notify_opened(fd.fileno())
+ else:
+ fd = family_or_realsock
+
+ # import timeout from other socket, if it was there
+ try:
+ self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
+ except AttributeError:
+ self._timeout = socket.getdefaulttimeout()
+
+ if should_set_nonblocking:
+ set_nonblocking(fd)
+ self.fd = fd
+ # when client calls setblocking(0) or settimeout(0) the socket must
+ # act non-blocking
+ self.act_non_blocking = False
+
+ # Copy some attributes from underlying real socket.
+ # This is the easiest way that i found to fix
+ # https://bitbucket.org/eventlet/eventlet/issue/136
+ # Only `getsockopt` is required to fix that issue, others
+ # are just premature optimization to save __getattr__ call.
+ self.bind = fd.bind
+ self.close = fd.close
+ self.fileno = fd.fileno
+ self.getsockname = fd.getsockname
+ self.getsockopt = fd.getsockopt
+ self.listen = fd.listen
+ self.setsockopt = fd.setsockopt
+ self.shutdown = fd.shutdown
+ self._closed = False
+
+ @property
+ def _sock(self):
+ return self
+
+ if six.PY3:
+ def _get_io_refs(self):
+ return self.fd._io_refs
+
+ def _set_io_refs(self, value):
+ self.fd._io_refs = value
+
+ _io_refs = property(_get_io_refs, _set_io_refs)
+
+ # Forward unknown attributes to fd, cache the value for future use.
+ # I do not see any simple attribute which could be changed
+ # so caching everything in self is fine.
+ # If we find such attributes - only attributes having __get__ might be cached.
+ # For now - I do not want to complicate it.
+ def __getattr__(self, name):
+ if self.fd is None:
+ raise AttributeError(name)
+ attr = getattr(self.fd, name)
+ setattr(self, name, attr)
+ return attr
+
+ def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+ """ We need to trampoline via the event hub.
+ We catch any signal back from the hub indicating that the operation we
+ were waiting on was associated with a filehandle that's since been
+ invalidated.
+ """
+ if self._closed:
+ # If we did any logging, alerting to a second trampoline attempt on a closed
+ # socket here would be useful.
+ raise IOClosed()
+ try:
+ return trampoline(fd, read=read, write=write, timeout=timeout,
+ timeout_exc=timeout_exc,
+ mark_as_closed=self._mark_as_closed)
+ except IOClosed:
+ # This socket's been obsoleted. De-fang it.
+ self._mark_as_closed()
+ raise
+
+ def accept(self):
+ if self.act_non_blocking:
+ return self.fd.accept()
+ fd = self.fd
+ while True:
+ res = socket_accept(fd)
+ if res is not None:
+ client, addr = res
+ set_nonblocking(client)
+ return type(self)(client), addr
+ self._trampoline(fd, read=True, timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+
+ def _mark_as_closed(self):
+ """ Mark this socket as being closed """
+ self._closed = True
+
+ def __del__(self):
+ # This is in case self.close is not assigned yet (currently the constructor does it)
+ close = getattr(self, 'close', None)
+ if close is not None:
+ close()
+
+ def connect(self, address):
+ if self.act_non_blocking:
+ return self.fd.connect(address)
+ fd = self.fd
+ if self.gettimeout() is None:
+ while not socket_connect(fd, address):
+ try:
+ self._trampoline(fd, write=True)
+ except IOClosed:
+ raise socket.error(errno.EBADFD)
+ socket_checkerr(fd)
+ else:
+ end = time.time() + self.gettimeout()
+ while True:
+ if socket_connect(fd, address):
+ return
+ if time.time() >= end:
+ raise socket.timeout("timed out")
+ try:
+ self._trampoline(fd, write=True, timeout=end - time.time(),
+ timeout_exc=socket.timeout("timed out"))
+ except IOClosed:
+ # ... we need some workable errno here.
+ raise socket.error(errno.EBADFD)
+ socket_checkerr(fd)
+
+ def connect_ex(self, address):
+ if self.act_non_blocking:
+ return self.fd.connect_ex(address)
+ fd = self.fd
+ if self.gettimeout() is None:
+ while not socket_connect(fd, address):
+ try:
+ self._trampoline(fd, write=True)
+ socket_checkerr(fd)
+ except socket.error as ex:
+ return get_errno(ex)
+ except IOClosed:
+ return errno.EBADFD
+ else:
+ end = time.time() + self.gettimeout()
+ while True:
+ try:
+ if socket_connect(fd, address):
+ return 0
+ if time.time() >= end:
+ raise socket.timeout(errno.EAGAIN)
+ self._trampoline(fd, write=True, timeout=end - time.time(),
+ timeout_exc=socket.timeout(errno.EAGAIN))
+ socket_checkerr(fd)
+ except socket.error as ex:
+ return get_errno(ex)
+ except IOClosed:
+ return errno.EBADFD
+
+ def dup(self, *args, **kw):
+ sock = self.fd.dup(*args, **kw)
+ newsock = type(self)(sock, set_nonblocking=False)
+ newsock.settimeout(self.gettimeout())
+ return newsock
+
+ if six.PY3:
+ def makefile(self, *args, **kwargs):
+ return _original_socket.makefile(self, *args, **kwargs)
+ else:
+ def makefile(self, *args, **kwargs):
+ dupped = self.dup()
+ res = _fileobject(dupped, *args, **kwargs)
+ if hasattr(dupped, "_drop"):
+ dupped._drop()
+ return res
+
+ def makeGreenFile(self, *args, **kw):
+ warnings.warn("makeGreenFile has been deprecated, please use "
+ "makefile instead", DeprecationWarning, stacklevel=2)
+ return self.makefile(*args, **kw)
+
+ def recv(self, buflen, flags=0):
+ fd = self.fd
+ if self.act_non_blocking:
+ return fd.recv(buflen, flags)
+ while True:
+ try:
+ return fd.recv(buflen, flags)
+ except socket.error as e:
+ if get_errno(e) in SOCKET_BLOCKING:
+ pass
+ elif get_errno(e) in SOCKET_CLOSED:
+ return ''
+ else:
+ raise
+ try:
+ self._trampoline(
+ fd,
+ read=True,
+ timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+ except IOClosed as e:
+ # Perhaps we should return '' instead?
+ raise EOFError()
+
+ def recvfrom(self, *args):
+ if not self.act_non_blocking:
+ self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+ return self.fd.recvfrom(*args)
+
+ def recvfrom_into(self, *args):
+ if not self.act_non_blocking:
+ self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+ return self.fd.recvfrom_into(*args)
+
+ def recv_into(self, *args):
+ if not self.act_non_blocking:
+ self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+ return self.fd.recv_into(*args)
+
+ def send(self, data, flags=0):
+ fd = self.fd
+ if self.act_non_blocking:
+ return fd.send(data, flags)
+
+ # blocking socket behavior - sends all, blocks if the buffer is full
+ total_sent = 0
+ len_data = len(data)
+ while 1:
+ try:
+ total_sent += fd.send(data[total_sent:], flags)
+ except socket.error as e:
+ if get_errno(e) not in SOCKET_BLOCKING:
+ raise
+
+ if total_sent == len_data:
+ break
+
+ try:
+ self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
+ timeout_exc=socket.timeout("timed out"))
+ except IOClosed:
+ raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
+
+ return total_sent
+
+ def sendall(self, data, flags=0):
+ tail = self.send(data, flags)
+ len_data = len(data)
+ while tail < len_data:
+ tail += self.send(data[tail:], flags)
+
+ def sendto(self, *args):
+ self._trampoline(self.fd, write=True)
+ return self.fd.sendto(*args)
+
+ def setblocking(self, flag):
+ if flag:
+ self.act_non_blocking = False
+ self._timeout = None
+ else:
+ self.act_non_blocking = True
+ self._timeout = 0.0
+
+ def settimeout(self, howlong):
+ if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
+ self.setblocking(True)
+ return
+ try:
+ f = howlong.__float__
+ except AttributeError:
+ raise TypeError('a float is required')
+ howlong = f()
+ if howlong < 0.0:
+ raise ValueError('Timeout value out of range')
+ if howlong == 0.0:
+ self.act_non_blocking = True
+ self._timeout = 0.0
+ else:
+ self.act_non_blocking = False
+ self._timeout = howlong
+
+ def gettimeout(self):
+ return self._timeout
+
+ if "__pypy__" in sys.builtin_module_names:
+ def _reuse(self):
+ getattr(self.fd, '_sock', self.fd)._reuse()
+
+ def _drop(self):
+ getattr(self.fd, '_sock', self.fd)._drop()
+
+
+class _SocketDuckForFd(object):
+ """Class implementing all socket method used by _fileobject
+ in cooperative manner using low level os I/O calls.
+ """
+ _refcount = 0
+
+ def __init__(self, fileno):
+ self._fileno = fileno
+ notify_opened(fileno)
+ self._closed = False
+
+ def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+ if self._closed:
+ # Don't trampoline if we're already closed.
+ raise IOClosed()
+ try:
+ return trampoline(fd, read=read, write=write, timeout=timeout,
+ timeout_exc=timeout_exc,
+ mark_as_closed=self._mark_as_closed)
+ except IOClosed:
+ # Our fileno has been obsoleted. Defang ourselves to
+ # prevent spurious closes.
+ self._mark_as_closed()
+ raise
+
+ def _mark_as_closed(self):
+ self._closed = True
+
+ @property
+ def _sock(self):
+ return self
+
+ def fileno(self):
+ return self._fileno
+
+ def recv(self, buflen):
+ while True:
+ try:
+ data = os.read(self._fileno, buflen)
+ return data
+ except OSError as e:
+ if get_errno(e) not in SOCKET_BLOCKING:
+ raise IOError(*e.args)
+ self._trampoline(self, read=True)
+
+ def recv_into(self, buf, nbytes=0, flags=0):
+ if nbytes == 0:
+ nbytes = len(buf)
+ data = self.recv(nbytes)
+ buf[:nbytes] = data
+ return len(data)
+
+ def send(self, data):
+ while True:
+ try:
+ return os.write(self._fileno, data)
+ except OSError as e:
+ if get_errno(e) not in SOCKET_BLOCKING:
+ raise IOError(*e.args)
+ else:
+ trampoline(self, write=True)
+
+ def sendall(self, data):
+ len_data = len(data)
+ os_write = os.write
+ fileno = self._fileno
+ try:
+ total_sent = os_write(fileno, data)
+ except OSError as e:
+ if get_errno(e) != errno.EAGAIN:
+ raise IOError(*e.args)
+ total_sent = 0
+ while total_sent < len_data:
+ self._trampoline(self, write=True)
+ try:
+ total_sent += os_write(fileno, data[total_sent:])
+ except OSError as e:
+ if get_errno(e) != errno. EAGAIN:
+ raise IOError(*e.args)
+
+ def __del__(self):
+ self._close()
+
+ def _close(self):
+ notify_close(self._fileno)
+ self._mark_as_closed()
+ try:
+ os.close(self._fileno)
+ except:
+ # os.close may fail if __init__ didn't complete
+ # (i.e file dscriptor passed to popen was invalid
+ pass
+
+ def __repr__(self):
+ return "%s:%d" % (self.__class__.__name__, self._fileno)
+
+ def _reuse(self):
+ self._refcount += 1
+
+ def _drop(self):
+ self._refcount -= 1
+ if self._refcount == 0:
+ self._close()
+ # Python3
+ _decref_socketios = _drop
+
+
+def _operationOnClosedFile(*args, **kwargs):
+ raise ValueError("I/O operation on closed file")
+
+
+class GreenPipe(_fileobject):
+ """
+ GreenPipe is a cooperative replacement for file class.
+ It will cooperate on pipes. It will block on regular file.
+ Differneces from file class:
+ - mode is r/w property. Should re r/o
+ - encoding property not implemented
+ - write/writelines will not raise TypeError exception when non-string data is written
+ it will write str(data) instead
+ - Universal new lines are not supported and newlines property not implementeded
+ - file argument can be descriptor, file name or file object.
+ """
+
+ def __init__(self, f, mode='r', bufsize=-1):
+ if not isinstance(f, six.string_types + (int, file)):
+ raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
+
+ if isinstance(f, six.string_types):
+ f = open(f, mode, 0)
+
+ if isinstance(f, int):
+ fileno = f
+ self._name = "<fd:%d>" % fileno
+ else:
+ fileno = os.dup(f.fileno())
+ self._name = f.name
+ if f.mode != mode:
+ raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
+ self._name = f.name
+ f.close()
+
+ super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
+ set_nonblocking(self)
+ self.softspace = 0
+
+ @property
+ def name(self):
+ return self._name
+
+ def __repr__(self):
+ return "<%s %s %r, mode %r at 0x%x>" % (
+ self.closed and 'closed' or 'open',
+ self.__class__.__name__,
+ self.name,
+ self.mode,
+ (id(self) < 0) and (sys.maxint + id(self)) or id(self))
+
+ def close(self):
+ super(GreenPipe, self).close()
+ for method in [
+ 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
+ 'readline', 'readlines', 'seek', 'tell', 'truncate',
+ 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
+ setattr(self, method, _operationOnClosedFile)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def _get_readahead_len(self):
+ return len(self._rbuf.getvalue())
+
+ def _clear_readahead_buf(self):
+ len = self._get_readahead_len()
+ if len > 0:
+ self.read(len)
+
+ def tell(self):
+ self.flush()
+ try:
+ return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
+ except OSError as e:
+ raise IOError(*e.args)
+
+ def seek(self, offset, whence=0):
+ self.flush()
+ if whence == 1 and offset == 0: # tell synonym
+ return self.tell()
+ if whence == 1: # adjust offset by what is read ahead
+ offset -= self._get_readahead_len()
+ try:
+ rv = os.lseek(self.fileno(), offset, whence)
+ except OSError as e:
+ raise IOError(*e.args)
+ else:
+ self._clear_readahead_buf()
+ return rv
+
+ if getattr(file, "truncate", None): # not all OSes implement truncate
+ def truncate(self, size=-1):
+ self.flush()
+ if size == -1:
+ size = self.tell()
+ try:
+ rv = os.ftruncate(self.fileno(), size)
+ except OSError as e:
+ raise IOError(*e.args)
+ else:
+ self.seek(size) # move position&clear buffer
+ return rv
+
+ def isatty(self):
+ try:
+ return os.isatty(self.fileno())
+ except OSError as e:
+ raise IOError(*e.args)
+
+
+# import SSL module here so we can refer to greenio.SSL.exceptionclass
+try:
+ from OpenSSL import SSL
+except ImportError:
+ # pyOpenSSL not installed, define exceptions anyway for convenience
+ class SSL(object):
+ class WantWriteError(Exception):
+ pass
+
+ class WantReadError(Exception):
+ pass
+
+ class ZeroReturnError(Exception):
+ pass
+
+ class SysCallError(Exception):
+ pass
+
+
+def shutdown_safe(sock):
+ """ Shuts down the socket. This is a convenience method for
+ code that wants to gracefully handle regular sockets, SSL.Connection
+ sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
+ interchangeably. Both types of ssl socket require a shutdown() before
+ close, but they have different arity on their shutdown method.
+
+ Regular sockets don't need a shutdown before close, but it doesn't hurt.
+ """
+ try:
+ try:
+ # socket, ssl.SSLSocket
+ return sock.shutdown(socket.SHUT_RDWR)
+ except TypeError:
+ # SSL.Connection
+ return sock.shutdown()
+ except socket.error as e:
+ # we don't care if the socket is already closed;
+ # this will often be the case in an http server context
+ if get_errno(e) != errno.ENOTCONN:
+ raise