3 from socket import socket as _original_socket
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
12 __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
15 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
16 CONNECT_SUCCESS = set((0, errno.EISCONN))
17 if sys.platform[:3] == "win":
18 CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
21 from io import IOBase as file
22 _fileobject = socket.SocketIO
24 _fileobject = socket._fileobject
def socket_connect(descriptor, address):
    """
    Attempts to connect to the address, returns the descriptor if it succeeds,
    returns None if it needs to trampoline, and raises any exceptions.

    :param descriptor: socket-like object exposing ``connect_ex()``.
    :param address: address handed unchanged to ``connect_ex()``.
    :returns: ``descriptor`` when ``connect_ex()`` reports a code in
        ``CONNECT_SUCCESS``; ``None`` when it reports a code in
        ``CONNECT_ERR``.
    :raises socket.error: for any other code returned by ``connect_ex()``.
    """
    err = descriptor.connect_ex(address)
    if err in CONNECT_ERR:
        # Connect has not completed yet; the caller is expected to wait and
        # call again.
        return None
    if err not in CONNECT_SUCCESS:
        # errno.errorcode only lists codes known to this platform; a plain
        # [] lookup would replace the real failure with a KeyError.
        raise socket.error(err, errno.errorcode.get(err, 'Unknown error %d' % err))
    return descriptor
def socket_checkerr(descriptor):
    """
    Raises socket.error if the descriptor reports a pending error.

    Reads the ``SO_ERROR`` socket option and returns ``None`` when the value
    is one of ``CONNECT_SUCCESS``.

    :param descriptor: socket-like object exposing ``getsockopt()``.
    :raises socket.error: carrying the errno value and its symbolic name when
        ``SO_ERROR`` is anything other than a success code.
    """
    err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err not in CONNECT_SUCCESS:
        # errno.errorcode only lists codes known to this platform; a plain
        # [] lookup would replace the real failure with a KeyError.
        raise socket.error(err, errno.errorcode.get(err, 'Unknown error %d' % err))
def socket_accept(descriptor):
    """
    Attempts to accept() on the descriptor, returns a client,address tuple
    if it succeeds; returns None if it needs to trampoline, and raises
    any other exception.

    :param descriptor: socket-like object exposing ``accept()``.
    :returns: the value of ``descriptor.accept()``, or ``None`` when the
        call failed only because it would have blocked.
    :raises socket.error: for every failure other than "would block".
    """
    try:
        return descriptor.accept()
    except socket.error as e:
        # POSIX allows accept() to report either EAGAIN or EWOULDBLOCK when
        # no connection is pending, and the two codes are not guaranteed to
        # be equal on every platform.
        if get_errno(e) in (errno.EAGAIN, errno.EWOULDBLOCK):
            return None
        raise
60 if sys.platform[:3] == "win":
61 # winsock sometimes throws ENOTCONN
62 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
63 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
65 # oddly, on linux/darwin, an unconnected socket is expected to block,
66 # so we treat ENOTCONN the same as EWOULDBLOCK
67 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
68 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
def set_nonblocking(fd):
    """
    Sets the descriptor to be nonblocking.  Works on many file-like
    objects as well as sockets.  Only sockets can be nonblocking on
    Windows, however.

    :param fd: object exposing either ``setblocking()`` or ``fileno()``.
    :raises NotImplementedError: when ``fd`` has no ``setblocking()`` method
        and the ``fcntl`` module cannot be imported.
    """
    try:
        setblocking = fd.setblocking
    except AttributeError:
        # fd has no setblocking() method. It could be that this version of
        # Python predates socket.setblocking(). In that case, we can still set
        # the flag "by hand" on the underlying OS fileno using the fcntl
        # module.
        try:
            import fcntl
        except ImportError:
            # Whoops, Windows has no fcntl module. This might not be a socket
            # at all, but rather a file-like object with no setblocking()
            # method. In particular, on Windows, pipes don't support
            # non-blocking I/O and therefore don't have that method. Which
            # means fcntl wouldn't help even if we could load it.
            raise NotImplementedError("set_nonblocking() on a file object "
                                      "with no setblocking() method "
                                      "(Windows pipes don't support non-blocking I/O)")
        # We managed to import fcntl.
        fileno = fd.fileno()
        orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
        new_flags = orig_flags | os.O_NONBLOCK
        if new_flags != orig_flags:
            # Only issue F_SETFL when the flag is not already set.
            fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
    else:
        # socket supports setblocking()
        setblocking(0)
