Adjust the package revision; no actual code changes
[packages/trusty/python-eventlet.git] / eventlet / eventlet / greenio.py
1 import errno
2 import os
3 from socket import socket as _original_socket
4 import socket
5 import sys
6 import time
7 import warnings
8
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
11
12 __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
13
14 BUFFER_SIZE = 4096
15 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
16 CONNECT_SUCCESS = set((0, errno.EISCONN))
17 if sys.platform[:3] == "win":
18     CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
19
20 if six.PY3:
21     from io import IOBase as file
22     _fileobject = socket.SocketIO
23 elif six.PY2:
24     _fileobject = socket._fileobject
25
26
27 def socket_connect(descriptor, address):
28     """
29     Attempts to connect to the address, returns the descriptor if it succeeds,
30     returns None if it needs to trampoline, and raises any exceptions.
31     """
32     err = descriptor.connect_ex(address)
33     if err in CONNECT_ERR:
34         return None
35     if err not in CONNECT_SUCCESS:
36         raise socket.error(err, errno.errorcode[err])
37     return descriptor
38
39
40 def socket_checkerr(descriptor):
41     err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
42     if err not in CONNECT_SUCCESS:
43         raise socket.error(err, errno.errorcode[err])
44
45
46 def socket_accept(descriptor):
47     """
48     Attempts to accept() on the descriptor, returns a client,address tuple
49     if it succeeds; returns None if it needs to trampoline, and raises
50     any exceptions.
51     """
52     try:
53         return descriptor.accept()
54     except socket.error as e:
55         if get_errno(e) == errno.EWOULDBLOCK:
56             return None
57         raise
58
59
60 if sys.platform[:3] == "win":
61     # winsock sometimes throws ENOTCONN
62     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
63     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
64 else:
65     # oddly, on linux/darwin, an unconnected socket is expected to block,
66     # so we treat ENOTCONN the same as EWOULDBLOCK
67     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
68     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
69
70
71 def set_nonblocking(fd):
72     """
73     Sets the descriptor to be nonblocking.  Works on many file-like
74     objects as well as sockets.  Only sockets can be nonblocking on
75     Windows, however.
76     """
77     try:
78         setblocking = fd.setblocking
79     except AttributeError:
80         # fd has no setblocking() method. It could be that this version of
81         # Python predates socket.setblocking(). In that case, we can still set
82         # the flag "by hand" on the underlying OS fileno using the fcntl
83         # module.
84         try:
85             import fcntl
86         except ImportError:
87             # Whoops, Windows has no fcntl module. This might not be a socket
88             # at all, but rather a file-like object with no setblocking()
89             # method. In particular, on Windows, pipes don't support
90             # non-blocking I/O and therefore don't have that method. Which
91             # means fcntl wouldn't help even if we could load it.
92             raise NotImplementedError("set_nonblocking() on a file object "
93                                       "with no setblocking() method "
94                                       "(Windows pipes don't support non-blocking I/O)")
95         # We managed to import fcntl.
96         fileno = fd.fileno()
97         orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
98         new_flags = orig_flags | os.O_NONBLOCK
99         if new_flags != orig_flags:
100             fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
101     else:
102         # socket supports setblocking()
103         setblocking(0)
104
105
106 try:
107     from socket import _GLOBAL_DEFAULT_TIMEOUT
108 except ImportError:
109     _GLOBAL_DEFAULT_TIMEOUT = object()
110
111
112 class GreenSocket(object):
113     """
114     Green version of socket.socket class, that is intended to be 100%
115     API-compatible.
116
117     It also recognizes the keyword parameter, 'set_nonblocking=True'.
118     Pass False to indicate that socket is already in non-blocking mode
119     to save syscalls.
120     """
121
122     def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
123         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
124         if isinstance(family_or_realsock, six.integer_types):
125             fd = _original_socket(family_or_realsock, *args, **kwargs)
126             # Notify the hub that this is a newly-opened socket.
127             notify_opened(fd.fileno())
128         else:
129             fd = family_or_realsock
130
131         # import timeout from other socket, if it was there
132         try:
133             self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
134         except AttributeError:
135             self._timeout = socket.getdefaulttimeout()
136
137         if should_set_nonblocking:
138             set_nonblocking(fd)
139         self.fd = fd
140         # when client calls setblocking(0) or settimeout(0) the socket must
141         # act non-blocking
142         self.act_non_blocking = False
143
144         # Copy some attributes from underlying real socket.
145         # This is the easiest way that i found to fix
146         # https://bitbucket.org/eventlet/eventlet/issue/136
147         # Only `getsockopt` is required to fix that issue, others
148         # are just premature optimization to save __getattr__ call.
149         self.bind = fd.bind
150         self.close = fd.close
151         self.fileno = fd.fileno
152         self.getsockname = fd.getsockname
153         self.getsockopt = fd.getsockopt
154         self.listen = fd.listen
155         self.setsockopt = fd.setsockopt
156         self.shutdown = fd.shutdown
157         self._closed = False
158
159     @property
160     def _sock(self):
161         return self
162
163     # Forward unknown attributes to fd, cache the value for future use.
164     # I do not see any simple attribute which could be changed
165     # so caching everything in self is fine.
166     # If we find such attributes - only attributes having __get__ might be cached.
167     # For now - I do not want to complicate it.
168     def __getattr__(self, name):
169         attr = getattr(self.fd, name)
170         setattr(self, name, attr)
171         return attr
172
173     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
174         """ We need to trampoline via the event hub.
175             We catch any signal back from the hub indicating that the operation we
176             were waiting on was associated with a filehandle that's since been
177             invalidated.
178         """
179         if self._closed:
180             # If we did any logging, alerting to a second trampoline attempt on a closed
181             # socket here would be useful.
182             raise IOClosed()
183         try:
184             return trampoline(fd, read=read, write=write, timeout=timeout,
185                             timeout_exc=timeout_exc,
186                             mark_as_closed=self._mark_as_closed)
187         except IOClosed:
188             # This socket's been obsoleted. De-fang it.
189             self._mark_as_closed()
190             raise
191
192     def accept(self):
193         if self.act_non_blocking:
194             return self.fd.accept()
195         fd = self.fd
196         while True:
197             res = socket_accept(fd)
198             if res is not None:
199                 client, addr = res
200                 set_nonblocking(client)
201                 return type(self)(client), addr
202             self._trampoline(fd, read=True, timeout=self.gettimeout(),
203                        timeout_exc=socket.timeout("timed out"))
204
205     def _mark_as_closed(self):
206         """ Mark this socket as being closed """
207         self._closed = True
208
209     def close(self):
210         notify_close(self.fd)
211         self._mark_as_closed()
212         return self.fd.close()
213
214     def __del__(self):
215         self.close()
216
217     def connect(self, address):
218         if self.act_non_blocking:
219             return self.fd.connect(address)
220         fd = self.fd
221         if self.gettimeout() is None:
222             while not socket_connect(fd, address):
223                 try:
224                     self._trampoline(fd, write=True)
225                 except IOClosed:
226                     raise socket.error(errno.EBADFD)
227                 socket_checkerr(fd)
228         else:
229             end = time.time() + self.gettimeout()
230             while True:
231                 if socket_connect(fd, address):
232                     return
233                 if time.time() >= end:
234                     raise socket.timeout("timed out")
235                 try:
236                     self._trampoline(fd, write=True, timeout=end - time.time(),
237                            timeout_exc=socket.timeout("timed out"))
238                 except IOClosed:
239                     # ... we need some workable errno here.
240                     raise socket.error(errno.EBADFD)
241                 socket_checkerr(fd)
242
243     def connect_ex(self, address):
244         if self.act_non_blocking:
245             return self.fd.connect_ex(address)
246         fd = self.fd
247         if self.gettimeout() is None:
248             while not socket_connect(fd, address):
249                 try:
250                     self._trampoline(fd, write=True)
251                     socket_checkerr(fd)
252                 except socket.error as ex:
253                     return get_errno(ex)
254                 except IOClosed:
255                     return errno.EBADFD
256         else:
257             end = time.time() + self.gettimeout()
258             while True:
259                 try:
260                     if socket_connect(fd, address):
261                         return 0
262                     if time.time() >= end:
263                         raise socket.timeout(errno.EAGAIN)
264                     self._trampoline(fd, write=True, timeout=end - time.time(),
265                                timeout_exc=socket.timeout(errno.EAGAIN))
266                     socket_checkerr(fd)
267                 except socket.error as ex:
268                     return get_errno(ex)
269                 except IOClosed:
270                     return errno.EBADFD
271
272     def dup(self, *args, **kw):
273         sock = self.fd.dup(*args, **kw)
274         newsock = type(self)(sock, set_nonblocking=False)
275         newsock.settimeout(self.gettimeout())
276         return newsock
277
278     def makefile(self, *args, **kw):
279         dupped = self.dup()
280         res = _fileobject(dupped, *args, **kw)
281         if hasattr(dupped, "_drop"):
282             dupped._drop()
283         return res
284
285     def makeGreenFile(self, *args, **kw):
286         warnings.warn("makeGreenFile has been deprecated, please use "
287                       "makefile instead", DeprecationWarning, stacklevel=2)
288         return self.makefile(*args, **kw)
289
290     def recv(self, buflen, flags=0):
291         fd = self.fd
292         if self.act_non_blocking:
293             return fd.recv(buflen, flags)
294         while True:
295             try:
296                 return fd.recv(buflen, flags)
297             except socket.error as e:
298                 if get_errno(e) in SOCKET_BLOCKING:
299                     pass
300                 elif get_errno(e) in SOCKET_CLOSED:
301                     return ''
302                 else:
303                     raise
304             try:
305                 self._trampoline(
306                     fd,
307                     read=True,
308                     timeout=self.gettimeout(),
309                     timeout_exc=socket.timeout("timed out"))
310             except IOClosed as e:
311                 # Perhaps we should return '' instead?
312                 raise EOFError()
313
314     def recvfrom(self, *args):
315         if not self.act_non_blocking:
316             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
317                        timeout_exc=socket.timeout("timed out"))
318         return self.fd.recvfrom(*args)
319
320     def recvfrom_into(self, *args):
321         if not self.act_non_blocking:
322             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
323                        timeout_exc=socket.timeout("timed out"))
324         return self.fd.recvfrom_into(*args)
325
326     def recv_into(self, *args):
327         if not self.act_non_blocking:
328             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
329                        timeout_exc=socket.timeout("timed out"))
330         return self.fd.recv_into(*args)
331
332     def send(self, data, flags=0):
333         fd = self.fd
334         if self.act_non_blocking:
335             return fd.send(data, flags)
336
337         # blocking socket behavior - sends all, blocks if the buffer is full
338         total_sent = 0
339         len_data = len(data)
340         while 1:
341             try:
342                 total_sent += fd.send(data[total_sent:], flags)
343             except socket.error as e:
344                 if get_errno(e) not in SOCKET_BLOCKING:
345                     raise
346
347             if total_sent == len_data:
348                 break
349
350             try:
351                 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
352                            timeout_exc=socket.timeout("timed out"))
353             except IOClosed:
354                 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
355
356         return total_sent
357
358     def sendall(self, data, flags=0):
359         tail = self.send(data, flags)
360         len_data = len(data)
361         while tail < len_data:
362             tail += self.send(data[tail:], flags)
363
364     def sendto(self, *args):
365         self._trampoline(self.fd, write=True)
366         return self.fd.sendto(*args)
367
368     def setblocking(self, flag):
369         if flag:
370             self.act_non_blocking = False
371             self._timeout = None
372         else:
373             self.act_non_blocking = True
374             self._timeout = 0.0
375
376     def settimeout(self, howlong):
377         if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
378             self.setblocking(True)
379             return
380         try:
381             f = howlong.__float__
382         except AttributeError:
383             raise TypeError('a float is required')
384         howlong = f()
385         if howlong < 0.0:
386             raise ValueError('Timeout value out of range')
387         if howlong == 0.0:
388             self.act_non_blocking = True
389             self._timeout = 0.0
390         else:
391             self.act_non_blocking = False
392             self._timeout = howlong
393
394     def gettimeout(self):
395         return self._timeout
396
397     if "__pypy__" in sys.builtin_module_names:
398         def _reuse(self):
399             getattr(self.fd, '_sock', self.fd)._reuse()
400
401         def _drop(self):
402             getattr(self.fd, '_sock', self.fd)._drop()
403
404
405 class _SocketDuckForFd(object):
406     """Class implementing all socket method used by _fileobject
407     in cooperative manner using low level os I/O calls.
408     """
409     _refcount = 0
410
411     def __init__(self, fileno):
412         self._fileno = fileno
413         notify_opened(fileno)
414         self._closed = False
415
416     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
417         if self._closed:
418             # Don't trampoline if we're already closed.
419             raise IOClosed()
420         try:
421             return trampoline(fd, read=read, write=write, timeout=timeout,
422                             timeout_exc=timeout_exc,
423                             mark_as_closed=self._mark_as_closed)
424         except IOClosed:
425             # Our fileno has been obsoleted. Defang ourselves to
426             # prevent spurious closes.
427             self._mark_as_closed()
428             raise
429
430     def _mark_as_closed(self):
431         self._closed = True
432
433     @property
434     def _sock(self):
435         return self
436
437     def fileno(self):
438         return self._fileno
439
440     def recv(self, buflen):
441         while True:
442             try:
443                 data = os.read(self._fileno, buflen)
444                 return data
445             except OSError as e:
446                 if get_errno(e) not in SOCKET_BLOCKING:
447                     raise IOError(*e.args)
448             self._trampoline(self, read=True)
449
450     def recv_into(self, buf, nbytes=0, flags=0):
451         if nbytes == 0:
452             nbytes = len(buf)
453         data = self.recv(nbytes)
454         buf[:nbytes] = data
455         return len(data)
456
457     def send(self, data):
458         while True:
459             try:
460                 os.write(self._fileno, data)
461             except OSError as e:
462                 if get_errno(e) not in SOCKET_BLOCKING:
463                     raise IOError(*e.args)
464             trampoline(self, write=True)
465
466     def sendall(self, data):
467         len_data = len(data)
468         os_write = os.write
469         fileno = self._fileno
470         try:
471             total_sent = os_write(fileno, data)
472         except OSError as e:
473             if get_errno(e) != errno.EAGAIN:
474                 raise IOError(*e.args)
475             total_sent = 0
476         while total_sent < len_data:
477             self._trampoline(self, write=True)
478             try:
479                 total_sent += os_write(fileno, data[total_sent:])
480             except OSError as e:
481                 if get_errno(e) != errno. EAGAIN:
482                     raise IOError(*e.args)
483
484     def __del__(self):
485         self._close()
486
487     def _close(self):
488         notify_close(self._fileno)
489         self._mark_as_closed()
490         try:
491             os.close(self._fileno)
492         except:
493             # os.close may fail if __init__ didn't complete
494             # (i.e file dscriptor passed to popen was invalid
495             pass
496
497     def __repr__(self):
498         return "%s:%d" % (self.__class__.__name__, self._fileno)
499
500     def _reuse(self):
501         self._refcount += 1
502
503     def _drop(self):
504         self._refcount -= 1
505         if self._refcount == 0:
506             self._close()
507     # Python3
508     _decref_socketios = _drop
509
510
511 def _operationOnClosedFile(*args, **kwargs):
512     raise ValueError("I/O operation on closed file")
513
514
515 class GreenPipe(_fileobject):
516     """
517     GreenPipe is a cooperative replacement for file class.
518     It will cooperate on pipes. It will block on regular file.
519     Differneces from file class:
520     - mode is r/w property. Should re r/o
521     - encoding property not implemented
522     - write/writelines will not raise TypeError exception when non-string data is written
523       it will write str(data) instead
524     - Universal new lines are not supported and newlines property not implementeded
525     - file argument can be descriptor, file name or file object.
526     """
527
528     def __init__(self, f, mode='r', bufsize=-1):
529         if not isinstance(f, six.string_types + (int, file)):
530             raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
531
532         if isinstance(f, six.string_types):
533             f = open(f, mode, 0)
534
535         if isinstance(f, int):
536             fileno = f
537             self._name = "<fd:%d>" % fileno
538         else:
539             fileno = os.dup(f.fileno())
540             self._name = f.name
541             if f.mode != mode:
542                 raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
543             self._name = f.name
544             f.close()
545
546         super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
547         set_nonblocking(self)
548         self.softspace = 0
549
550     @property
551     def name(self):
552         return self._name
553
554     def __repr__(self):
555         return "<%s %s %r, mode %r at 0x%x>" % (
556             self.closed and 'closed' or 'open',
557             self.__class__.__name__,
558             self.name,
559             self.mode,
560             (id(self) < 0) and (sys.maxint + id(self)) or id(self))
561
562     def close(self):
563         super(GreenPipe, self).close()
564         for method in [
565                 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
566                 'readline', 'readlines', 'seek', 'tell', 'truncate',
567                 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
568             setattr(self, method, _operationOnClosedFile)
569
570     def __enter__(self):
571         return self
572
573     def __exit__(self, *args):
574         self.close()
575
576     def _get_readahead_len(self):
577         return len(self._rbuf.getvalue())
578
579     def _clear_readahead_buf(self):
580         len = self._get_readahead_len()
581         if len > 0:
582             self.read(len)
583
584     def tell(self):
585         self.flush()
586         try:
587             return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
588         except OSError as e:
589             raise IOError(*e.args)
590
591     def seek(self, offset, whence=0):
592         self.flush()
593         if whence == 1 and offset == 0:  # tell synonym
594             return self.tell()
595         if whence == 1:  # adjust offset by what is read ahead
596             offset -= self._get_readahead_len()
597         try:
598             rv = os.lseek(self.fileno(), offset, whence)
599         except OSError as e:
600             raise IOError(*e.args)
601         else:
602             self._clear_readahead_buf()
603             return rv
604
605     if getattr(file, "truncate", None):  # not all OSes implement truncate
606         def truncate(self, size=-1):
607             self.flush()
608             if size == -1:
609                 size = self.tell()
610             try:
611                 rv = os.ftruncate(self.fileno(), size)
612             except OSError as e:
613                 raise IOError(*e.args)
614             else:
615                 self.seek(size)  # move position&clear buffer
616                 return rv
617
618     def isatty(self):
619         try:
620             return os.isatty(self.fileno())
621         except OSError as e:
622             raise IOError(*e.args)
623
624
625 # import SSL module here so we can refer to greenio.SSL.exceptionclass
626 try:
627     from OpenSSL import SSL
628 except ImportError:
629     # pyOpenSSL not installed, define exceptions anyway for convenience
630     class SSL(object):
631         class WantWriteError(Exception):
632             pass
633
634         class WantReadError(Exception):
635             pass
636
637         class ZeroReturnError(Exception):
638             pass
639
640         class SysCallError(Exception):
641             pass
642
643
644 def shutdown_safe(sock):
645     """ Shuts down the socket. This is a convenience method for
646     code that wants to gracefully handle regular sockets, SSL.Connection
647     sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
648     interchangeably.  Both types of ssl socket require a shutdown() before
649     close, but they have different arity on their shutdown method.
650
651     Regular sockets don't need a shutdown before close, but it doesn't hurt.
652     """
653     try:
654         try:
655             # socket, ssl.SSLSocket
656             return sock.shutdown(socket.SHUT_RDWR)
657         except TypeError:
658             # SSL.Connection
659             return sock.shutdown()
660     except socket.error as e:
661         # we don't care if the socket is already closed;
662         # this will often be the case in an http server context
663         if get_errno(e) != errno.ENOTCONN:
664             raise