3 from socket import socket as _original_socket
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
12 __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
15 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
16 CONNECT_SUCCESS = set((0, errno.EISCONN))
17 if sys.platform[:3] == "win":
18 CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
21 from io import IOBase as file
22 _fileobject = socket.SocketIO
24 _fileobject = socket._fileobject
27 def socket_connect(descriptor, address):
29 Attempts to connect to the address, returns the descriptor if it succeeds,
30 returns None if it needs to trampoline, and raises any exceptions.
32 err = descriptor.connect_ex(address)
33 if err in CONNECT_ERR:
35 if err not in CONNECT_SUCCESS:
36 raise socket.error(err, errno.errorcode[err])
40 def socket_checkerr(descriptor):
41 err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
42 if err not in CONNECT_SUCCESS:
43 raise socket.error(err, errno.errorcode[err])
46 def socket_accept(descriptor):
48 Attempts to accept() on the descriptor, returns a client,address tuple
49 if it succeeds; returns None if it needs to trampoline, and raises
53 return descriptor.accept()
54 except socket.error as e:
55 if get_errno(e) == errno.EWOULDBLOCK:
60 if sys.platform[:3] == "win":
61 # winsock sometimes throws ENOTCONN
62 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
63 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
65 # oddly, on linux/darwin, an unconnected socket is expected to block,
66 # so we treat ENOTCONN the same as EWOULDBLOCK
67 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
68 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
71 def set_nonblocking(fd):
73 Sets the descriptor to be nonblocking. Works on many file-like
74 objects as well as sockets. Only sockets can be nonblocking on
78 setblocking = fd.setblocking
79 except AttributeError:
80 # fd has no setblocking() method. It could be that this version of
81 # Python predates socket.setblocking(). In that case, we can still set
82 # the flag "by hand" on the underlying OS fileno using the fcntl
87 # Whoops, Windows has no fcntl module. This might not be a socket
88 # at all, but rather a file-like object with no setblocking()
89 # method. In particular, on Windows, pipes don't support
90 # non-blocking I/O and therefore don't have that method. Which
91 # means fcntl wouldn't help even if we could load it.
92 raise NotImplementedError("set_nonblocking() on a file object "
93 "with no setblocking() method "
94 "(Windows pipes don't support non-blocking I/O)")
95 # We managed to import fcntl.
97 orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
98 new_flags = orig_flags | os.O_NONBLOCK
99 if new_flags != orig_flags:
100 fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
102 # socket supports setblocking()
107 from socket import _GLOBAL_DEFAULT_TIMEOUT
109 _GLOBAL_DEFAULT_TIMEOUT = object()
112 class GreenSocket(object):
114 Green version of socket.socket class, that is intended to be 100%
117 It also recognizes the keyword parameter, 'set_nonblocking=True'.
118 Pass False to indicate that socket is already in non-blocking mode
122 # This placeholder is to prevent __getattr__ from creating an infinite call loop
125 def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
126 should_set_nonblocking = kwargs.pop('set_nonblocking', True)
127 if isinstance(family_or_realsock, six.integer_types):
128 fd = _original_socket(family_or_realsock, *args, **kwargs)
129 # Notify the hub that this is a newly-opened socket.
130 notify_opened(fd.fileno())
132 fd = family_or_realsock
134 # import timeout from other socket, if it was there
136 self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
137 except AttributeError:
138 self._timeout = socket.getdefaulttimeout()
140 if should_set_nonblocking:
143 # when client calls setblocking(0) or settimeout(0) the socket must
145 self.act_non_blocking = False
147 # Copy some attributes from underlying real socket.
148 # This is the easiest way that i found to fix
149 # https://bitbucket.org/eventlet/eventlet/issue/136
150 # Only `getsockopt` is required to fix that issue, others
151 # are just premature optimization to save __getattr__ call.
153 self.close = fd.close
154 self.fileno = fd.fileno
155 self.getsockname = fd.getsockname
156 self.getsockopt = fd.getsockopt
157 self.listen = fd.listen
158 self.setsockopt = fd.setsockopt
159 self.shutdown = fd.shutdown
167 def _get_io_refs(self):
168 return self.fd._io_refs
170 def _set_io_refs(self, value):
171 self.fd._io_refs = value
173 _io_refs = property(_get_io_refs, _set_io_refs)
175 # Forward unknown attributes to fd, cache the value for future use.
176 # I do not see any simple attribute which could be changed
177 # so caching everything in self is fine.
178 # If we find such attributes - only attributes having __get__ might be cached.
179 # For now - I do not want to complicate it.
180 def __getattr__(self, name):
182 raise AttributeError(name)
183 attr = getattr(self.fd, name)
184 setattr(self, name, attr)
187 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
188 """ We need to trampoline via the event hub.
189 We catch any signal back from the hub indicating that the operation we
190 were waiting on was associated with a filehandle that's since been
194 # If we did any logging, alerting to a second trampoline attempt on a closed
195 # socket here would be useful.
198 return trampoline(fd, read=read, write=write, timeout=timeout,
199 timeout_exc=timeout_exc,
200 mark_as_closed=self._mark_as_closed)
202 # This socket's been obsoleted. De-fang it.
203 self._mark_as_closed()
207 if self.act_non_blocking:
208 return self.fd.accept()
211 res = socket_accept(fd)
214 set_nonblocking(client)
215 return type(self)(client), addr
216 self._trampoline(fd, read=True, timeout=self.gettimeout(),
217 timeout_exc=socket.timeout("timed out"))
219 def _mark_as_closed(self):
220 """ Mark this socket as being closed """
224 # This is in case self.close is not assigned yet (currently the constructor does it)
225 close = getattr(self, 'close', None)
226 if close is not None:
229 def connect(self, address):
230 if self.act_non_blocking:
231 return self.fd.connect(address)
233 if self.gettimeout() is None:
234 while not socket_connect(fd, address):
236 self._trampoline(fd, write=True)
238 raise socket.error(errno.EBADFD)
241 end = time.time() + self.gettimeout()
243 if socket_connect(fd, address):
245 if time.time() >= end:
246 raise socket.timeout("timed out")
248 self._trampoline(fd, write=True, timeout=end - time.time(),
249 timeout_exc=socket.timeout("timed out"))
251 # ... we need some workable errno here.
252 raise socket.error(errno.EBADFD)
255 def connect_ex(self, address):
256 if self.act_non_blocking:
257 return self.fd.connect_ex(address)
259 if self.gettimeout() is None:
260 while not socket_connect(fd, address):
262 self._trampoline(fd, write=True)
264 except socket.error as ex:
269 end = time.time() + self.gettimeout()
272 if socket_connect(fd, address):
274 if time.time() >= end:
275 raise socket.timeout(errno.EAGAIN)
276 self._trampoline(fd, write=True, timeout=end - time.time(),
277 timeout_exc=socket.timeout(errno.EAGAIN))
279 except socket.error as ex:
284 def dup(self, *args, **kw):
285 sock = self.fd.dup(*args, **kw)
286 newsock = type(self)(sock, set_nonblocking=False)
287 newsock.settimeout(self.gettimeout())
291 def makefile(self, *args, **kwargs):
292 return _original_socket.makefile(self, *args, **kwargs)
294 def makefile(self, *args, **kwargs):
296 res = _fileobject(dupped, *args, **kwargs)
297 if hasattr(dupped, "_drop"):
301 def makeGreenFile(self, *args, **kw):
302 warnings.warn("makeGreenFile has been deprecated, please use "
303 "makefile instead", DeprecationWarning, stacklevel=2)
304 return self.makefile(*args, **kw)
306 def recv(self, buflen, flags=0):
308 if self.act_non_blocking:
309 return fd.recv(buflen, flags)
312 return fd.recv(buflen, flags)
313 except socket.error as e:
314 if get_errno(e) in SOCKET_BLOCKING:
316 elif get_errno(e) in SOCKET_CLOSED:
324 timeout=self.gettimeout(),
325 timeout_exc=socket.timeout("timed out"))
326 except IOClosed as e:
327 # Perhaps we should return '' instead?
330 def recvfrom(self, *args):
331 if not self.act_non_blocking:
332 self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
333 timeout_exc=socket.timeout("timed out"))
334 return self.fd.recvfrom(*args)
336 def recvfrom_into(self, *args):
337 if not self.act_non_blocking:
338 self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
339 timeout_exc=socket.timeout("timed out"))
340 return self.fd.recvfrom_into(*args)
342 def recv_into(self, *args):
343 if not self.act_non_blocking:
344 self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
345 timeout_exc=socket.timeout("timed out"))
346 return self.fd.recv_into(*args)
348 def send(self, data, flags=0):
350 if self.act_non_blocking:
351 return fd.send(data, flags)
353 # blocking socket behavior - sends all, blocks if the buffer is full
358 total_sent += fd.send(data[total_sent:], flags)
359 except socket.error as e:
360 if get_errno(e) not in SOCKET_BLOCKING:
363 if total_sent == len_data:
367 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
368 timeout_exc=socket.timeout("timed out"))
370 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
374 def sendall(self, data, flags=0):
375 tail = self.send(data, flags)
377 while tail < len_data:
378 tail += self.send(data[tail:], flags)
380 def sendto(self, *args):
381 self._trampoline(self.fd, write=True)
382 return self.fd.sendto(*args)
384 def setblocking(self, flag):
386 self.act_non_blocking = False
389 self.act_non_blocking = True
392 def settimeout(self, howlong):
393 if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
394 self.setblocking(True)
397 f = howlong.__float__
398 except AttributeError:
399 raise TypeError('a float is required')
402 raise ValueError('Timeout value out of range')
404 self.act_non_blocking = True
407 self.act_non_blocking = False
408 self._timeout = howlong
410 def gettimeout(self):
413 if "__pypy__" in sys.builtin_module_names:
415 getattr(self.fd, '_sock', self.fd)._reuse()
418 getattr(self.fd, '_sock', self.fd)._drop()
421 class _SocketDuckForFd(object):
422 """Class implementing all socket method used by _fileobject
423 in cooperative manner using low level os I/O calls.
427 def __init__(self, fileno):
428 self._fileno = fileno
429 notify_opened(fileno)
432 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
434 # Don't trampoline if we're already closed.
437 return trampoline(fd, read=read, write=write, timeout=timeout,
438 timeout_exc=timeout_exc,
439 mark_as_closed=self._mark_as_closed)
441 # Our fileno has been obsoleted. Defang ourselves to
442 # prevent spurious closes.
443 self._mark_as_closed()
446 def _mark_as_closed(self):
456 def recv(self, buflen):
459 data = os.read(self._fileno, buflen)
462 if get_errno(e) not in SOCKET_BLOCKING:
463 raise IOError(*e.args)
464 self._trampoline(self, read=True)
466 def recv_into(self, buf, nbytes=0, flags=0):
469 data = self.recv(nbytes)
473 def send(self, data):
476 return os.write(self._fileno, data)
478 if get_errno(e) not in SOCKET_BLOCKING:
479 raise IOError(*e.args)
481 trampoline(self, write=True)
483 def sendall(self, data):
486 fileno = self._fileno
488 total_sent = os_write(fileno, data)
490 if get_errno(e) != errno.EAGAIN:
491 raise IOError(*e.args)
493 while total_sent < len_data:
494 self._trampoline(self, write=True)
496 total_sent += os_write(fileno, data[total_sent:])
498 if get_errno(e) != errno. EAGAIN:
499 raise IOError(*e.args)
505 notify_close(self._fileno)
506 self._mark_as_closed()
508 os.close(self._fileno)
510 # os.close may fail if __init__ didn't complete
511 # (i.e file dscriptor passed to popen was invalid
515 return "%s:%d" % (self.__class__.__name__, self._fileno)
522 if self._refcount == 0:
525 _decref_socketios = _drop
528 def _operationOnClosedFile(*args, **kwargs):
529 raise ValueError("I/O operation on closed file")
532 class GreenPipe(_fileobject):
534 GreenPipe is a cooperative replacement for file class.
535 It will cooperate on pipes. It will block on regular file.
536 Differneces from file class:
537 - mode is r/w property. Should re r/o
538 - encoding property not implemented
539 - write/writelines will not raise TypeError exception when non-string data is written
540 it will write str(data) instead
541 - Universal new lines are not supported and newlines property not implementeded
542 - file argument can be descriptor, file name or file object.
545 def __init__(self, f, mode='r', bufsize=-1):
546 if not isinstance(f, six.string_types + (int, file)):
547 raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
549 if isinstance(f, six.string_types):
552 if isinstance(f, int):
554 self._name = "<fd:%d>" % fileno
556 fileno = os.dup(f.fileno())
559 raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
563 super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
564 set_nonblocking(self)
572 return "<%s %s %r, mode %r at 0x%x>" % (
573 self.closed and 'closed' or 'open',
574 self.__class__.__name__,
577 (id(self) < 0) and (sys.maxint + id(self)) or id(self))
580 super(GreenPipe, self).close()
582 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
583 'readline', 'readlines', 'seek', 'tell', 'truncate',
584 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
585 setattr(self, method, _operationOnClosedFile)
590 def __exit__(self, *args):
593 def _get_readahead_len(self):
594 return len(self._rbuf.getvalue())
596 def _clear_readahead_buf(self):
597 len = self._get_readahead_len()
604 return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
606 raise IOError(*e.args)
608 def seek(self, offset, whence=0):
610 if whence == 1 and offset == 0: # tell synonym
612 if whence == 1: # adjust offset by what is read ahead
613 offset -= self._get_readahead_len()
615 rv = os.lseek(self.fileno(), offset, whence)
617 raise IOError(*e.args)
619 self._clear_readahead_buf()
622 if getattr(file, "truncate", None): # not all OSes implement truncate
623 def truncate(self, size=-1):
628 rv = os.ftruncate(self.fileno(), size)
630 raise IOError(*e.args)
632 self.seek(size) # move position&clear buffer
637 return os.isatty(self.fileno())
639 raise IOError(*e.args)
642 # import SSL module here so we can refer to greenio.SSL.exceptionclass
644 from OpenSSL import SSL
646 # pyOpenSSL not installed, define exceptions anyway for convenience
648 class WantWriteError(Exception):
651 class WantReadError(Exception):
654 class ZeroReturnError(Exception):
657 class SysCallError(Exception):
661 def shutdown_safe(sock):
662 """ Shuts down the socket. This is a convenience method for
663 code that wants to gracefully handle regular sockets, SSL.Connection
664 sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
665 interchangeably. Both types of ssl socket require a shutdown() before
666 close, but they have different arity on their shutdown method.
668 Regular sockets don't need a shutdown before close, but it doesn't hurt.
672 # socket, ssl.SSLSocket
673 return sock.shutdown(socket.SHUT_RDWR)
676 return sock.shutdown()
677 except socket.error as e:
678 # we don't care if the socket is already closed;
679 # this will often be the case in an http server context
680 if get_errno(e) != errno.ENOTCONN: