import errno
import os
from socket import socket as _original_socket
import socket
import sys
import time
import warnings

from eventlet.support import get_errno, six
from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed

__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']

BUFFER_SIZE = 4096
# connect_ex() results meaning "connection still in progress, wait and retry".
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
# connect_ex() / SO_ERROR results meaning "connected".
CONNECT_SUCCESS = set((0, errno.EISCONN))
if sys.platform[:3] == "win":
    CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67

# errno reported when the hub tells us the descriptor was closed under us.
# NOTE(review): errno.EBADFD is assumed to be missing on some platforms;
# fall back to EBADF there instead of failing with AttributeError.
_EBADFD = getattr(errno, 'EBADFD', errno.EBADF)

if six.PY3:
    from io import IOBase as file
    _fileobject = socket.SocketIO
elif six.PY2:
    _fileobject = socket._fileobject


def _errno_name(err):
    """Return the symbolic name for *err*, or a generic text if it has none."""
    return errno.errorcode.get(err, 'Unknown error %s' % (err,))


def socket_connect(descriptor, address):
    """
    Attempts to connect to the address, returns the descriptor if it succeeds,
    returns None if it needs to trampoline, and raises any exceptions.
    """
    err = descriptor.connect_ex(address)
    if err in CONNECT_ERR:
        return None
    if err not in CONNECT_SUCCESS:
        raise socket.error(err, _errno_name(err))
    return descriptor


def socket_checkerr(descriptor):
    """
    Reads the pending SO_ERROR value of the descriptor and raises
    socket.error if it is anything other than a CONNECT_SUCCESS code.
    """
    err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err not in CONNECT_SUCCESS:
        raise socket.error(err, _errno_name(err))


def socket_accept(descriptor):
    """
    Attempts to accept() on the descriptor, returns a client,address tuple
    if it succeeds; returns None if it needs to trampoline, and raises
    any exceptions.
    """
    try:
        return descriptor.accept()
    except socket.error as e:
        if get_errno(e) == errno.EWOULDBLOCK:
            return None
        raise


if sys.platform[:3] == "win":
    # winsock sometimes throws ENOTCONN
    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
    # oddly, on linux/darwin, an unconnected socket is expected to block,
    # so we treat ENOTCONN the same as EWOULDBLOCK
    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))


def set_nonblocking(fd):
    """
    Sets the descriptor to be nonblocking.  Works on many file-like
    objects as well as sockets.  Only sockets can be nonblocking on
    Windows, however.

    Raises NotImplementedError when the object has no setblocking() method
    and the fcntl module cannot be imported.
    """
    try:
        setblocking = fd.setblocking
    except AttributeError:
        # fd has no setblocking() method. It could be that this version of
        # Python predates socket.setblocking(). In that case, we can still set
        # the flag "by hand" on the underlying OS fileno using the fcntl
        # module.
        try:
            import fcntl
        except ImportError:
            # Whoops, Windows has no fcntl module. This might not be a socket
            # at all, but rather a file-like object with no setblocking()
            # method. In particular, on Windows, pipes don't support
            # non-blocking I/O and therefore don't have that method. Which
            # means fcntl wouldn't help even if we could load it.
            raise NotImplementedError("set_nonblocking() on a file object "
                                      "with no setblocking() method "
                                      "(Windows pipes don't support non-blocking I/O)")
        # We managed to import fcntl.
        fileno = fd.fileno()
        orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
        new_flags = orig_flags | os.O_NONBLOCK
        # Skip the second syscall when the flag is already set.
        if new_flags != orig_flags:
            fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
    else:
        # socket supports setblocking()
        setblocking(0)


try:
    from socket import _GLOBAL_DEFAULT_TIMEOUT
except ImportError:
    _GLOBAL_DEFAULT_TIMEOUT = object()