107 from socket import _GLOBAL_DEFAULT_TIMEOUT
109 _GLOBAL_DEFAULT_TIMEOUT = object()
112 class GreenSocket(object):
114 Green version of socket.socket class, that is intended to be 100%
117 It also recognizes the keyword parameter, 'set_nonblocking=True'.
118 Pass False to indicate that socket is already in non-blocking mode
122 def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
123 should_set_nonblocking = kwargs.pop('set_nonblocking', True)
124 if isinstance(family_or_realsock, six.integer_types):
125 fd = _original_socket(family_or_realsock, *args, **kwargs)
126 # Notify the hub that this is a newly-opened socket.
127 notify_opened(fd.fileno())
129 fd = family_or_realsock
131 # import timeout from other socket, if it was there
133 self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
134 except AttributeError:
135 self._timeout = socket.getdefaulttimeout()
137 if should_set_nonblocking:
140 # when client calls setblocking(0) or settimeout(0) the socket must
142 self.act_non_blocking = False
144 # Copy some attributes from underlying real socket.
145 # This is the easiest way that i found to fix
146 # https://bitbucket.org/eventlet/eventlet/issue/136
147 # Only `getsockopt` is required to fix that issue, others
148 # are just premature optimization to save __getattr__ call.
150 self.close = fd.close
151 self.fileno = fd.fileno
152 self.getsockname = fd.getsockname
153 self.getsockopt = fd.getsockopt
154 self.listen = fd.listen
155 self.setsockopt = fd.setsockopt
156 self.shutdown = fd.shutdown
163 # Forward unknown attributes to fd, cache the value for future use.
164 # I do not see any simple attribute which could be changed
165 # so caching everything in self is fine.
166 # If we find such attributes - only attributes having __get__ might be cached.
167 # For now - I do not want to complicate it.
168 def __getattr__(self, name):
169 attr = getattr(self.fd, name)
170 setattr(self, name, attr)
173 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
174 """ We need to trampoline via the event hub.
175 We catch any signal back from the hub indicating that the operation we
176 were waiting on was associated with a filehandle that's since been
180 # If we did any logging, alerting to a second trampoline attempt on a closed
181 # socket here would be useful.
184 return trampoline(fd, read=read, write=write, timeout=timeout,
185 timeout_exc=timeout_exc,
186 mark_as_closed=self._mark_as_closed)
188 # This socket's been obsoleted. De-fang it.
189 self._mark_as_closed()
193 if self.act_non_blocking:
194 return self.fd.accept()
197 res = socket_accept(fd)
200 set_nonblocking(client)
201 return type(self)(client), addr
202 self._trampoline(fd, read=True, timeout=self.gettimeout(),
203 timeout_exc=socket.timeout("timed out"))
205 def _mark_as_closed(self):
206 """ Mark this socket as being closed """
210 notify_close(self.fd)
211 self._mark_as_closed()
212 return self.fd.close()
217 def connect(self, address):
218 if self.act_non_blocking:
219 return self.fd.connect(address)
221 if self.gettimeout() is None:
222 while not socket_connect(fd, address):
224 self._trampoline(fd, write=True)
226 raise socket.error(errno.EBADFD)
229 end = time.time() + self.gettimeout()
231 if socket_connect(fd, address):
233 if time.time() >= end:
234 raise socket.timeout("timed out")
236 self._trampoline(fd, write=True, timeout=end - time.time(),
237 timeout_exc=socket.timeout("timed out"))
239 # ... we need some workable errno here.
240 raise socket.error(errno.EBADFD)
243 def connect_ex(self, address):
244 if self.act_non_blocking:
245 return self.fd.connect_ex(address)
247 if self.gettimeout() is None:
248 while not socket_connect(fd, address):
250 self._trampoline(fd, write=True)
252 except socket.error as ex:
257 end = time.time() + self.gettimeout()
260 if socket_connect(fd, address):
262 if time.time() >= end:
263 raise socket.timeout(errno.EAGAIN)
264 self._trampoline(fd, write=True, timeout=end - time.time(),
265 timeout_exc=socket.timeout(errno.EAGAIN))
267 except socket.error as ex:
272 def dup(self, *args, **kw):
273 sock = self.fd.dup(*args, **kw)
274 newsock = type(self)(sock, set_nonblocking=False)
275 newsock.settimeout(self.gettimeout())
278 def makefile(self, *args, **kw):
280 res = _fileobject(dupped, *args, **kw)
281 if hasattr(dupped, "_drop"):
def makeGreenFile(self, *args, **kw):
    """Deprecated spelling of ``makefile()``.

    Issues a ``DeprecationWarning`` attributed to the caller
    (``stacklevel=2``) and then forwards every argument to
    ``self.makefile()``, returning its result.
    """
    deprecation_text = ("makeGreenFile has been deprecated, please use "
                        "makefile instead")
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)
    replacement = self.makefile
    return replacement(*args, **kw)
290 def recv(self, buflen, flags=0):
292 if self.act_non_blocking:
293 return fd.recv(buflen, flags)
296 return fd.recv(buflen, flags)
297 except socket.error as e:
298 if get_errno(e) in SOCKET_BLOCKING:
300 elif get_errno(e) in SOCKET_CLOSED:
308 timeout=self.gettimeout(),
309 timeout_exc=socket.timeout("timed out"))
310 except IOClosed as e:
311 # Perhaps we should return '' instead?
def recvfrom(self, *args):
    """Call ``recvfrom()`` on the wrapped socket.

    When the socket is not acting non-blocking, first trampolines on
    ``self.fd`` for reading, passing the current timeout and a
    ``socket.timeout("timed out")`` instance as the timeout exception.

    :param args: forwarded unchanged to ``self.fd.recvfrom()``.
    :returns: whatever ``self.fd.recvfrom()`` returns.
    """
    if self.act_non_blocking:
        # Non-blocking mode: go straight to the real socket.
        return self.fd.recvfrom(*args)
    wait_limit = self.gettimeout()
    self._trampoline(self.fd, read=True, timeout=wait_limit,
                     timeout_exc=socket.timeout("timed out"))
    return self.fd.recvfrom(*args)
def recvfrom_into(self, *args):
    """Call ``recvfrom_into()`` on the wrapped socket.

    When the socket is not acting non-blocking, first trampolines on
    ``self.fd`` for reading, passing the current timeout and a
    ``socket.timeout("timed out")`` instance as the timeout exception.

    :param args: forwarded unchanged to ``self.fd.recvfrom_into()``.
    :returns: whatever ``self.fd.recvfrom_into()`` returns.
    """
    if self.act_non_blocking:
        # Non-blocking mode: go straight to the real socket.
        return self.fd.recvfrom_into(*args)
    wait_limit = self.gettimeout()
    self._trampoline(self.fd, read=True, timeout=wait_limit,
                     timeout_exc=socket.timeout("timed out"))
    return self.fd.recvfrom_into(*args)
def recv_into(self, *args):
    """Call ``recv_into()`` on the wrapped socket.

    When the socket is not acting non-blocking, first trampolines on
    ``self.fd`` for reading, passing the current timeout and a
    ``socket.timeout("timed out")`` instance as the timeout exception.

    :param args: forwarded unchanged to ``self.fd.recv_into()``.
    :returns: whatever ``self.fd.recv_into()`` returns.
    """
    if self.act_non_blocking:
        # Non-blocking mode: go straight to the real socket.
        return self.fd.recv_into(*args)
    wait_limit = self.gettimeout()
    self._trampoline(self.fd, read=True, timeout=wait_limit,
                     timeout_exc=socket.timeout("timed out"))
    return self.fd.recv_into(*args)
332 def send(self, data, flags=0):
334 if self.act_non_blocking:
335 return fd.send(data, flags)
337 # blocking socket behavior - sends all, blocks if the buffer is full
342 total_sent += fd.send(data[total_sent:], flags)
343 except socket.error as e:
344 if get_errno(e) not in SOCKET_BLOCKING:
347 if total_sent == len_data:
351 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
352 timeout_exc=socket.timeout("timed out"))
354 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
358 def sendall(self, data, flags=0):
359 tail = self.send(data, flags)
361 while tail < len_data:
362 tail += self.send(data[tail:], flags)
def sendto(self, *args):
    """Call ``sendto()`` on the wrapped socket.

    When the socket is not acting non-blocking, first trampolines on
    ``self.fd`` for writing, passing the current timeout and a
    ``socket.timeout("timed out")`` instance as the timeout exception.

    :param args: forwarded unchanged to ``self.fd.sendto()``.
    :returns: whatever ``self.fd.sendto()`` returns.
    """
    if not self.act_non_blocking:
        # Follow the same wait policy as recvfrom()/recv_into(): skip the
        # wait for a socket acting non-blocking, and bound it by the
        # configured timeout otherwise. An unconditional, untimed wait
        # would ignore both setblocking(0) and settimeout().
        self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
                         timeout_exc=socket.timeout("timed out"))
    return self.fd.sendto(*args)
368 def setblocking(self, flag):
370 self.act_non_blocking = False
373 self.act_non_blocking = True
376 def settimeout(self, howlong):
377 if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
378 self.setblocking(True)
381 f = howlong.__float__
382 except AttributeError:
383 raise TypeError('a float is required')
386 raise ValueError('Timeout value out of range')
388 self.act_non_blocking = True
391 self.act_non_blocking = False
392 self._timeout = howlong
394 def gettimeout(self):
397 if "__pypy__" in sys.builtin_module_names:
399 getattr(self.fd, '_sock', self.fd)._reuse()
402 getattr(self.fd, '_sock', self.fd)._drop()
405 class _SocketDuckForFd(object):
406 """Class implementing all socket method used by _fileobject
407 in cooperative manner using low level os I/O calls.
411 def __init__(self, fileno):
412 self._fileno = fileno
413 notify_opened(fileno)
416 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
418 # Don't trampoline if we're already closed.
421 return trampoline(fd, read=read, write=write, timeout=timeout,
422 timeout_exc=timeout_exc,
423 mark_as_closed=self._mark_as_closed)
425 # Our fileno has been obsoleted. Defang ourselves to
426 # prevent spurious closes.
427 self._mark_as_closed()
430 def _mark_as_closed(self):
440 def recv(self, buflen):
443 data = os.read(self._fileno, buflen)
446 if get_errno(e) not in SOCKET_BLOCKING:
447 raise IOError(*e.args)
448 self._trampoline(self, read=True)
450 def recv_into(self, buf, nbytes=0, flags=0):
453 data = self.recv(nbytes)
457 def send(self, data):
460 os.write(self._fileno, data)
462 if get_errno(e) not in SOCKET_BLOCKING:
463 raise IOError(*e.args)
464 trampoline(self, write=True)
466 def sendall(self, data):
469 fileno = self._fileno
471 total_sent = os_write(fileno, data)
473 if get_errno(e) != errno.EAGAIN:
474 raise IOError(*e.args)
476 while total_sent < len_data:
477 self._trampoline(self, write=True)
479 total_sent += os_write(fileno, data[total_sent:])
481 if get_errno(e) != errno. EAGAIN:
482 raise IOError(*e.args)
488 notify_close(self._fileno)
489 self._mark_as_closed()
491 os.close(self._fileno)
493 # os.close may fail if __init__ didn't complete
494 # (i.e. file descriptor passed to popen was invalid
498 return "%s:%d" % (self.__class__.__name__, self._fileno)
505 if self._refcount == 0:
508 _decref_socketios = _drop
def _operationOnClosedFile(*args, **kwargs):
    """Always raise ``ValueError``; every argument is accepted and ignored.

    GreenPipe assigns this over its I/O method names after closing.
    """
    closed_message = "I/O operation on closed file"
    raise ValueError(closed_message)
