3 from socket import socket as _original_socket
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_opened, IOClosed
# Public names exported by ``from <this module> import *``.
__all__ = [
    'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
    'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
    'shutdown_safe', 'SSL',
]
# errno values connect_ex() can report while a non-blocking connect is still
# being established; socket_connect() treats them as "not connected yet".
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
# errno values meaning the socket is connected.
CONNECT_SUCCESS = set((0, errno.EISCONN))
if sys.platform[:3] == "win":
    CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67

# socket._fileobject is a Python 2 only class; the Python 3 socket module has
# no such attribute, so the alias is only bound where it exists.
if sys.version_info[0] == 2:
    _python2_fileobject = socket._fileobject
def socket_connect(descriptor, address):
    """
    Attempts to connect to the address, returns the descriptor if it succeeds,
    returns None if it needs to trampoline, and raises any exceptions.

    Raises socket.error carrying the errno reported by connect_ex() when
    that value is neither an "in progress" nor a "connected" code.
    """
    err = descriptor.connect_ex(address)
    if err in CONNECT_ERR:
        # Connect still in progress: the caller has to wait and call again.
        return None
    if err not in CONNECT_SUCCESS:
        # errno.errorcode only maps the codes known on this platform; fall
        # back to the OS message so an unmapped code does not turn the real
        # failure into a KeyError.
        raise socket.error(err, errno.errorcode.get(err) or os.strerror(err))
    return descriptor
def socket_checkerr(descriptor):
    """
    Raises socket.error if the descriptor has a pending error.

    Reads SO_ERROR from the descriptor; any value that is not one of the
    CONNECT_SUCCESS codes is raised as socket.error(errno, description).
    Returns None otherwise.
    """
    err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err not in CONNECT_SUCCESS:
        # errno.errorcode only maps the codes known on this platform; fall
        # back to the OS message so an unmapped code does not turn the real
        # failure into a KeyError.
        raise socket.error(err, errno.errorcode.get(err) or os.strerror(err))
def socket_accept(descriptor):
    """
    Attempts to accept() on the descriptor, returns a client,address tuple
    if it succeeds; returns None if it needs to trampoline, and raises
    any exceptions.
    """
    try:
        return descriptor.accept()
    except socket.error as e:
        if get_errno(e) == errno.EWOULDBLOCK:
            # Nothing to accept yet: the caller waits for readability.
            return None
        raise
# SOCKET_BLOCKING: errno values meaning "retry after waiting on the hub".
# SOCKET_CLOSED: errno values treated as "the peer closed the connection".
if sys.platform[:3] == "win":
    # winsock sometimes throws ENOTCONN
    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
    # oddly, on linux/darwin, an unconnected socket is expected to block,
    # so we treat ENOTCONN the same as EWOULDBLOCK
    SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
    SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
def set_nonblocking(fd):
    """
    Sets the descriptor to be nonblocking.  Works on many file-like
    objects as well as sockets.  Only sockets can be nonblocking on
    Windows, however.

    Objects that expose setblocking() get setblocking(0).  Anything else
    has O_NONBLOCK set on fd.fileno() through fcntl.  Raises
    NotImplementedError when the object has no setblocking() method and
    the fcntl module cannot be imported.
    """
    try:
        setblocking = fd.setblocking
    except AttributeError:
        # fd has no setblocking() method. It could be that this version of
        # Python predates socket.setblocking(). In that case, we can still set
        # the flag "by hand" on the underlying OS fileno using the fcntl
        # module.
        try:
            import fcntl
        except ImportError:
            # Whoops, Windows has no fcntl module. This might not be a socket
            # at all, but rather a file-like object with no setblocking()
            # method. In particular, on Windows, pipes don't support
            # non-blocking I/O and therefore don't have that method. Which
            # means fcntl wouldn't help even if we could load it.
            raise NotImplementedError("set_nonblocking() on a file object "
                                      "with no setblocking() method "
                                      "(Windows pipes don't support non-blocking I/O)")
        # We managed to import fcntl.
        fileno = fd.fileno()
        orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
        new_flags = orig_flags | os.O_NONBLOCK
        if new_flags != orig_flags:
            # Only issue the second syscall when the flag actually changes.
            fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
    else:
        # socket supports setblocking()
        setblocking(0)
