Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / greenio.py
1 import errno
2 import os
3 from socket import socket as _original_socket
4 import socket
5 import sys
6 import time
7 import warnings
8
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
11
12 __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
13
14 BUFFER_SIZE = 4096
15 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
16 CONNECT_SUCCESS = set((0, errno.EISCONN))
17 if sys.platform[:3] == "win":
18     CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
19
20 if six.PY3:
21     from io import IOBase as file
22     _fileobject = socket.SocketIO
23 elif six.PY2:
24     _fileobject = socket._fileobject
25
26
27 def socket_connect(descriptor, address):
28     """
29     Attempts to connect to the address, returns the descriptor if it succeeds,
30     returns None if it needs to trampoline, and raises any exceptions.
31     """
32     err = descriptor.connect_ex(address)
33     if err in CONNECT_ERR:
34         return None
35     if err not in CONNECT_SUCCESS:
36         raise socket.error(err, errno.errorcode[err])
37     return descriptor
38
39
40 def socket_checkerr(descriptor):
41     err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
42     if err not in CONNECT_SUCCESS:
43         raise socket.error(err, errno.errorcode[err])
44
45
46 def socket_accept(descriptor):
47     """
48     Attempts to accept() on the descriptor, returns a client,address tuple
49     if it succeeds; returns None if it needs to trampoline, and raises
50     any exceptions.
51     """
52     try:
53         return descriptor.accept()
54     except socket.error as e:
55         if get_errno(e) == errno.EWOULDBLOCK:
56             return None
57         raise
58
59
60 if sys.platform[:3] == "win":
61     # winsock sometimes throws ENOTCONN
62     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
63     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
64 else:
65     # oddly, on linux/darwin, an unconnected socket is expected to block,
66     # so we treat ENOTCONN the same as EWOULDBLOCK
67     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
68     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
69
70
71 def set_nonblocking(fd):
72     """
73     Sets the descriptor to be nonblocking.  Works on many file-like
74     objects as well as sockets.  Only sockets can be nonblocking on
75     Windows, however.
76     """
77     try:
78         setblocking = fd.setblocking
79     except AttributeError:
80         # fd has no setblocking() method. It could be that this version of
81         # Python predates socket.setblocking(). In that case, we can still set
82         # the flag "by hand" on the underlying OS fileno using the fcntl
83         # module.
84         try:
85             import fcntl
86         except ImportError:
87             # Whoops, Windows has no fcntl module. This might not be a socket
88             # at all, but rather a file-like object with no setblocking()
89             # method. In particular, on Windows, pipes don't support
90             # non-blocking I/O and therefore don't have that method. Which
91             # means fcntl wouldn't help even if we could load it.
92             raise NotImplementedError("set_nonblocking() on a file object "
93                                       "with no setblocking() method "
94                                       "(Windows pipes don't support non-blocking I/O)")
95         # We managed to import fcntl.
96         fileno = fd.fileno()
97         orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
98         new_flags = orig_flags | os.O_NONBLOCK
99         if new_flags != orig_flags:
100             fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
101     else:
102         # socket supports setblocking()
103         setblocking(0)
104
105
106 try:
107     from socket import _GLOBAL_DEFAULT_TIMEOUT
108 except ImportError:
109     _GLOBAL_DEFAULT_TIMEOUT = object()
110
111
112 class GreenSocket(object):
113     """
114     Green version of socket.socket class, that is intended to be 100%
115     API-compatible.
116
117     It also recognizes the keyword parameter, 'set_nonblocking=True'.
118     Pass False to indicate that socket is already in non-blocking mode
119     to save syscalls.
120     """
121
122     # This placeholder is to prevent __getattr__ from creating an infinite call loop
123     fd = None
124
125     def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
126         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
127         if isinstance(family_or_realsock, six.integer_types):
128             fd = _original_socket(family_or_realsock, *args, **kwargs)
129             # Notify the hub that this is a newly-opened socket.
130             notify_opened(fd.fileno())
131         else:
132             fd = family_or_realsock
133
134         # import timeout from other socket, if it was there
135         try:
136             self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
137         except AttributeError:
138             self._timeout = socket.getdefaulttimeout()
139
140         if should_set_nonblocking:
141             set_nonblocking(fd)
142         self.fd = fd
143         # when client calls setblocking(0) or settimeout(0) the socket must
144         # act non-blocking
145         self.act_non_blocking = False
146
147         # Copy some attributes from underlying real socket.
148         # This is the easiest way that i found to fix
149         # https://bitbucket.org/eventlet/eventlet/issue/136
150         # Only `getsockopt` is required to fix that issue, others
151         # are just premature optimization to save __getattr__ call.
152         self.bind = fd.bind
153         self.close = fd.close
154         self.fileno = fd.fileno
155         self.getsockname = fd.getsockname
156         self.getsockopt = fd.getsockopt
157         self.listen = fd.listen
158         self.setsockopt = fd.setsockopt
159         self.shutdown = fd.shutdown
160         self._closed = False
161
162     @property
163     def _sock(self):
164         return self
165
166     if six.PY3:
167         def _get_io_refs(self):
168             return self.fd._io_refs
169
170         def _set_io_refs(self, value):
171             self.fd._io_refs = value
172
173         _io_refs = property(_get_io_refs, _set_io_refs)
174
175     # Forward unknown attributes to fd, cache the value for future use.
176     # I do not see any simple attribute which could be changed
177     # so caching everything in self is fine.
178     # If we find such attributes - only attributes having __get__ might be cached.
179     # For now - I do not want to complicate it.
180     def __getattr__(self, name):
181         if self.fd is None:
182             raise AttributeError(name)
183         attr = getattr(self.fd, name)
184         setattr(self, name, attr)
185         return attr
186
187     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
188         """ We need to trampoline via the event hub.
189             We catch any signal back from the hub indicating that the operation we
190             were waiting on was associated with a filehandle that's since been
191             invalidated.
192         """
193         if self._closed:
194             # If we did any logging, alerting to a second trampoline attempt on a closed
195             # socket here would be useful.
196             raise IOClosed()
197         try:
198             return trampoline(fd, read=read, write=write, timeout=timeout,
199                               timeout_exc=timeout_exc,
200                               mark_as_closed=self._mark_as_closed)
201         except IOClosed:
202             # This socket's been obsoleted. De-fang it.
203             self._mark_as_closed()
204             raise
205
206     def accept(self):
207         if self.act_non_blocking:
208             return self.fd.accept()
209         fd = self.fd
210         while True:
211             res = socket_accept(fd)
212             if res is not None:
213                 client, addr = res
214                 set_nonblocking(client)
215                 return type(self)(client), addr
216             self._trampoline(fd, read=True, timeout=self.gettimeout(),
217                              timeout_exc=socket.timeout("timed out"))
218
219     def _mark_as_closed(self):
220         """ Mark this socket as being closed """
221         self._closed = True
222
223     def __del__(self):
224         # This is in case self.close is not assigned yet (currently the constructor does it)
225         close = getattr(self, 'close', None)
226         if close is not None:
227             close()
228
229     def connect(self, address):
230         if self.act_non_blocking:
231             return self.fd.connect(address)
232         fd = self.fd
233         if self.gettimeout() is None:
234             while not socket_connect(fd, address):
235                 try:
236                     self._trampoline(fd, write=True)
237                 except IOClosed:
238                     raise socket.error(errno.EBADFD)
239                 socket_checkerr(fd)
240         else:
241             end = time.time() + self.gettimeout()
242             while True:
243                 if socket_connect(fd, address):
244                     return
245                 if time.time() >= end:
246                     raise socket.timeout("timed out")
247                 try:
248                     self._trampoline(fd, write=True, timeout=end - time.time(),
249                                      timeout_exc=socket.timeout("timed out"))
250                 except IOClosed:
251                     # ... we need some workable errno here.
252                     raise socket.error(errno.EBADFD)
253                 socket_checkerr(fd)
254
255     def connect_ex(self, address):
256         if self.act_non_blocking:
257             return self.fd.connect_ex(address)
258         fd = self.fd
259         if self.gettimeout() is None:
260             while not socket_connect(fd, address):
261                 try:
262                     self._trampoline(fd, write=True)
263                     socket_checkerr(fd)
264                 except socket.error as ex:
265                     return get_errno(ex)
266                 except IOClosed:
267                     return errno.EBADFD
268         else:
269             end = time.time() + self.gettimeout()
270             while True:
271                 try:
272                     if socket_connect(fd, address):
273                         return 0
274                     if time.time() >= end:
275                         raise socket.timeout(errno.EAGAIN)
276                     self._trampoline(fd, write=True, timeout=end - time.time(),
277                                      timeout_exc=socket.timeout(errno.EAGAIN))
278                     socket_checkerr(fd)
279                 except socket.error as ex:
280                     return get_errno(ex)
281                 except IOClosed:
282                     return errno.EBADFD
283
284     def dup(self, *args, **kw):
285         sock = self.fd.dup(*args, **kw)
286         newsock = type(self)(sock, set_nonblocking=False)
287         newsock.settimeout(self.gettimeout())
288         return newsock
289
290     if six.PY3:
291         def makefile(self, *args, **kwargs):
292             return _original_socket.makefile(self, *args, **kwargs)
293     else:
294         def makefile(self, *args, **kwargs):
295             dupped = self.dup()
296             res = _fileobject(dupped, *args, **kwargs)
297             if hasattr(dupped, "_drop"):
298                 dupped._drop()
299             return res
300
301     def makeGreenFile(self, *args, **kw):
302         warnings.warn("makeGreenFile has been deprecated, please use "
303                       "makefile instead", DeprecationWarning, stacklevel=2)
304         return self.makefile(*args, **kw)
305
306     def recv(self, buflen, flags=0):
307         fd = self.fd
308         if self.act_non_blocking:
309             return fd.recv(buflen, flags)
310         while True:
311             try:
312                 return fd.recv(buflen, flags)
313             except socket.error as e:
314                 if get_errno(e) in SOCKET_BLOCKING:
315                     pass
316                 elif get_errno(e) in SOCKET_CLOSED:
317                     return ''
318                 else:
319                     raise
320             try:
321                 self._trampoline(
322                     fd,
323                     read=True,
324                     timeout=self.gettimeout(),
325                     timeout_exc=socket.timeout("timed out"))
326             except IOClosed as e:
327                 # Perhaps we should return '' instead?
328                 raise EOFError()
329
330     def recvfrom(self, *args):
331         if not self.act_non_blocking:
332             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
333                              timeout_exc=socket.timeout("timed out"))
334         return self.fd.recvfrom(*args)
335
336     def recvfrom_into(self, *args):
337         if not self.act_non_blocking:
338             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
339                              timeout_exc=socket.timeout("timed out"))
340         return self.fd.recvfrom_into(*args)
341
342     def recv_into(self, *args):
343         if not self.act_non_blocking:
344             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
345                              timeout_exc=socket.timeout("timed out"))
346         return self.fd.recv_into(*args)
347
348     def send(self, data, flags=0):
349         fd = self.fd
350         if self.act_non_blocking:
351             return fd.send(data, flags)
352
353         # blocking socket behavior - sends all, blocks if the buffer is full
354         total_sent = 0
355         len_data = len(data)
356         while 1:
357             try:
358                 total_sent += fd.send(data[total_sent:], flags)
359             except socket.error as e:
360                 if get_errno(e) not in SOCKET_BLOCKING:
361                     raise
362
363             if total_sent == len_data:
364                 break
365
366             try:
367                 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
368                                  timeout_exc=socket.timeout("timed out"))
369             except IOClosed:
370                 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
371
372         return total_sent
373
374     def sendall(self, data, flags=0):
375         tail = self.send(data, flags)
376         len_data = len(data)
377         while tail < len_data:
378             tail += self.send(data[tail:], flags)
379
380     def sendto(self, *args):
381         self._trampoline(self.fd, write=True)
382         return self.fd.sendto(*args)
383
384     def setblocking(self, flag):
385         if flag:
386             self.act_non_blocking = False
387             self._timeout = None
388         else:
389             self.act_non_blocking = True
390             self._timeout = 0.0
391
392     def settimeout(self, howlong):
393         if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
394             self.setblocking(True)
395             return
396         try:
397             f = howlong.__float__
398         except AttributeError:
399             raise TypeError('a float is required')
400         howlong = f()
401         if howlong < 0.0:
402             raise ValueError('Timeout value out of range')
403         if howlong == 0.0:
404             self.act_non_blocking = True
405             self._timeout = 0.0
406         else:
407             self.act_non_blocking = False
408             self._timeout = howlong
409
410     def gettimeout(self):
411         return self._timeout
412
413     if "__pypy__" in sys.builtin_module_names:
414         def _reuse(self):
415             getattr(self.fd, '_sock', self.fd)._reuse()
416
417         def _drop(self):
418             getattr(self.fd, '_sock', self.fd)._drop()
419
420
421 class _SocketDuckForFd(object):
422     """Class implementing all socket method used by _fileobject
423     in cooperative manner using low level os I/O calls.
424     """
425     _refcount = 0
426
427     def __init__(self, fileno):
428         self._fileno = fileno
429         notify_opened(fileno)
430         self._closed = False
431
432     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
433         if self._closed:
434             # Don't trampoline if we're already closed.
435             raise IOClosed()
436         try:
437             return trampoline(fd, read=read, write=write, timeout=timeout,
438                               timeout_exc=timeout_exc,
439                               mark_as_closed=self._mark_as_closed)
440         except IOClosed:
441             # Our fileno has been obsoleted. Defang ourselves to
442             # prevent spurious closes.
443             self._mark_as_closed()
444             raise
445
446     def _mark_as_closed(self):
447         self._closed = True
448
449     @property
450     def _sock(self):
451         return self
452
453     def fileno(self):
454         return self._fileno
455
456     def recv(self, buflen):
457         while True:
458             try:
459                 data = os.read(self._fileno, buflen)
460                 return data
461             except OSError as e:
462                 if get_errno(e) not in SOCKET_BLOCKING:
463                     raise IOError(*e.args)
464             self._trampoline(self, read=True)
465
466     def recv_into(self, buf, nbytes=0, flags=0):
467         if nbytes == 0:
468             nbytes = len(buf)
469         data = self.recv(nbytes)
470         buf[:nbytes] = data
471         return len(data)
472
473     def send(self, data):
474         while True:
475             try:
476                 return os.write(self._fileno, data)
477             except OSError as e:
478                 if get_errno(e) not in SOCKET_BLOCKING:
479                     raise IOError(*e.args)
480                 else:
481                     trampoline(self, write=True)
482
483     def sendall(self, data):
484         len_data = len(data)
485         os_write = os.write
486         fileno = self._fileno
487         try:
488             total_sent = os_write(fileno, data)
489         except OSError as e:
490             if get_errno(e) != errno.EAGAIN:
491                 raise IOError(*e.args)
492             total_sent = 0
493         while total_sent < len_data:
494             self._trampoline(self, write=True)
495             try:
496                 total_sent += os_write(fileno, data[total_sent:])
497             except OSError as e:
498                 if get_errno(e) != errno. EAGAIN:
499                     raise IOError(*e.args)
500
501     def __del__(self):
502         self._close()
503
504     def _close(self):
505         notify_close(self._fileno)
506         self._mark_as_closed()
507         try:
508             os.close(self._fileno)
509         except:
510             # os.close may fail if __init__ didn't complete
511             # (i.e file dscriptor passed to popen was invalid
512             pass
513
514     def __repr__(self):
515         return "%s:%d" % (self.__class__.__name__, self._fileno)
516
517     def _reuse(self):
518         self._refcount += 1
519
520     def _drop(self):
521         self._refcount -= 1
522         if self._refcount == 0:
523             self._close()
524     # Python3
525     _decref_socketios = _drop
526
527
528 def _operationOnClosedFile(*args, **kwargs):
529     raise ValueError("I/O operation on closed file")
530
531
532 class GreenPipe(_fileobject):
533     """
534     GreenPipe is a cooperative replacement for file class.
535     It will cooperate on pipes. It will block on regular file.
536     Differneces from file class:
537     - mode is r/w property. Should re r/o
538     - encoding property not implemented
539     - write/writelines will not raise TypeError exception when non-string data is written
540       it will write str(data) instead
541     - Universal new lines are not supported and newlines property not implementeded
542     - file argument can be descriptor, file name or file object.
543     """
544
545     def __init__(self, f, mode='r', bufsize=-1):
546         if not isinstance(f, six.string_types + (int, file)):
547             raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
548
549         if isinstance(f, six.string_types):
550             f = open(f, mode, 0)
551
552         if isinstance(f, int):
553             fileno = f
554             self._name = "<fd:%d>" % fileno
555         else:
556             fileno = os.dup(f.fileno())
557             self._name = f.name
558             if f.mode != mode:
559                 raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
560             self._name = f.name
561             f.close()
562
563         super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
564         set_nonblocking(self)
565         self.softspace = 0
566
567     @property
568     def name(self):
569         return self._name
570
571     def __repr__(self):
572         return "<%s %s %r, mode %r at 0x%x>" % (
573             self.closed and 'closed' or 'open',
574             self.__class__.__name__,
575             self.name,
576             self.mode,
577             (id(self) < 0) and (sys.maxint + id(self)) or id(self))
578
579     def close(self):
580         super(GreenPipe, self).close()
581         for method in [
582                 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
583                 'readline', 'readlines', 'seek', 'tell', 'truncate',
584                 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
585             setattr(self, method, _operationOnClosedFile)
586
587     def __enter__(self):
588         return self
589
590     def __exit__(self, *args):
591         self.close()
592
593     def _get_readahead_len(self):
594         return len(self._rbuf.getvalue())
595
596     def _clear_readahead_buf(self):
597         len = self._get_readahead_len()
598         if len > 0:
599             self.read(len)
600
601     def tell(self):
602         self.flush()
603         try:
604             return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
605         except OSError as e:
606             raise IOError(*e.args)
607
608     def seek(self, offset, whence=0):
609         self.flush()
610         if whence == 1 and offset == 0:  # tell synonym
611             return self.tell()
612         if whence == 1:  # adjust offset by what is read ahead
613             offset -= self._get_readahead_len()
614         try:
615             rv = os.lseek(self.fileno(), offset, whence)
616         except OSError as e:
617             raise IOError(*e.args)
618         else:
619             self._clear_readahead_buf()
620             return rv
621
622     if getattr(file, "truncate", None):  # not all OSes implement truncate
623         def truncate(self, size=-1):
624             self.flush()
625             if size == -1:
626                 size = self.tell()
627             try:
628                 rv = os.ftruncate(self.fileno(), size)
629             except OSError as e:
630                 raise IOError(*e.args)
631             else:
632                 self.seek(size)  # move position&clear buffer
633                 return rv
634
635     def isatty(self):
636         try:
637             return os.isatty(self.fileno())
638         except OSError as e:
639             raise IOError(*e.args)
640
641
642 # import SSL module here so we can refer to greenio.SSL.exceptionclass
643 try:
644     from OpenSSL import SSL
645 except ImportError:
646     # pyOpenSSL not installed, define exceptions anyway for convenience
647     class SSL(object):
648         class WantWriteError(Exception):
649             pass
650
651         class WantReadError(Exception):
652             pass
653
654         class ZeroReturnError(Exception):
655             pass
656
657         class SysCallError(Exception):
658             pass
659
660
661 def shutdown_safe(sock):
662     """ Shuts down the socket. This is a convenience method for
663     code that wants to gracefully handle regular sockets, SSL.Connection
664     sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
665     interchangeably.  Both types of ssl socket require a shutdown() before
666     close, but they have different arity on their shutdown method.
667
668     Regular sockets don't need a shutdown before close, but it doesn't hurt.
669     """
670     try:
671         try:
672             # socket, ssl.SSLSocket
673             return sock.shutdown(socket.SHUT_RDWR)
674         except TypeError:
675             # SSL.Connection
676             return sock.shutdown()
677     except socket.error as e:
678         # we don't care if the socket is already closed;
679         # this will often be the case in an http server context
680         if get_errno(e) != errno.ENOTCONN:
681             raise