515 class GreenPipe(_fileobject):
517 GreenPipe is a cooperative replacement for file class.
518 It will cooperate on pipes. It will block on regular file.
519 Differences from file class:
520 - mode is r/w property. Should be r/o
521 - encoding property not implemented
522 - write/writelines will not raise TypeError exception when non-string data is written
523 it will write str(data) instead
524 - Universal new lines are not supported and newlines property not implemented
525 - file argument can be descriptor, file name or file object.
528 def __init__(self, f, mode='r', bufsize=-1):
529 if not isinstance(f, six.string_types + (int, file)):
530 raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
532 if isinstance(f, six.string_types):
535 if isinstance(f, int):
537 self._name = "<fd:%d>" % fileno
539 fileno = os.dup(f.fileno())
542 raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
546 super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
547 set_nonblocking(self)
555 return "<%s %s %r, mode %r at 0x%x>" % (
556 self.closed and 'closed' or 'open',
557 self.__class__.__name__,
560 (id(self) < 0) and (sys.maxint + id(self)) or id(self))
563 super(GreenPipe, self).close()
565 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
566 'readline', 'readlines', 'seek', 'tell', 'truncate',
567 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
568 setattr(self, method, _operationOnClosedFile)
573 def __exit__(self, *args):
def _get_readahead_len(self):
    """Return the length of the data currently held in ``self._rbuf``."""
    buffered = self._rbuf.getvalue()
    return len(buffered)