# Reuse the standard library's "no timeout given" sentinel when the socket
# module provides one; otherwise create a private sentinel object.
try:
    from socket import _GLOBAL_DEFAULT_TIMEOUT
except ImportError:
    _GLOBAL_DEFAULT_TIMEOUT = object()
class GreenSocket(object):
    """
    Green version of socket.socket class, that is intended to be 100%
    API-compatible.

    It also recognizes the keyword parameter, 'set_nonblocking=True'.
    Pass False to indicate that socket is already in non-blocking mode;
    the constructor then skips the set_nonblocking() call.
    """

    # This placeholder is to prevent __getattr__ from creating an infinite call loop
    fd = None

    # errno.EBADFD is not defined on every platform, so fall back to EBADF
    # instead of failing with AttributeError while reporting a descriptor
    # that was closed underneath us.
    _BAD_FD_ERRNO = getattr(errno, 'EBADFD', errno.EBADF)

    def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
        """Create a new real socket, or wrap an existing socket-like object.

        When family_or_realsock is an integer it is used as the address
        family (with the remaining arguments) to create a real socket and the
        hub is notified about the new descriptor; any other object is wrapped
        as is.
        """
        should_set_nonblocking = kwargs.pop('set_nonblocking', True)
        if isinstance(family_or_realsock, six.integer_types):
            fd = _original_socket(family_or_realsock, *args, **kwargs)
            # Notify the hub that this is a newly-opened socket.
            notify_opened(fd.fileno())
        else:
            fd = family_or_realsock

        # import timeout from other socket, if it was there
        try:
            self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
        except AttributeError:
            self._timeout = socket.getdefaulttimeout()

        if should_set_nonblocking:
            set_nonblocking(fd)
        self.fd = fd
        # when client calls setblocking(0) or settimeout(0) the socket must
        # act non-blocking
        self.act_non_blocking = False

        # Copy some attributes from underlying real socket.
        # This is the easiest way that i found to fix
        # https://bitbucket.org/eventlet/eventlet/issue/136
        # Only `getsockopt` is required to fix that issue, others
        # are just premature optimization to save __getattr__ call.
        self.bind = fd.bind
        self.close = fd.close
        self.fileno = fd.fileno
        self.getsockname = fd.getsockname
        self.getsockopt = fd.getsockopt
        self.listen = fd.listen
        self.setsockopt = fd.setsockopt
        self.shutdown = fd.shutdown
        self._closed = False

    @property
    def _sock(self):
        # Code that unwraps a socket through `_sock` gets the green socket.
        return self

    if sys.version_info[0] >= 3:
        def _get_io_refs(self):
            return self.fd._io_refs

        def _set_io_refs(self, value):
            self.fd._io_refs = value

        # Keep the reference count on the wrapped socket itself.
        _io_refs = property(_get_io_refs, _set_io_refs)

    # Forward unknown attributes to fd, cache the value for future use.
    # I do not see any simple attribute which could be changed
    # so caching everything in self is fine.
    # If we find such attributes - only attributes having __get__ might be cached.
    # For now - I do not want to complicate it.
    def __getattr__(self, name):
        if self.fd is None:
            # No wrapped socket yet: nothing to forward to.
            raise AttributeError(name)
        attr = getattr(self.fd, name)
        setattr(self, name, attr)
        return attr

    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
        """ We need to trampoline via the event hub.
            We catch any signal back from the hub indicating that the operation we
            were waiting on was associated with a filehandle that's since been
            invalidated.
        """
        if self._closed:
            # If we did any logging, alerting to a second trampoline attempt on a closed
            # socket here would be useful.
            raise IOClosed()
        try:
            return trampoline(fd, read=read, write=write, timeout=timeout,
                              timeout_exc=timeout_exc,
                              mark_as_closed=self._mark_as_closed)
        except IOClosed:
            # This socket's been obsoleted. De-fang it.
            self._mark_as_closed()
            raise

    def accept(self):
        """Accept a connection and return (green socket, address)."""
        if self.act_non_blocking:
            return self.fd.accept()
        fd = self.fd
        while True:
            res = socket_accept(fd)
            if res is not None:
                client, addr = res
                set_nonblocking(client)
                return type(self)(client), addr
            self._trampoline(fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))

    def _mark_as_closed(self):
        """ Mark this socket as being closed """
        self._closed = True

    def __del__(self):
        # This is in case self.close is not assigned yet (currently the constructor does it)
        close = getattr(self, 'close', None)
        if close is not None:
            close()

    def connect(self, address):
        """Connect to address, waiting on the hub instead of blocking.

        Raises socket.timeout("timed out") when a timeout is set and
        expires, and socket.error when the descriptor is closed while
        waiting.
        """
        if self.act_non_blocking:
            return self.fd.connect(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                try:
                    self._trampoline(fd, write=True)
                except IOClosed:
                    raise socket.error(self._BAD_FD_ERRNO)
                socket_checkerr(fd)
        else:
            end = time.time() + self.gettimeout()
            while True:
                if socket_connect(fd, address):
                    return
                if time.time() >= end:
                    raise socket.timeout("timed out")
                try:
                    self._trampoline(fd, write=True, timeout=end - time.time(),
                                     timeout_exc=socket.timeout("timed out"))
                except IOClosed:
                    # ... we need some workable errno here.
                    raise socket.error(self._BAD_FD_ERRNO)
                socket_checkerr(fd)

    def connect_ex(self, address):
        """Like connect(), but return an errno value instead of raising.

        Returns 0 on success.
        """
        if self.act_non_blocking:
            return self.fd.connect_ex(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                try:
                    self._trampoline(fd, write=True)
                    socket_checkerr(fd)
                # IOClosed is handled first: if it derives from
                # IOError/OSError the socket.error clause would otherwise
                # swallow it and this branch could never run.
                except IOClosed:
                    return self._BAD_FD_ERRNO
                except socket.error as ex:
                    return get_errno(ex)
            # Success is reported as 0, as on the timeout path below.
            return 0
        else:
            end = time.time() + self.gettimeout()
            while True:
                try:
                    if socket_connect(fd, address):
                        return 0
                    if time.time() >= end:
                        raise socket.timeout(errno.EAGAIN)
                    self._trampoline(fd, write=True, timeout=end - time.time(),
                                     timeout_exc=socket.timeout(errno.EAGAIN))
                    socket_checkerr(fd)
                except IOClosed:
                    return self._BAD_FD_ERRNO
                except socket.error as ex:
                    return get_errno(ex)

    def dup(self, *args, **kw):
        """Duplicate the wrapped socket and return it as a new green socket."""
        sock = self.fd.dup(*args, **kw)
        # The duplicate shares the non-blocking mode, so skip setting it again.
        newsock = type(self)(sock, set_nonblocking=False)
        newsock.settimeout(self.gettimeout())
        return newsock

    if sys.version_info[0] >= 3:
        def makefile(self, *args, **kwargs):
            return _original_socket.makefile(self, *args, **kwargs)
    else:
        def makefile(self, *args, **kwargs):
            dupped = self.dup()
            res = _python2_fileobject(dupped, *args, **kwargs)
            if hasattr(dupped, "_drop"):
                dupped._drop()
            return res

    def makeGreenFile(self, *args, **kw):
        """Deprecated alias of makefile(); emits a DeprecationWarning."""
        warnings.warn("makeGreenFile has been deprecated, please use "
                      "makefile instead", DeprecationWarning, stacklevel=2)
        return self.makefile(*args, **kw)

    def recv(self, buflen, flags=0):
        """Receive up to buflen bytes, waiting on the hub when none are ready.

        Returns empty bytes when the peer has closed the connection and
        raises EOFError when this socket is closed while waiting.
        """
        fd = self.fd
        if self.act_non_blocking:
            return fd.recv(buflen, flags)
        while True:
            try:
                return fd.recv(buflen, flags)
            except socket.error as e:
                if get_errno(e) in SOCKET_BLOCKING:
                    pass
                elif get_errno(e) in SOCKET_CLOSED:
                    # recv() yields bytes; b'' equals '' on Python 2.
                    return b''
                else:
                    raise
            try:
                self._trampoline(
                    fd,
                    read=True,
                    timeout=self.gettimeout(),
                    timeout_exc=socket.timeout("timed out"))
            except IOClosed as e:
                # Perhaps we should return '' instead?
                raise EOFError()

    def recvfrom(self, *args):
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recvfrom(*args)

    def recvfrom_into(self, *args):
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recvfrom_into(*args)

    def recv_into(self, *args):
        if not self.act_non_blocking:
            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))
        return self.fd.recv_into(*args)

    def send(self, data, flags=0):
        """Send data and return the number of bytes sent.

        In non-blocking mode this is a single send() on the wrapped socket;
        otherwise it keeps sending until all of data has been written.
        """
        fd = self.fd
        if self.act_non_blocking:
            return fd.send(data, flags)

        # blocking socket behavior - sends all, blocks if the buffer is full
        total_sent = 0
        len_data = len(data)
        while 1:
            try:
                total_sent += fd.send(data[total_sent:], flags)
            except socket.error as e:
                if get_errno(e) not in SOCKET_BLOCKING:
                    raise

            if total_sent == len_data:
                break

            try:
                self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
                                 timeout_exc=socket.timeout("timed out"))
            except IOClosed:
                raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')

        return total_sent

    def sendall(self, data, flags=0):
        """Send all of data, calling send() until nothing is left."""
        tail = self.send(data, flags)
        len_data = len(data)
        while tail < len_data:
            tail += self.send(data[tail:], flags)

    def sendto(self, *args):
        self._trampoline(self.fd, write=True)
        return self.fd.sendto(*args)

    def setblocking(self, flag):
        """Switch between blocking (no timeout) and non-blocking behavior."""
        if flag:
            self.act_non_blocking = False
            self._timeout = None
        else:
            self.act_non_blocking = True
            self._timeout = 0.0

    def settimeout(self, howlong):
        """Set the timeout in seconds.

        None (or the default-timeout sentinel) means blocking and 0 means
        non-blocking.  Raises TypeError for values without __float__ and
        ValueError for negative values.
        """
        if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
            self.setblocking(True)
            return
        try:
            f = howlong.__float__
        except AttributeError:
            raise TypeError('a float is required')
        howlong = f()
        if howlong < 0.0:
            raise ValueError('Timeout value out of range')
        if howlong == 0.0:
            self.act_non_blocking = True
            self._timeout = 0.0
        else:
            self.act_non_blocking = False
            self._timeout = howlong

    def gettimeout(self):
        return self._timeout

    if "__pypy__" in sys.builtin_module_names:
        def _reuse(self):
            getattr(self.fd, '_sock', self.fd)._reuse()

        def _drop(self):
            getattr(self.fd, '_sock', self.fd)._drop()
