9 from eventlet.hubs import trampoline, notify_opened, IOClosed
10 from eventlet.support import get_errno, six
13 'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
14 'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
15 'shutdown_safe', 'SSL',
19 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
20 CONNECT_SUCCESS = set((0, errno.EISCONN))
21 if sys.platform[:3] == "win":
22 CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
25 _python2_fileobject = socket._fileobject
27 _original_socket = eventlet.patcher.original('socket').socket
30 def socket_connect(descriptor, address):
32 Attempts to connect to the address, returns the descriptor if it succeeds,
33 returns None if it needs to trampoline, and raises any exceptions.
35 err = descriptor.connect_ex(address)
36 if err in CONNECT_ERR:
38 if err not in CONNECT_SUCCESS:
39 raise socket.error(err, errno.errorcode[err])
43 def socket_checkerr(descriptor):
44 err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
45 if err not in CONNECT_SUCCESS:
46 raise socket.error(err, errno.errorcode[err])
49 def socket_accept(descriptor):
51 Attempts to accept() on the descriptor, returns a client,address tuple
52 if it succeeds; returns None if it needs to trampoline, and raises
56 return descriptor.accept()
57 except socket.error as e:
58 if get_errno(e) == errno.EWOULDBLOCK:
63 if sys.platform[:3] == "win":
64 # winsock sometimes throws ENOTCONN
65 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
66 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
68 # oddly, on linux/darwin, an unconnected socket is expected to block,
69 # so we treat ENOTCONN the same as EWOULDBLOCK
70 SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
71 SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
74 def set_nonblocking(fd):
76 Sets the descriptor to be nonblocking. Works on many file-like
77 objects as well as sockets. Only sockets can be nonblocking on
81 setblocking = fd.setblocking
82 except AttributeError:
83 # fd has no setblocking() method. It could be that this version of
84 # Python predates socket.setblocking(). In that case, we can still set
85 # the flag "by hand" on the underlying OS fileno using the fcntl
90 # Whoops, Windows has no fcntl module. This might not be a socket
91 # at all, but rather a file-like object with no setblocking()
92 # method. In particular, on Windows, pipes don't support
93 # non-blocking I/O and therefore don't have that method. Which
94 # means fcntl wouldn't help even if we could load it.
95 raise NotImplementedError("set_nonblocking() on a file object "
96 "with no setblocking() method "
97 "(Windows pipes don't support non-blocking I/O)")
98 # We managed to import fcntl.
100 orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
101 new_flags = orig_flags | os.O_NONBLOCK
102 if new_flags != orig_flags:
103 fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
105 # socket supports setblocking()
110 from socket import _GLOBAL_DEFAULT_TIMEOUT
112 _GLOBAL_DEFAULT_TIMEOUT = object()
115 class GreenSocket(object):
117 Green version of socket.socket class, that is intended to be 100%
120 It also recognizes the keyword parameter, 'set_nonblocking=True'.
121 Pass False to indicate that socket is already in non-blocking mode
125 # This placeholder is to prevent __getattr__ from creating an infinite call loop
128 def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
129 should_set_nonblocking = kwargs.pop('set_nonblocking', True)
130 if isinstance(family_or_realsock, six.integer_types):
131 fd = _original_socket(family_or_realsock, *args, **kwargs)
132 # Notify the hub that this is a newly-opened socket.
133 notify_opened(fd.fileno())
135 fd = family_or_realsock
137 # import timeout from other socket, if it was there
139 self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
140 except AttributeError:
141 self._timeout = socket.getdefaulttimeout()
143 if should_set_nonblocking:
146 # when client calls setblocking(0) or settimeout(0) the socket must
148 self.act_non_blocking = False
150 # Copy some attributes from underlying real socket.
151 # This is the easiest way that i found to fix
152 # https://bitbucket.org/eventlet/eventlet/issue/136
153 # Only `getsockopt` is required to fix that issue, others
154 # are just premature optimization to save __getattr__ call.
156 self.close = fd.close
157 self.fileno = fd.fileno
158 self.getsockname = fd.getsockname
159 self.getsockopt = fd.getsockopt
160 self.listen = fd.listen
161 self.setsockopt = fd.setsockopt
162 self.shutdown = fd.shutdown
170 def _get_io_refs(self):
171 return self.fd._io_refs
173 def _set_io_refs(self, value):
174 self.fd._io_refs = value
176 _io_refs = property(_get_io_refs, _set_io_refs)
178 # Forward unknown attributes to fd, cache the value for future use.
179 # I do not see any simple attribute which could be changed
180 # so caching everything in self is fine.
181 # If we find such attributes - only attributes having __get__ might be cached.
182 # For now - I do not want to complicate it.
183 def __getattr__(self, name):
185 raise AttributeError(name)
186 attr = getattr(self.fd, name)
187 setattr(self, name, attr)
190 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
191 """ We need to trampoline via the event hub.
192 We catch any signal back from the hub indicating that the operation we
193 were waiting on was associated with a filehandle that's since been
197 # If we did any logging, alerting to a second trampoline attempt on a closed
198 # socket here would be useful.
201 return trampoline(fd, read=read, write=write, timeout=timeout,
202 timeout_exc=timeout_exc,
203 mark_as_closed=self._mark_as_closed)
205 # This socket's been obsoleted. De-fang it.
206 self._mark_as_closed()
210 if self.act_non_blocking:
211 return self.fd.accept()
214 res = socket_accept(fd)
217 set_nonblocking(client)
218 return type(self)(client), addr
219 self._trampoline(fd, read=True, timeout=self.gettimeout(),
220 timeout_exc=socket.timeout("timed out"))
222 def _mark_as_closed(self):
223 """ Mark this socket as being closed """
227 # This is in case self.close is not assigned yet (currently the constructor does it)
228 close = getattr(self, 'close', None)
229 if close is not None:
232 def connect(self, address):
233 if self.act_non_blocking:
234 return self.fd.connect(address)
236 if self.gettimeout() is None:
237 while not socket_connect(fd, address):
239 self._trampoline(fd, write=True)
241 raise socket.error(errno.EBADFD)
244 end = time.time() + self.gettimeout()
246 if socket_connect(fd, address):
248 if time.time() >= end:
249 raise socket.timeout("timed out")
251 self._trampoline(fd, write=True, timeout=end - time.time(),
252 timeout_exc=socket.timeout("timed out"))
254 # ... we need some workable errno here.
255 raise socket.error(errno.EBADFD)
258 def connect_ex(self, address):
259 if self.act_non_blocking:
260 return self.fd.connect_ex(address)
262 if self.gettimeout() is None:
263 while not socket_connect(fd, address):
265 self._trampoline(fd, write=True)
267 except socket.error as ex:
272 end = time.time() + self.gettimeout()
275 if socket_connect(fd, address):
277 if time.time() >= end:
278 raise socket.timeout(errno.EAGAIN)
279 self._trampoline(fd, write=True, timeout=end - time.time(),
280 timeout_exc=socket.timeout(errno.EAGAIN))
282 except socket.error as ex:
287 def dup(self, *args, **kw):
288 sock = self.fd.dup(*args, **kw)
289 newsock = type(self)(sock, set_nonblocking=False)
290 newsock.settimeout(self.gettimeout())
294 def makefile(self, *args, **kwargs):
295 return _original_socket.makefile(self, *args, **kwargs)
297 def makefile(self, *args, **kwargs):
299 res = _python2_fileobject(dupped, *args, **kwargs)
300 if hasattr(dupped, "_drop"):
304 def makeGreenFile(self, *args, **kw):
305 warnings.warn("makeGreenFile has been deprecated, please use "
306 "makefile instead", DeprecationWarning, stacklevel=2)
307 return self.makefile(*args, **kw)
309 def _read_trampoline(self):
313 timeout=self.gettimeout(),
314 timeout_exc=socket.timeout("timed out"))
316 def _recv_loop(self, recv_meth, *args):
318 if self.act_non_blocking:
319 return recv_meth(*args)
324 # recv_into: buffer is empty?
325 # This is needed because behind the scenes we use sockets in
326 # nonblocking mode and builtin recv* methods. Attempting to read
327 # 0 bytes from a nonblocking socket using a builtin recv* method
328 # does not raise a timeout exception. Since we're simulating
329 # a blocking socket here we need to produce a timeout exception
330 # if needed, hence the call to trampoline.
332 self._read_trampoline()
333 return recv_meth(*args)
334 except socket.error as e:
335 if get_errno(e) in SOCKET_BLOCKING:
337 elif get_errno(e) in SOCKET_CLOSED:
343 self._read_trampoline()
344 except IOClosed as e:
345 # Perhaps we should return '' instead?
348 def recv(self, bufsize, flags=0):
349 return self._recv_loop(self.fd.recv, bufsize, flags)
351 def recvfrom(self, bufsize, flags=0):
352 return self._recv_loop(self.fd.recvfrom, bufsize, flags)
354 def recv_into(self, buffer, nbytes=0, flags=0):
355 return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags)
357 def recvfrom_into(self, buffer, nbytes=0, flags=0):
358 return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags)
360 def _send_loop(self, send_method, data, *args):
361 if self.act_non_blocking:
362 return send_method(data, *args)
366 return send_method(data, *args)
367 except socket.error as e:
369 if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
373 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
374 timeout_exc=socket.timeout("timed out"))
376 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
378 def send(self, data, flags=0):
379 return self._send_loop(self.fd.send, data, flags)
381 def sendto(self, data, *args):
382 return self._send_loop(self.fd.sendto, data, *args)
384 def sendall(self, data, flags=0):
385 tail = self.send(data, flags)
387 while tail < len_data:
388 tail += self.send(data[tail:], flags)
390 def setblocking(self, flag):
392 self.act_non_blocking = False
395 self.act_non_blocking = True
398 def settimeout(self, howlong):
399 if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
400 self.setblocking(True)
403 f = howlong.__float__
404 except AttributeError:
405 raise TypeError('a float is required')
408 raise ValueError('Timeout value out of range')
410 self.act_non_blocking = True
413 self.act_non_blocking = False
414 self._timeout = howlong
416 def gettimeout(self):
419 if "__pypy__" in sys.builtin_module_names:
421 getattr(self.fd, '_sock', self.fd)._reuse()
424 getattr(self.fd, '_sock', self.fd)._drop()
427 def _operation_on_closed_file(*args, **kwargs):
428 raise ValueError("I/O operation on closed file")
432 GreenPipe is a cooperative replacement for file class.
433 It will cooperate on pipes. It will block on regular file.
434 Differneces from file class:
435 - mode is r/w property. Should re r/o
436 - encoding property not implemented
437 - write/writelines will not raise TypeError exception when non-string data is written
438 it will write str(data) instead
439 - Universal new lines are not supported and newlines property not implementeded
440 - file argument can be descriptor, file name or file object.
443 # import SSL module here so we can refer to greenio.SSL.exceptionclass
445 from OpenSSL import SSL
447 # pyOpenSSL not installed, define exceptions anyway for convenience
449 class WantWriteError(Exception):
452 class WantReadError(Exception):
455 class ZeroReturnError(Exception):
458 class SysCallError(Exception):
462 def shutdown_safe(sock):
463 """ Shuts down the socket. This is a convenience method for
464 code that wants to gracefully handle regular sockets, SSL.Connection
465 sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
466 interchangeably. Both types of ssl socket require a shutdown() before
467 close, but they have different arity on their shutdown method.
469 Regular sockets don't need a shutdown before close, but it doesn't hurt.
473 # socket, ssl.SSLSocket
474 return sock.shutdown(socket.SHUT_RDWR)
477 return sock.shutdown()
478 except socket.error as e:
479 # we don't care if the socket is already closed;
480 # this will often be the case in an http server context
481 if get_errno(e) not in (errno.ENOTCONN, errno.EBADF):