8da51caa88f155062297e0c7290cd193dd27df9f
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / greenio / base.py
1 import errno
2 import os
3 from socket import socket as _original_socket
4 import socket
5 import sys
6 import time
7 import warnings
8
9 from eventlet.support import get_errno, six
10 from eventlet.hubs import trampoline, notify_opened, IOClosed
11
12 __all__ = [
13     'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
14     'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
15     'shutdown_safe', 'SSL',
16 ]
17
18 BUFFER_SIZE = 4096
19 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
20 CONNECT_SUCCESS = set((0, errno.EISCONN))
21 if sys.platform[:3] == "win":
22     CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
23
24 if six.PY2:
25     _python2_fileobject = socket._fileobject
26
27
28 def socket_connect(descriptor, address):
29     """
30     Attempts to connect to the address, returns the descriptor if it succeeds,
31     returns None if it needs to trampoline, and raises any exceptions.
32     """
33     err = descriptor.connect_ex(address)
34     if err in CONNECT_ERR:
35         return None
36     if err not in CONNECT_SUCCESS:
37         raise socket.error(err, errno.errorcode[err])
38     return descriptor
39
40
41 def socket_checkerr(descriptor):
42     err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
43     if err not in CONNECT_SUCCESS:
44         raise socket.error(err, errno.errorcode[err])
45
46
47 def socket_accept(descriptor):
48     """
49     Attempts to accept() on the descriptor, returns a client,address tuple
50     if it succeeds; returns None if it needs to trampoline, and raises
51     any exceptions.
52     """
53     try:
54         return descriptor.accept()
55     except socket.error as e:
56         if get_errno(e) == errno.EWOULDBLOCK:
57             return None
58         raise
59
60
61 if sys.platform[:3] == "win":
62     # winsock sometimes throws ENOTCONN
63     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
64     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
65 else:
66     # oddly, on linux/darwin, an unconnected socket is expected to block,
67     # so we treat ENOTCONN the same as EWOULDBLOCK
68     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
69     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
70
71
72 def set_nonblocking(fd):
73     """
74     Sets the descriptor to be nonblocking.  Works on many file-like
75     objects as well as sockets.  Only sockets can be nonblocking on
76     Windows, however.
77     """
78     try:
79         setblocking = fd.setblocking
80     except AttributeError:
81         # fd has no setblocking() method. It could be that this version of
82         # Python predates socket.setblocking(). In that case, we can still set
83         # the flag "by hand" on the underlying OS fileno using the fcntl
84         # module.
85         try:
86             import fcntl
87         except ImportError:
88             # Whoops, Windows has no fcntl module. This might not be a socket
89             # at all, but rather a file-like object with no setblocking()
90             # method. In particular, on Windows, pipes don't support
91             # non-blocking I/O and therefore don't have that method. Which
92             # means fcntl wouldn't help even if we could load it.
93             raise NotImplementedError("set_nonblocking() on a file object "
94                                       "with no setblocking() method "
95                                       "(Windows pipes don't support non-blocking I/O)")
96         # We managed to import fcntl.
97         fileno = fd.fileno()
98         orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
99         new_flags = orig_flags | os.O_NONBLOCK
100         if new_flags != orig_flags:
101             fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
102     else:
103         # socket supports setblocking()
104         setblocking(0)
105
106
107 try:
108     from socket import _GLOBAL_DEFAULT_TIMEOUT
109 except ImportError:
110     _GLOBAL_DEFAULT_TIMEOUT = object()
111
112
113 class GreenSocket(object):
114     """
115     Green version of socket.socket class, that is intended to be 100%
116     API-compatible.
117
118     It also recognizes the keyword parameter, 'set_nonblocking=True'.
119     Pass False to indicate that socket is already in non-blocking mode
120     to save syscalls.
121     """
122
123     # This placeholder is to prevent __getattr__ from creating an infinite call loop
124     fd = None
125
126     def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
127         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
128         if isinstance(family_or_realsock, six.integer_types):
129             fd = _original_socket(family_or_realsock, *args, **kwargs)
130             # Notify the hub that this is a newly-opened socket.
131             notify_opened(fd.fileno())
132         else:
133             fd = family_or_realsock
134
135         # import timeout from other socket, if it was there
136         try:
137             self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
138         except AttributeError:
139             self._timeout = socket.getdefaulttimeout()
140
141         if should_set_nonblocking:
142             set_nonblocking(fd)
143         self.fd = fd
144         # when client calls setblocking(0) or settimeout(0) the socket must
145         # act non-blocking
146         self.act_non_blocking = False
147
148         # Copy some attributes from underlying real socket.
149         # This is the easiest way that i found to fix
150         # https://bitbucket.org/eventlet/eventlet/issue/136
151         # Only `getsockopt` is required to fix that issue, others
152         # are just premature optimization to save __getattr__ call.
153         self.bind = fd.bind
154         self.close = fd.close
155         self.fileno = fd.fileno
156         self.getsockname = fd.getsockname
157         self.getsockopt = fd.getsockopt
158         self.listen = fd.listen
159         self.setsockopt = fd.setsockopt
160         self.shutdown = fd.shutdown
161         self._closed = False
162
163     @property
164     def _sock(self):
165         return self
166
167     if six.PY3:
168         def _get_io_refs(self):
169             return self.fd._io_refs
170
171         def _set_io_refs(self, value):
172             self.fd._io_refs = value
173
174         _io_refs = property(_get_io_refs, _set_io_refs)
175
176     # Forward unknown attributes to fd, cache the value for future use.
177     # I do not see any simple attribute which could be changed
178     # so caching everything in self is fine.
179     # If we find such attributes - only attributes having __get__ might be cached.
180     # For now - I do not want to complicate it.
181     def __getattr__(self, name):
182         if self.fd is None:
183             raise AttributeError(name)
184         attr = getattr(self.fd, name)
185         setattr(self, name, attr)
186         return attr
187
188     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
189         """ We need to trampoline via the event hub.
190             We catch any signal back from the hub indicating that the operation we
191             were waiting on was associated with a filehandle that's since been
192             invalidated.
193         """
194         if self._closed:
195             # If we did any logging, alerting to a second trampoline attempt on a closed
196             # socket here would be useful.
197             raise IOClosed()
198         try:
199             return trampoline(fd, read=read, write=write, timeout=timeout,
200                               timeout_exc=timeout_exc,
201                               mark_as_closed=self._mark_as_closed)
202         except IOClosed:
203             # This socket's been obsoleted. De-fang it.
204             self._mark_as_closed()
205             raise
206
207     def accept(self):
208         if self.act_non_blocking:
209             return self.fd.accept()
210         fd = self.fd
211         while True:
212             res = socket_accept(fd)
213             if res is not None:
214                 client, addr = res
215                 set_nonblocking(client)
216                 return type(self)(client), addr
217             self._trampoline(fd, read=True, timeout=self.gettimeout(),
218                              timeout_exc=socket.timeout("timed out"))
219
220     def _mark_as_closed(self):
221         """ Mark this socket as being closed """
222         self._closed = True
223
224     def __del__(self):
225         # This is in case self.close is not assigned yet (currently the constructor does it)
226         close = getattr(self, 'close', None)
227         if close is not None:
228             close()
229
230     def connect(self, address):
231         if self.act_non_blocking:
232             return self.fd.connect(address)
233         fd = self.fd
234         if self.gettimeout() is None:
235             while not socket_connect(fd, address):
236                 try:
237                     self._trampoline(fd, write=True)
238                 except IOClosed:
239                     raise socket.error(errno.EBADFD)
240                 socket_checkerr(fd)
241         else:
242             end = time.time() + self.gettimeout()
243             while True:
244                 if socket_connect(fd, address):
245                     return
246                 if time.time() >= end:
247                     raise socket.timeout("timed out")
248                 try:
249                     self._trampoline(fd, write=True, timeout=end - time.time(),
250                                      timeout_exc=socket.timeout("timed out"))
251                 except IOClosed:
252                     # ... we need some workable errno here.
253                     raise socket.error(errno.EBADFD)
254                 socket_checkerr(fd)
255
256     def connect_ex(self, address):
257         if self.act_non_blocking:
258             return self.fd.connect_ex(address)
259         fd = self.fd
260         if self.gettimeout() is None:
261             while not socket_connect(fd, address):
262                 try:
263                     self._trampoline(fd, write=True)
264                     socket_checkerr(fd)
265                 except socket.error as ex:
266                     return get_errno(ex)
267                 except IOClosed:
268                     return errno.EBADFD
269         else:
270             end = time.time() + self.gettimeout()
271             while True:
272                 try:
273                     if socket_connect(fd, address):
274                         return 0
275                     if time.time() >= end:
276                         raise socket.timeout(errno.EAGAIN)
277                     self._trampoline(fd, write=True, timeout=end - time.time(),
278                                      timeout_exc=socket.timeout(errno.EAGAIN))
279                     socket_checkerr(fd)
280                 except socket.error as ex:
281                     return get_errno(ex)
282                 except IOClosed:
283                     return errno.EBADFD
284
285     def dup(self, *args, **kw):
286         sock = self.fd.dup(*args, **kw)
287         newsock = type(self)(sock, set_nonblocking=False)
288         newsock.settimeout(self.gettimeout())
289         return newsock
290
291     if six.PY3:
292         def makefile(self, *args, **kwargs):
293             return _original_socket.makefile(self, *args, **kwargs)
294     else:
295         def makefile(self, *args, **kwargs):
296             dupped = self.dup()
297             res = _python2_fileobject(dupped, *args, **kwargs)
298             if hasattr(dupped, "_drop"):
299                 dupped._drop()
300             return res
301
302     def makeGreenFile(self, *args, **kw):
303         warnings.warn("makeGreenFile has been deprecated, please use "
304                       "makefile instead", DeprecationWarning, stacklevel=2)
305         return self.makefile(*args, **kw)
306
307     def recv(self, buflen, flags=0):
308         fd = self.fd
309         if self.act_non_blocking:
310             return fd.recv(buflen, flags)
311         while True:
312             try:
313                 return fd.recv(buflen, flags)
314             except socket.error as e:
315                 if get_errno(e) in SOCKET_BLOCKING:
316                     pass
317                 elif get_errno(e) in SOCKET_CLOSED:
318                     return ''
319                 else:
320                     raise
321             try:
322                 self._trampoline(
323                     fd,
324                     read=True,
325                     timeout=self.gettimeout(),
326                     timeout_exc=socket.timeout("timed out"))
327             except IOClosed as e:
328                 # Perhaps we should return '' instead?
329                 raise EOFError()
330
331     def recvfrom(self, *args):
332         if not self.act_non_blocking:
333             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
334                              timeout_exc=socket.timeout("timed out"))
335         return self.fd.recvfrom(*args)
336
337     def recvfrom_into(self, *args):
338         if not self.act_non_blocking:
339             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
340                              timeout_exc=socket.timeout("timed out"))
341         return self.fd.recvfrom_into(*args)
342
343     def recv_into(self, *args):
344         if not self.act_non_blocking:
345             self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
346                              timeout_exc=socket.timeout("timed out"))
347         return self.fd.recv_into(*args)
348
349     def send(self, data, flags=0):
350         fd = self.fd
351         if self.act_non_blocking:
352             return fd.send(data, flags)
353
354         # blocking socket behavior - sends all, blocks if the buffer is full
355         total_sent = 0
356         len_data = len(data)
357         while 1:
358             try:
359                 total_sent += fd.send(data[total_sent:], flags)
360             except socket.error as e:
361                 if get_errno(e) not in SOCKET_BLOCKING:
362                     raise
363
364             if total_sent == len_data:
365                 break
366
367             try:
368                 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
369                                  timeout_exc=socket.timeout("timed out"))
370             except IOClosed:
371                 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
372
373         return total_sent
374
375     def sendall(self, data, flags=0):
376         tail = self.send(data, flags)
377         len_data = len(data)
378         while tail < len_data:
379             tail += self.send(data[tail:], flags)
380
381     def sendto(self, *args):
382         self._trampoline(self.fd, write=True)
383         return self.fd.sendto(*args)
384
385     def setblocking(self, flag):
386         if flag:
387             self.act_non_blocking = False
388             self._timeout = None
389         else:
390             self.act_non_blocking = True
391             self._timeout = 0.0
392
393     def settimeout(self, howlong):
394         if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
395             self.setblocking(True)
396             return
397         try:
398             f = howlong.__float__
399         except AttributeError:
400             raise TypeError('a float is required')
401         howlong = f()
402         if howlong < 0.0:
403             raise ValueError('Timeout value out of range')
404         if howlong == 0.0:
405             self.act_non_blocking = True
406             self._timeout = 0.0
407         else:
408             self.act_non_blocking = False
409             self._timeout = howlong
410
411     def gettimeout(self):
412         return self._timeout
413
414     if "__pypy__" in sys.builtin_module_names:
415         def _reuse(self):
416             getattr(self.fd, '_sock', self.fd)._reuse()
417
418         def _drop(self):
419             getattr(self.fd, '_sock', self.fd)._drop()
420
421
422 def _operation_on_closed_file(*args, **kwargs):
423     raise ValueError("I/O operation on closed file")
424
425
426 greenpipe_doc = """
427     GreenPipe is a cooperative replacement for file class.
428     It will cooperate on pipes. It will block on regular file.
429     Differneces from file class:
430     - mode is r/w property. Should re r/o
431     - encoding property not implemented
432     - write/writelines will not raise TypeError exception when non-string data is written
433       it will write str(data) instead
434     - Universal new lines are not supported and newlines property not implementeded
435     - file argument can be descriptor, file name or file object.
436     """
437
438 # import SSL module here so we can refer to greenio.SSL.exceptionclass
439 try:
440     from OpenSSL import SSL
441 except ImportError:
442     # pyOpenSSL not installed, define exceptions anyway for convenience
443     class SSL(object):
444         class WantWriteError(Exception):
445             pass
446
447         class WantReadError(Exception):
448             pass
449
450         class ZeroReturnError(Exception):
451             pass
452
453         class SysCallError(Exception):
454             pass
455
456
457 def shutdown_safe(sock):
458     """ Shuts down the socket. This is a convenience method for
459     code that wants to gracefully handle regular sockets, SSL.Connection
460     sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
461     interchangeably.  Both types of ssl socket require a shutdown() before
462     close, but they have different arity on their shutdown method.
463
464     Regular sockets don't need a shutdown before close, but it doesn't hurt.
465     """
466     try:
467         try:
468             # socket, ssl.SSLSocket
469             return sock.shutdown(socket.SHUT_RDWR)
470         except TypeError:
471             # SSL.Connection
472             return sock.shutdown()
473     except socket.error as e:
474         # we don't care if the socket is already closed;
475         # this will often be the case in an http server context
476         if get_errno(e) not in (errno.ENOTCONN, errno.EBADF):
477             raise