def _operation_on_closed_file(*args, **kwargs):
    """Stand-in for I/O methods of a closed file: always raises ValueError.

    Every positional and keyword argument is accepted and ignored.
    """
    message = "I/O operation on closed file"
    raise ValueError(message)
GreenPipe is a cooperative replacement for file class.
It will cooperate on pipes. It will block on regular file.
Differences from file class:
- mode is r/w property. Should be r/o
- encoding property not implemented
- write/writelines will not raise TypeError exception when non-string data is written
  it will write str(data) instead
- Universal new lines are not supported and newlines property not implemented
- file argument can be descriptor, file name or file object.
# import SSL module here so we can refer to greenio.SSL.exceptionclass
try:
    from OpenSSL import SSL
except ImportError:
    # pyOpenSSL not installed, define exceptions anyway for convenience
    class SSL(object):
        """Placeholder namespace holding the exception names used from SSL."""

        class WantWriteError(Exception):
            pass

        class WantReadError(Exception):
            pass

        class ZeroReturnError(Exception):
            pass

        class SysCallError(Exception):
            pass
def shutdown_safe(sock):
    """ Shuts down the socket. This is a convenience method for
    code that wants to gracefully handle regular sockets, SSL.Connection
    sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
    interchangeably.  Both types of ssl socket require a shutdown() before
    close, but they have different arity on their shutdown method.

    Regular sockets don't need a shutdown before close, but it doesn't hurt.

    Returns whatever the socket's shutdown() returns.  socket.error with
    errno ENOTCONN or EBADF is swallowed (None is returned); any other
    socket.error propagates.
    """
    try:
        try:
            # socket, ssl.SSLSocket
            return sock.shutdown(socket.SHUT_RDWR)
        except TypeError:
            # SSL.Connection
            return sock.shutdown()
    except socket.error as e:
        # we don't care if the socket is already closed;
        # this will often be the case in an http server context
        if get_errno(e) not in (errno.ENOTCONN, errno.EBADF):
            raise