class GreenSocket(object):
    """
    Green version of socket.socket class, that is intended to be 100%
    API-compatible.

    It also recognizes the keyword parameter, 'set_nonblocking=True'.
    Pass False to indicate that socket is already in non-blocking mode
    to save syscalls.
    """

    # This placeholder is to prevent __getattr__ from creating an infinite call loop
    fd = None

    def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
        """
        Wrap an existing socket object, or create a new real socket when
        *family_or_realsock* is an integer address family (remaining
        arguments are passed to the real socket constructor).
        """
        should_set_nonblocking = kwargs.pop('set_nonblocking', True)
        if isinstance(family_or_realsock, six.integer_types):
            fd = _original_socket(family_or_realsock, *args, **kwargs)
            # Notify the hub that this is a newly-opened socket.
            notify_opened(fd.fileno())
        else:
            fd = family_or_realsock

        # import timeout from other socket, if it was there
        try:
            self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
        except AttributeError:
            self._timeout = socket.getdefaulttimeout()

        if should_set_nonblocking:
            set_nonblocking(fd)
        self.fd = fd
        # when client calls setblocking(0) or settimeout(0) the socket must
        # act non-blocking
        self.act_non_blocking = False

        # Copy some attributes from underlying real socket.
        # This is the easiest way that i found to fix
        # https://bitbucket.org/eventlet/eventlet/issue/136
        # Only `getsockopt` is required to fix that issue, others
        # are just premature optimization to save __getattr__ call.
        self.bind = fd.bind
        self.close = fd.close
        self.fileno = fd.fileno
        self.getsockname = fd.getsockname
        self.getsockopt = fd.getsockopt
        self.listen = fd.listen
        self.setsockopt = fd.setsockopt
        self.shutdown = fd.shutdown
        self._closed = False

    @property
    def _sock(self):
        """The green socket itself, exposed under the name `_sock`."""
        return self

    if six.PY3:
        # Proxy the `_io_refs` counter to the wrapped socket.
        def _get_io_refs(self):
            return self.fd._io_refs

        def _set_io_refs(self, value):
            self.fd._io_refs = value

        _io_refs = property(_get_io_refs, _set_io_refs)

    # Forward unknown attributes to fd, cache the value for future use.
    # I do not see any simple attribute which could be changed
    # so caching everything in self is fine.
    # If we find such attributes - only attributes having __get__ might be cached.
    # For now - I do not want to complicate it.
    def __getattr__(self, name):
        if self.fd is None:
            raise AttributeError(name)
        attr = getattr(self.fd, name)
        setattr(self, name, attr)
        return attr

    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
        """ We need to trampoline via the event hub.
            We catch any signal back from the hub indicating that the operation we
            were waiting on was associated with a filehandle that's since been
            invalidated.
        """
        if self._closed:
            # If we did any logging, alerting to a second trampoline attempt on a closed
            # socket here would be useful.
            raise IOClosed()
        try:
            return trampoline(fd, read=read, write=write, timeout=timeout,
                              timeout_exc=timeout_exc,
                              mark_as_closed=self._mark_as_closed)
        except IOClosed:
            # This socket's been obsoleted. De-fang it.
            self._mark_as_closed()
            raise

    def accept(self):
        """
        Accept a connection, yielding to the hub while none is pending.

        Returns a (green socket, address) pair; the new socket is an
        instance of the same class as this one.  Raises socket.timeout
        when the configured timeout expires.
        """
        if self.act_non_blocking:
            return self.fd.accept()
        fd = self.fd
        while True:
            res = socket_accept(fd)
            if res is not None:
                client, addr = res
                set_nonblocking(client)
                return type(self)(client), addr
            self._trampoline(fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))

    def _mark_as_closed(self):
        """ Mark this socket as being closed """
        self._closed = True

    def __del__(self):
        # This is in case self.close is not assigned yet (currently the constructor does it)
        close = getattr(self, 'close', None)
        if close is not None:
            close()

    def connect(self, address):
        """
        Connect to *address*, yielding to the hub until the connection
        completes.

        Raises socket.timeout when the configured timeout expires and
        socket.error when the connection fails or the descriptor is closed
        while waiting.
        """
        if self.act_non_blocking:
            return self.fd.connect(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                try:
                    self._trampoline(fd, write=True)
                except IOClosed:
                    raise socket.error(_EBADFD)
                socket_checkerr(fd)
        else:
            end = time.time() + self.gettimeout()
            while True:
                if socket_connect(fd, address):
                    return
                if time.time() >= end:
                    raise socket.timeout("timed out")
                try:
                    self._trampoline(fd, write=True, timeout=end - time.time(),
                                     timeout_exc=socket.timeout("timed out"))
                except IOClosed:
                    # ... we need some workable errno here.
                    raise socket.error(_EBADFD)
                socket_checkerr(fd)

    def connect_ex(self, address):
        """
        Like connect(), but report failures as an errno value instead of
        raising.  Returns 0 on success in the timeout branch.
        """
        if self.act_non_blocking:
            return self.fd.connect_ex(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                try:
                    self._trampoline(fd, write=True)
                    socket_checkerr(fd)
                # IOClosed is handled first: if it derives from socket.error
                # (its definition is not visible here) the generic handler
                # below would otherwise shadow it.
                except IOClosed:
                    return _EBADFD
                except socket.error as ex:
                    return get_errno(ex)
        else:
            end = time.time() + self.gettimeout()
            while True:
                try:
                    if socket_connect(fd, address):
                        return 0
                    if time.time() >= end:
                        raise socket.timeout(errno.EAGAIN)
                    self._trampoline(fd, write=True, timeout=end - time.time(),
                                     timeout_exc=socket.timeout(errno.EAGAIN))
                    socket_checkerr(fd)
                except IOClosed:
                    return _EBADFD
                except socket.error as ex:
                    return get_errno(ex)

    def dup(self, *args, **kw):
        """Duplicate the wrapped socket and wrap the copy, keeping the timeout."""
        sock = self.fd.dup(*args, **kw)
        newsock = type(self)(sock, set_nonblocking=False)
        newsock.settimeout(self.gettimeout())
        return newsock

    if six.PY3:
        def makefile(self, *args, **kwargs):
            """Build a file object by delegating to the real socket's makefile()."""
            return _original_socket.makefile(self, *args, **kwargs)
    else:
        def makefile(self, *args, **kwargs):
            """Build a file object on top of a duplicate of this socket."""
            dupped = self.dup()
            res = _fileobject(dupped, *args, **kwargs)
            if hasattr(dupped, "_drop"):
                dupped._drop()
            return res

    def makeGreenFile(self, *args, **kw):
        """Deprecated alias for makefile()."""
        warnings.warn("makeGreenFile has been deprecated, please use "
                      "makefile instead", DeprecationWarning, stacklevel=2)
        return self.makefile(*args, **kw)

    def recv(self, buflen, flags=0):
        """
        Receive up to *buflen* bytes, yielding to the hub while no data is
        available.

        Returns empty bytes when the peer closed the connection (errno in
        SOCKET_CLOSED).  Raises socket.timeout on timeout and EOFError when
        the descriptor is closed while waiting.
        """
        fd = self.fd
        if self.act_non_blocking:
            return fd.recv(buflen, flags)
        while True:
            try:
                return fd.recv(buflen, flags)
            except socket.error as e:
                code = get_errno(e)
                if code in SOCKET_BLOCKING:
                    pass
                elif code in SOCKET_CLOSED:
                    # bytes, not text: same value as '' on Python 2 and the
                    # type recv() actually returns on Python 3.
                    return b''
                else:
                    raise
            try:
                self._trampoline(
                    fd,
                    read=True,
                    timeout=self.gettimeout(),
                    timeout_exc=socket.timeout("timed out"))
            except IOClosed:
                # Perhaps we should return '' instead?
                raise EOFError()

    def recvfrom(self, *args):
        """Wait until readable (unless acting non-blocking), then recvfrom()."""
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recvfrom(*args)

    def recvfrom_into(self, *args):
        """Wait until readable (unless acting non-blocking), then recvfrom_into()."""
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recvfrom_into(*args)

    def recv_into(self, *args):
        """Wait until readable (unless acting non-blocking), then recv_into()."""
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recv_into(*args)

    def send(self, data, flags=0):
        """
        Send *data*, yielding to the hub whenever the socket would block.

        Returns the number of bytes sent.  Raises socket.timeout on timeout
        and socket.error(ECONNRESET) when the descriptor is closed while
        waiting.
        """
        fd = self.fd
        if self.act_non_blocking:
            return fd.send(data, flags)

        # blocking socket behavior - sends all, blocks if the buffer is full
        total_sent = 0
        len_data = len(data)
        while 1:
            try:
                total_sent += fd.send(data[total_sent:], flags)
            except socket.error as e:
                if get_errno(e) not in SOCKET_BLOCKING:
                    raise

            if total_sent == len_data:
                break

            try:
                self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
                                 timeout_exc=socket.timeout("timed out"))
            except IOClosed:
                raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')

        return total_sent

    def sendall(self, data, flags=0):
        """Call send() repeatedly until every byte of *data* has been sent."""
        tail = self.send(data, flags)
        len_data = len(data)
        while tail < len_data:
            tail += self.send(data[tail:], flags)

    def sendto(self, *args):
        """Wait until writable, then sendto() on the wrapped socket."""
        self._trampoline(self.fd, write=True)
        return self.fd.sendto(*args)

    def setblocking(self, flag):
        """
        Emulate socket.setblocking(): a true *flag* means block (cooperatively)
        with no timeout, a false one means act non-blocking.
        """
        if flag:
            self.act_non_blocking = False
            self._timeout = None
        else:
            self.act_non_blocking = True
            self._timeout = 0.0

    def settimeout(self, howlong):
        """
        Emulate socket.settimeout().

        None (or the global-default sentinel) means blocking with no
        timeout, 0 means non-blocking, any positive number is a timeout in
        seconds.  Raises TypeError for values without __float__ and
        ValueError for negative values.
        """
        if howlong is None or howlong is _GLOBAL_DEFAULT_TIMEOUT:
            self.setblocking(True)
            return
        try:
            f = howlong.__float__
        except AttributeError:
            raise TypeError('a float is required')
        howlong = f()
        if howlong < 0.0:
            raise ValueError('Timeout value out of range')
        if howlong == 0.0:
            self.act_non_blocking = True
            self._timeout = 0.0
        else:
            self.act_non_blocking = False
            self._timeout = howlong

    def gettimeout(self):
        """Return the timeout recorded by setblocking()/settimeout()."""
        return self._timeout

    if "__pypy__" in sys.builtin_module_names:
        # Forward the _reuse/_drop calls to the wrapped socket
        # (or to its `_sock` attribute when it has one).
        def _reuse(self):
            getattr(self.fd, '_sock', self.fd)._reuse()

        def _drop(self):
            getattr(self.fd, '_sock', self.fd)._drop()


class _SocketDuckForFd(object):
    """Class implementing all socket method used by _fileobject
    in cooperative manner using low level os I/O calls.
    """
    _refcount = 0

    def __init__(self, fileno):
        self._fileno = fileno
        notify_opened(fileno)
        self._closed = False

    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
        """Wait on the hub for *fd*; raise IOClosed if we are (or become) closed."""
        if self._closed:
            # Don't trampoline if we're already closed.
            raise IOClosed()
        try:
            return trampoline(fd, read=read, write=write, timeout=timeout,
                              timeout_exc=timeout_exc,
                              mark_as_closed=self._mark_as_closed)
        except IOClosed:
            # Our fileno has been obsoleted. Defang ourselves to
            # prevent spurious closes.
            self._mark_as_closed()
            raise

    def _mark_as_closed(self):
        self._closed = True

    @property
    def _sock(self):
        return self

    def fileno(self):
        return self._fileno

    def recv(self, buflen):
        """Read up to *buflen* bytes with os.read(), waiting while it would block.

        OSError other than a blocking errno is re-raised as IOError.
        """
        while True:
            try:
                data = os.read(self._fileno, buflen)
                return data
            except OSError as e:
                if get_errno(e) not in SOCKET_BLOCKING:
                    raise IOError(*e.args)
            self._trampoline(self, read=True)

    def recv_into(self, buf, nbytes=0, flags=0):
        """Read up to *nbytes* (default: len(buf)) into *buf*; return the count.

        *flags* is accepted for signature compatibility and not used.
        """
        if nbytes == 0:
            nbytes = len(buf)
        data = self.recv(nbytes)
        # Only overwrite as many bytes as were actually read: assigning a
        # shorter string to buf[:nbytes] would shrink a bytearray and raise
        # on a memoryview.
        buf[:len(data)] = data
        return len(data)

    def send(self, data):
        """Write *data* with os.write(), waiting while it would block.

        Returns the number of bytes written.  OSError other than a blocking
        errno is re-raised as IOError.
        """
        while True:
            try:
                return os.write(self._fileno, data)
            except OSError as e:
                if get_errno(e) not in SOCKET_BLOCKING:
                    raise IOError(*e.args)
            # Go through self._trampoline (like recv/sendall) so that a closed
            # descriptor is detected instead of waited on.
            self._trampoline(self, write=True)

    def sendall(self, data):
        """Write all of *data*, waiting on the hub between partial writes."""
        len_data = len(data)
        os_write = os.write
        fileno = self._fileno
        try:
            total_sent = os_write(fileno, data)
        except OSError as e:
            if get_errno(e) != errno.EAGAIN:
                raise IOError(*e.args)
            total_sent = 0
        while total_sent < len_data:
            self._trampoline(self, write=True)
            try:
                total_sent += os_write(fileno, data[total_sent:])
            except OSError as e:
                if get_errno(e) != errno.EAGAIN:
                    raise IOError(*e.args)

    def __del__(self):
        self._close()

    def _close(self):
        """Tell the hub the descriptor is going away, then close it (best effort)."""
        notify_close(self._fileno)
        self._mark_as_closed()
        try:
            os.close(self._fileno)
        except Exception:
            # os.close may fail if __init__ didn't complete
            # (i.e. file descriptor passed to popen was invalid)
            pass

    def __repr__(self):
        return "%s:%d" % (self.__class__.__name__, self._fileno)

    def _reuse(self):
        self._refcount += 1

    def _drop(self):
        self._refcount -= 1
        if self._refcount == 0:
            self._close()
    # Python3
    _decref_socketios = _drop


def _operationOnClosedFile(*args, **kwargs):
    """Stand-in installed over GreenPipe methods once the pipe is closed."""
    raise ValueError("I/O operation on closed file")


class GreenPipe(_fileobject):
    """
    GreenPipe is a cooperative replacement for file class.
    It will cooperate on pipes. It will block on regular file.
    Differences from file class:
    - mode is r/w property. Should be r/o
    - encoding property not implemented
    - write/writelines will not raise TypeError exception when non-string data is written
      it will write str(data) instead
    - Universal new lines are not supported and newlines property not implemented
    - file argument can be descriptor, file name or file object.
    """

    def __init__(self, f, mode='r', bufsize=-1):
        """
        *f* may be an integer descriptor, a file name or a file object whose
        mode equals *mode*.  A file object is duplicated and then closed.
        *bufsize* is accepted but not used by this constructor.

        Raises TypeError for any other kind of *f* and ValueError on a mode
        mismatch.
        """
        if not isinstance(f, six.string_types + (int, file)):
            # Wrap in a tuple so that a tuple argument formats correctly.
            raise TypeError('f(ile) should be int, str, unicode or file, not %r' % (f,))

        if isinstance(f, six.string_types):
            f = open(f, mode, 0)

        if isinstance(f, int):
            fileno = f
            self._name = "<fd:%d>" % fileno
        else:
            # Validate before dup() so a mismatch does not leak a descriptor.
            if f.mode != mode:
                raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
            fileno = os.dup(f.fileno())
            self._name = f.name
            f.close()

        super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
        set_nonblocking(self)
        self.softspace = 0

    @property
    def name(self):
        return self._name

    def __repr__(self):
        address = id(self)
        if address < 0:
            # sys.maxint only exists on Python 2.
            address += getattr(sys, 'maxint', sys.maxsize)
        return "<%s %s %r, mode %r at 0x%x>" % (
            'closed' if self.closed else 'open',
            self.__class__.__name__,
            self.name,
            self.mode,
            address)

    def close(self):
        """Close the pipe and make every further I/O method raise ValueError."""
        super(GreenPipe, self).close()
        for method in [
                'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                'readline', 'readlines', 'seek', 'tell', 'truncate',
                'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
            setattr(self, method, _operationOnClosedFile)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def _get_readahead_len(self):
        """Number of bytes currently held in the read-ahead buffer `_rbuf`."""
        return len(self._rbuf.getvalue())

    def _clear_readahead_buf(self):
        """Drain the read-ahead buffer by reading exactly what it holds."""
        pending = self._get_readahead_len()
        if pending > 0:
            self.read(pending)

    def tell(self):
        """Return the logical position: OS offset minus buffered read-ahead."""
        self.flush()
        try:
            return os.lseek(self.fileno(), 0, os.SEEK_CUR) - self._get_readahead_len()
        except OSError as e:
            raise IOError(*e.args)

    def seek(self, offset, whence=0):
        """Seek with os.lseek(), compensating for and then dropping read-ahead."""
        self.flush()
        if whence == 1 and offset == 0:  # tell synonym
            return self.tell()
        if whence == 1:  # adjust offset by what is read ahead
            offset -= self._get_readahead_len()
        try:
            rv = os.lseek(self.fileno(), offset, whence)
        except OSError as e:
            raise IOError(*e.args)
        else:
            self._clear_readahead_buf()
            return rv

    if getattr(file, "truncate", None):  # not all OSes implement truncate
        def truncate(self, size=-1):
            """Truncate to *size* (default: current position) and seek there."""
            self.flush()
            if size == -1:
                size = self.tell()
            try:
                rv = os.ftruncate(self.fileno(), size)
            except OSError as e:
                raise IOError(*e.args)
            else:
                self.seek(size)  # move position&clear buffer
                return rv

    def isatty(self):
        try:
            return os.isatty(self.fileno())
        except OSError as e:
            raise IOError(*e.args)


# import SSL module here so we can refer to greenio.SSL.exceptionclass
try:
    from OpenSSL import SSL
except ImportError:
    # pyOpenSSL not installed, define exceptions anyway for convenience
    class SSL(object):
        class WantWriteError(Exception):
            pass

        class WantReadError(Exception):
            pass

        class ZeroReturnError(Exception):
            pass

        class SysCallError(Exception):
            pass


def shutdown_safe(sock):
    """ Shuts down the socket. This is a convenience method for
    code that wants to gracefully handle regular sockets, SSL.Connection
    sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
    interchangeably.  Both types of ssl socket require a shutdown() before
    close, but they have different arity on their shutdown method.

    Regular sockets don't need a shutdown before close, but it doesn't hurt.
    """
    try:
        try:
            # socket, ssl.SSLSocket
            return sock.shutdown(socket.SHUT_RDWR)
        except TypeError:
            # SSL.Connection
            return sock.shutdown()
    except socket.error as e:
        # we don't care if the socket is already closed;
        # this will often be the case in an http server context
        if get_errno(e) != errno.ENOTCONN:
            raise