579 def _clear_readahead_buf(self):
580 len = self._get_readahead_len()
587 return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
589 raise IOError(*e.args)
591 def seek(self, offset, whence=0):
593 if whence == 1 and offset == 0: # tell synonym
595 if whence == 1: # adjust offset by what is read ahead
596 offset -= self._get_readahead_len()
598 rv = os.lseek(self.fileno(), offset, whence)
600 raise IOError(*e.args)
602 self._clear_readahead_buf()
605 if getattr(file, "truncate", None): # not all OSes implement truncate
606 def truncate(self, size=-1):
611 rv = os.ftruncate(self.fileno(), size)
613 raise IOError(*e.args)
615 self.seek(size) # move position&clear buffer
620 return os.isatty(self.fileno())
622 raise IOError(*e.args)
625 # import SSL module here so we can refer to greenio.SSL.exceptionclass
627 from OpenSSL import SSL
629 # pyOpenSSL not installed, define exceptions anyway for convenience
631 class WantWriteError(Exception):
634 class WantReadError(Exception):
637 class ZeroReturnError(Exception):
640 class SysCallError(Exception):
644 def shutdown_safe(sock):
645 """ Shuts down the socket. This is a convenience method for
646 code that wants to gracefully handle regular sockets, SSL.Connection
647 sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
648 interchangeably. Both types of ssl socket require a shutdown() before
649 close, but they have different arity on their shutdown method.
651 Regular sockets don't need a shutdown before close, but it doesn't hurt.
655 # socket, ssl.SSLSocket
656 return sock.shutdown(socket.SHUT_RDWR)
659 return sock.shutdown()
660 except socket.error as e:
661 # we don't care if the socket is already closed;
662 # this will often be the case in an http server context
663 if get_errno(e) != errno.ENOTCONN: