Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / greenio.py
diff --git a/eventlet/eventlet/greenio.py b/eventlet/eventlet/greenio.py
new file mode 100644 (file)
index 0000000..f44096e
--- /dev/null
@@ -0,0 +1,681 @@
+import errno
+import os
+from socket import socket as _original_socket
+import socket
+import sys
+import time
+import warnings
+
+from eventlet.support import get_errno, six
+from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
+
+__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
+
+BUFFER_SIZE = 4096
+CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
+CONNECT_SUCCESS = set((0, errno.EISCONN))
+if sys.platform[:3] == "win":
+    CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
+
+if six.PY3:
+    from io import IOBase as file
+    _fileobject = socket.SocketIO
+elif six.PY2:
+    _fileobject = socket._fileobject
+
+
+def socket_connect(descriptor, address):
+    """
+    Attempts to connect to the address, returns the descriptor if it succeeds,
+    returns None if it needs to trampoline, and raises any exceptions.
+    """
+    err = descriptor.connect_ex(address)
+    if err in CONNECT_ERR:
+        return None
+    if err not in CONNECT_SUCCESS:
+        raise socket.error(err, errno.errorcode[err])
+    return descriptor
+
+
+def socket_checkerr(descriptor):
+    err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+    if err not in CONNECT_SUCCESS:
+        raise socket.error(err, errno.errorcode[err])
+
+
+def socket_accept(descriptor):
+    """
+    Attempts to accept() on the descriptor, returns a client,address tuple
+    if it succeeds; returns None if it needs to trampoline, and raises
+    any exceptions.
+    """
+    try:
+        return descriptor.accept()
+    except socket.error as e:
+        if get_errno(e) == errno.EWOULDBLOCK:
+            return None
+        raise
+
+
+if sys.platform[:3] == "win":
+    # winsock sometimes throws ENOTCONN
+    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
+    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
+else:
+    # oddly, on linux/darwin, an unconnected socket is expected to block,
+    # so we treat ENOTCONN the same as EWOULDBLOCK
+    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
+    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
+
+
+def set_nonblocking(fd):
+    """
+    Sets the descriptor to be nonblocking.  Works on many file-like
+    objects as well as sockets.  Only sockets can be nonblocking on
+    Windows, however.
+    """
+    try:
+        setblocking = fd.setblocking
+    except AttributeError:
+        # fd has no setblocking() method. It could be that this version of
+        # Python predates socket.setblocking(). In that case, we can still set
+        # the flag "by hand" on the underlying OS fileno using the fcntl
+        # module.
+        try:
+            import fcntl
+        except ImportError:
+            # Whoops, Windows has no fcntl module. This might not be a socket
+            # at all, but rather a file-like object with no setblocking()
+            # method. In particular, on Windows, pipes don't support
+            # non-blocking I/O and therefore don't have that method. Which
+            # means fcntl wouldn't help even if we could load it.
+            raise NotImplementedError("set_nonblocking() on a file object "
+                                      "with no setblocking() method "
+                                      "(Windows pipes don't support non-blocking I/O)")
+        # We managed to import fcntl.
+        fileno = fd.fileno()
+        orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
+        new_flags = orig_flags | os.O_NONBLOCK
+        if new_flags != orig_flags:
+            fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
+    else:
+        # socket supports setblocking()
+        setblocking(0)
+
+
+try:
+    from socket import _GLOBAL_DEFAULT_TIMEOUT
+except ImportError:
+    _GLOBAL_DEFAULT_TIMEOUT = object()
+
+
+class GreenSocket(object):
+    """
+    Green version of socket.socket class, that is intended to be 100%
+    API-compatible.
+
+    It also recognizes the keyword parameter, 'set_nonblocking=True'.
+    Pass False to indicate that socket is already in non-blocking mode
+    to save syscalls.
+    """
+
+    # This placeholder is to prevent __getattr__ from creating an infinite call loop
+    fd = None
+
+    def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
+        should_set_nonblocking = kwargs.pop('set_nonblocking', True)
+        if isinstance(family_or_realsock, six.integer_types):
+            fd = _original_socket(family_or_realsock, *args, **kwargs)
+            # Notify the hub that this is a newly-opened socket.
+            notify_opened(fd.fileno())
+        else:
+            fd = family_or_realsock
+
+        # import timeout from other socket, if it was there
+        try:
+            self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
+        except AttributeError:
+            self._timeout = socket.getdefaulttimeout()
+
+        if should_set_nonblocking:
+            set_nonblocking(fd)
+        self.fd = fd
+        # when client calls setblocking(0) or settimeout(0) the socket must
+        # act non-blocking
+        self.act_non_blocking = False
+
+        # Copy some attributes from underlying real socket.
+        # This is the easiest way that i found to fix
+        # https://bitbucket.org/eventlet/eventlet/issue/136
+        # Only `getsockopt` is required to fix that issue, others
+        # are just premature optimization to save __getattr__ call.
+        self.bind = fd.bind
+        self.close = fd.close
+        self.fileno = fd.fileno
+        self.getsockname = fd.getsockname
+        self.getsockopt = fd.getsockopt
+        self.listen = fd.listen
+        self.setsockopt = fd.setsockopt
+        self.shutdown = fd.shutdown
+        self._closed = False
+
+    @property
+    def _sock(self):
+        return self
+
+    if six.PY3:
+        def _get_io_refs(self):
+            return self.fd._io_refs
+
+        def _set_io_refs(self, value):
+            self.fd._io_refs = value
+
+        _io_refs = property(_get_io_refs, _set_io_refs)
+
+    # Forward unknown attributes to fd, cache the value for future use.
+    # I do not see any simple attribute which could be changed
+    # so caching everything in self is fine.
+    # If we find such attributes - only attributes having __get__ might be cached.
+    # For now - I do not want to complicate it.
+    def __getattr__(self, name):
+        if self.fd is None:
+            raise AttributeError(name)
+        attr = getattr(self.fd, name)
+        setattr(self, name, attr)
+        return attr
+
+    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+        """ We need to trampoline via the event hub.
+            We catch any signal back from the hub indicating that the operation we
+            were waiting on was associated with a filehandle that's since been
+            invalidated.
+        """
+        if self._closed:
+            # If we did any logging, alerting to a second trampoline attempt on a closed
+            # socket here would be useful.
+            raise IOClosed()
+        try:
+            return trampoline(fd, read=read, write=write, timeout=timeout,
+                              timeout_exc=timeout_exc,
+                              mark_as_closed=self._mark_as_closed)
+        except IOClosed:
+            # This socket's been obsoleted. De-fang it.
+            self._mark_as_closed()
+            raise
+
+    def accept(self):
+        if self.act_non_blocking:
+            return self.fd.accept()
+        fd = self.fd
+        while True:
+            res = socket_accept(fd)
+            if res is not None:
+                client, addr = res
+                set_nonblocking(client)
+                return type(self)(client), addr
+            self._trampoline(fd, read=True, timeout=self.gettimeout(),
+                             timeout_exc=socket.timeout("timed out"))
+
+    def _mark_as_closed(self):
+        """ Mark this socket as being closed """
+        self._closed = True
+
+    def __del__(self):
+        # This is in case self.close is not assigned yet (currently the constructor does it)
+        close = getattr(self, 'close', None)
+        if close is not None:
+            close()
+
+    def connect(self, address):
+        if self.act_non_blocking:
+            return self.fd.connect(address)
+        fd = self.fd
+        if self.gettimeout() is None:
+            while not socket_connect(fd, address):
+                try:
+                    self._trampoline(fd, write=True)
+                except IOClosed:
+                    raise socket.error(errno.EBADFD)
+                socket_checkerr(fd)
+        else:
+            end = time.time() + self.gettimeout()
+            while True:
+                if socket_connect(fd, address):
+                    return
+                if time.time() >= end:
+                    raise socket.timeout("timed out")
+                try:
+                    self._trampoline(fd, write=True, timeout=end - time.time(),
+                                     timeout_exc=socket.timeout("timed out"))
+                except IOClosed:
+                    # ... we need some workable errno here.
+                    raise socket.error(errno.EBADFD)
+                socket_checkerr(fd)
+
+    def connect_ex(self, address):
+        if self.act_non_blocking:
+            return self.fd.connect_ex(address)
+        fd = self.fd
+        if self.gettimeout() is None:
+            while not socket_connect(fd, address):
+                try:
+                    self._trampoline(fd, write=True)
+                    socket_checkerr(fd)
+                except socket.error as ex:
+                    return get_errno(ex)
+                except IOClosed:
+                    return errno.EBADFD
+        else:
+            end = time.time() + self.gettimeout()
+            while True:
+                try:
+                    if socket_connect(fd, address):
+                        return 0
+                    if time.time() >= end:
+                        raise socket.timeout(errno.EAGAIN)
+                    self._trampoline(fd, write=True, timeout=end - time.time(),
+                                     timeout_exc=socket.timeout(errno.EAGAIN))
+                    socket_checkerr(fd)
+                except socket.error as ex:
+                    return get_errno(ex)
+                except IOClosed:
+                    return errno.EBADFD
+
+    def dup(self, *args, **kw):
+        sock = self.fd.dup(*args, **kw)
+        newsock = type(self)(sock, set_nonblocking=False)
+        newsock.settimeout(self.gettimeout())
+        return newsock
+
+    if six.PY3:
+        def makefile(self, *args, **kwargs):
+            return _original_socket.makefile(self, *args, **kwargs)
+    else:
+        def makefile(self, *args, **kwargs):
+            dupped = self.dup()
+            res = _fileobject(dupped, *args, **kwargs)
+            if hasattr(dupped, "_drop"):
+                dupped._drop()
+            return res
+
+    def makeGreenFile(self, *args, **kw):
+        warnings.warn("makeGreenFile has been deprecated, please use "
+                      "makefile instead", DeprecationWarning, stacklevel=2)
+        return self.makefile(*args, **kw)
+
+    def recv(self, buflen, flags=0):
+        fd = self.fd
+        if self.act_non_blocking:
+            return fd.recv(buflen, flags)
+        while True:
+            try:
+                return fd.recv(buflen, flags)
+            except socket.error as e:
+                if get_errno(e) in SOCKET_BLOCKING:
+                    pass
+                elif get_errno(e) in SOCKET_CLOSED:
+                    return ''
+                else:
+                    raise
+            try:
+                self._trampoline(
+                    fd,
+                    read=True,
+                    timeout=self.gettimeout(),
+                    timeout_exc=socket.timeout("timed out"))
+            except IOClosed as e:
+                # Perhaps we should return '' instead?
+                raise EOFError()
+
+    def recvfrom(self, *args):
+        if not self.act_non_blocking:
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+                             timeout_exc=socket.timeout("timed out"))
+        return self.fd.recvfrom(*args)
+
+    def recvfrom_into(self, *args):
+        if not self.act_non_blocking:
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+                             timeout_exc=socket.timeout("timed out"))
+        return self.fd.recvfrom_into(*args)
+
+    def recv_into(self, *args):
+        if not self.act_non_blocking:
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
+                             timeout_exc=socket.timeout("timed out"))
+        return self.fd.recv_into(*args)
+
+    def send(self, data, flags=0):
+        fd = self.fd
+        if self.act_non_blocking:
+            return fd.send(data, flags)
+
+        # blocking socket behavior - sends all, blocks if the buffer is full
+        total_sent = 0
+        len_data = len(data)
+        while 1:
+            try:
+                total_sent += fd.send(data[total_sent:], flags)
+            except socket.error as e:
+                if get_errno(e) not in SOCKET_BLOCKING:
+                    raise
+
+            if total_sent == len_data:
+                break
+
+            try:
+                self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
+                                 timeout_exc=socket.timeout("timed out"))
+            except IOClosed:
+                raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
+
+        return total_sent
+
+    def sendall(self, data, flags=0):
+        tail = self.send(data, flags)
+        len_data = len(data)
+        while tail < len_data:
+            tail += self.send(data[tail:], flags)
+
+    def sendto(self, *args):
+        self._trampoline(self.fd, write=True)
+        return self.fd.sendto(*args)
+
+    def setblocking(self, flag):
+        if flag:
+            self.act_non_blocking = False
+            self._timeout = None
+        else:
+            self.act_non_blocking = True
+            self._timeout = 0.0
+
+    def settimeout(self, howlong):
+        if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
+            self.setblocking(True)
+            return
+        try:
+            f = howlong.__float__
+        except AttributeError:
+            raise TypeError('a float is required')
+        howlong = f()
+        if howlong < 0.0:
+            raise ValueError('Timeout value out of range')
+        if howlong == 0.0:
+            self.act_non_blocking = True
+            self._timeout = 0.0
+        else:
+            self.act_non_blocking = False
+            self._timeout = howlong
+
+    def gettimeout(self):
+        return self._timeout
+
+    if "__pypy__" in sys.builtin_module_names:
+        def _reuse(self):
+            getattr(self.fd, '_sock', self.fd)._reuse()
+
+        def _drop(self):
+            getattr(self.fd, '_sock', self.fd)._drop()
+
+
+class _SocketDuckForFd(object):
+    """Class implementing all socket method used by _fileobject
+    in cooperative manner using low level os I/O calls.
+    """
+    _refcount = 0
+
+    def __init__(self, fileno):
+        self._fileno = fileno
+        notify_opened(fileno)
+        self._closed = False
+
+    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+        if self._closed:
+            # Don't trampoline if we're already closed.
+            raise IOClosed()
+        try:
+            return trampoline(fd, read=read, write=write, timeout=timeout,
+                              timeout_exc=timeout_exc,
+                              mark_as_closed=self._mark_as_closed)
+        except IOClosed:
+            # Our fileno has been obsoleted. Defang ourselves to
+            # prevent spurious closes.
+            self._mark_as_closed()
+            raise
+
+    def _mark_as_closed(self):
+        self._closed = True
+
+    @property
+    def _sock(self):
+        return self
+
+    def fileno(self):
+        return self._fileno
+
+    def recv(self, buflen):
+        while True:
+            try:
+                data = os.read(self._fileno, buflen)
+                return data
+            except OSError as e:
+                if get_errno(e) not in SOCKET_BLOCKING:
+                    raise IOError(*e.args)
+            self._trampoline(self, read=True)
+
+    def recv_into(self, buf, nbytes=0, flags=0):
+        if nbytes == 0:
+            nbytes = len(buf)
+        data = self.recv(nbytes)
+        buf[:nbytes] = data
+        return len(data)
+
+    def send(self, data):
+        while True:
+            try:
+                return os.write(self._fileno, data)
+            except OSError as e:
+                if get_errno(e) not in SOCKET_BLOCKING:
+                    raise IOError(*e.args)
+                else:
+                    trampoline(self, write=True)
+
+    def sendall(self, data):
+        len_data = len(data)
+        os_write = os.write
+        fileno = self._fileno
+        try:
+            total_sent = os_write(fileno, data)
+        except OSError as e:
+            if get_errno(e) != errno.EAGAIN:
+                raise IOError(*e.args)
+            total_sent = 0
+        while total_sent < len_data:
+            self._trampoline(self, write=True)
+            try:
+                total_sent += os_write(fileno, data[total_sent:])
+            except OSError as e:
+                if get_errno(e) != errno. EAGAIN:
+                    raise IOError(*e.args)
+
+    def __del__(self):
+        self._close()
+
+    def _close(self):
+        notify_close(self._fileno)
+        self._mark_as_closed()
+        try:
+            os.close(self._fileno)
+        except:
+            # os.close may fail if __init__ didn't complete
+            # (i.e file dscriptor passed to popen was invalid
+            pass
+
+    def __repr__(self):
+        return "%s:%d" % (self.__class__.__name__, self._fileno)
+
+    def _reuse(self):
+        self._refcount += 1
+
+    def _drop(self):
+        self._refcount -= 1
+        if self._refcount == 0:
+            self._close()
+    # Python3
+    _decref_socketios = _drop
+
+
+def _operationOnClosedFile(*args, **kwargs):
+    raise ValueError("I/O operation on closed file")
+
+
+class GreenPipe(_fileobject):
+    """
+    GreenPipe is a cooperative replacement for file class.
+    It will cooperate on pipes. It will block on regular file.
+    Differneces from file class:
+    - mode is r/w property. Should re r/o
+    - encoding property not implemented
+    - write/writelines will not raise TypeError exception when non-string data is written
+      it will write str(data) instead
+    - Universal new lines are not supported and newlines property not implementeded
+    - file argument can be descriptor, file name or file object.
+    """
+
+    def __init__(self, f, mode='r', bufsize=-1):
+        if not isinstance(f, six.string_types + (int, file)):
+            raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
+
+        if isinstance(f, six.string_types):
+            f = open(f, mode, 0)
+
+        if isinstance(f, int):
+            fileno = f
+            self._name = "<fd:%d>" % fileno
+        else:
+            fileno = os.dup(f.fileno())
+            self._name = f.name
+            if f.mode != mode:
+                raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
+            self._name = f.name
+            f.close()
+
+        super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
+        set_nonblocking(self)
+        self.softspace = 0
+
+    @property
+    def name(self):
+        return self._name
+
+    def __repr__(self):
+        return "<%s %s %r, mode %r at 0x%x>" % (
+            self.closed and 'closed' or 'open',
+            self.__class__.__name__,
+            self.name,
+            self.mode,
+            (id(self) < 0) and (sys.maxint + id(self)) or id(self))
+
+    def close(self):
+        super(GreenPipe, self).close()
+        for method in [
+                'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
+                'readline', 'readlines', 'seek', 'tell', 'truncate',
+                'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
+            setattr(self, method, _operationOnClosedFile)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def _get_readahead_len(self):
+        return len(self._rbuf.getvalue())
+
+    def _clear_readahead_buf(self):
+        len = self._get_readahead_len()
+        if len > 0:
+            self.read(len)
+
+    def tell(self):
+        self.flush()
+        try:
+            return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
+        except OSError as e:
+            raise IOError(*e.args)
+
+    def seek(self, offset, whence=0):
+        self.flush()
+        if whence == 1 and offset == 0:  # tell synonym
+            return self.tell()
+        if whence == 1:  # adjust offset by what is read ahead
+            offset -= self._get_readahead_len()
+        try:
+            rv = os.lseek(self.fileno(), offset, whence)
+        except OSError as e:
+            raise IOError(*e.args)
+        else:
+            self._clear_readahead_buf()
+            return rv
+
+    if getattr(file, "truncate", None):  # not all OSes implement truncate
+        def truncate(self, size=-1):
+            self.flush()
+            if size == -1:
+                size = self.tell()
+            try:
+                rv = os.ftruncate(self.fileno(), size)
+            except OSError as e:
+                raise IOError(*e.args)
+            else:
+                self.seek(size)  # move position&clear buffer
+                return rv
+
+    def isatty(self):
+        try:
+            return os.isatty(self.fileno())
+        except OSError as e:
+            raise IOError(*e.args)
+
+
+# import SSL module here so we can refer to greenio.SSL.exceptionclass
+try:
+    from OpenSSL import SSL
+except ImportError:
+    # pyOpenSSL not installed, define exceptions anyway for convenience
+    class SSL(object):
+        class WantWriteError(Exception):
+            pass
+
+        class WantReadError(Exception):
+            pass
+
+        class ZeroReturnError(Exception):
+            pass
+
+        class SysCallError(Exception):
+            pass
+
+
+def shutdown_safe(sock):
+    """ Shuts down the socket. This is a convenience method for
+    code that wants to gracefully handle regular sockets, SSL.Connection
+    sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
+    interchangeably.  Both types of ssl socket require a shutdown() before
+    close, but they have different arity on their shutdown method.
+
+    Regular sockets don't need a shutdown before close, but it doesn't hurt.
+    """
+    try:
+        try:
+            # socket, ssl.SSLSocket
+            return sock.shutdown(socket.SHUT_RDWR)
+        except TypeError:
+            # SSL.Connection
+            return sock.shutdown()
+    except socket.error as e:
+        # we don't care if the socket is already closed;
+        # this will often be the case in an http server context
+        if get_errno(e) != errno.ENOTCONN:
+            raise