Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / greenio / base.py
1 import errno
2 import os
3 import socket
4 import sys
5 import time
6 import warnings
7
8 import eventlet
9 from eventlet.hubs import trampoline, notify_opened, IOClosed
10 from eventlet.support import get_errno, six
11
12 __all__ = [
13     'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
14     'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
15     'shutdown_safe', 'SSL',
16 ]
17
18 BUFFER_SIZE = 4096
19 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
20 CONNECT_SUCCESS = set((0, errno.EISCONN))
21 if sys.platform[:3] == "win":
22     CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
23
24 if six.PY2:
25     _python2_fileobject = socket._fileobject
26
27 _original_socket = eventlet.patcher.original('socket').socket
28
29
30 def socket_connect(descriptor, address):
31     """
32     Attempts to connect to the address, returns the descriptor if it succeeds,
33     returns None if it needs to trampoline, and raises any exceptions.
34     """
35     err = descriptor.connect_ex(address)
36     if err in CONNECT_ERR:
37         return None
38     if err not in CONNECT_SUCCESS:
39         raise socket.error(err, errno.errorcode[err])
40     return descriptor
41
42
43 def socket_checkerr(descriptor):
44     err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
45     if err not in CONNECT_SUCCESS:
46         raise socket.error(err, errno.errorcode[err])
47
48
49 def socket_accept(descriptor):
50     """
51     Attempts to accept() on the descriptor, returns a client,address tuple
52     if it succeeds; returns None if it needs to trampoline, and raises
53     any exceptions.
54     """
55     try:
56         return descriptor.accept()
57     except socket.error as e:
58         if get_errno(e) == errno.EWOULDBLOCK:
59             return None
60         raise
61
62
63 if sys.platform[:3] == "win":
64     # winsock sometimes throws ENOTCONN
65     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
66     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
67 else:
68     # oddly, on linux/darwin, an unconnected socket is expected to block,
69     # so we treat ENOTCONN the same as EWOULDBLOCK
70     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
71     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
72
73
74 def set_nonblocking(fd):
75     """
76     Sets the descriptor to be nonblocking.  Works on many file-like
77     objects as well as sockets.  Only sockets can be nonblocking on
78     Windows, however.
79     """
80     try:
81         setblocking = fd.setblocking
82     except AttributeError:
83         # fd has no setblocking() method. It could be that this version of
84         # Python predates socket.setblocking(). In that case, we can still set
85         # the flag "by hand" on the underlying OS fileno using the fcntl
86         # module.
87         try:
88             import fcntl
89         except ImportError:
90             # Whoops, Windows has no fcntl module. This might not be a socket
91             # at all, but rather a file-like object with no setblocking()
92             # method. In particular, on Windows, pipes don't support
93             # non-blocking I/O and therefore don't have that method. Which
94             # means fcntl wouldn't help even if we could load it.
95             raise NotImplementedError("set_nonblocking() on a file object "
96                                       "with no setblocking() method "
97                                       "(Windows pipes don't support non-blocking I/O)")
98         # We managed to import fcntl.
99         fileno = fd.fileno()
100         orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
101         new_flags = orig_flags | os.O_NONBLOCK
102         if new_flags != orig_flags:
103             fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
104     else:
105         # socket supports setblocking()
106         setblocking(0)
107
108
109 try:
110     from socket import _GLOBAL_DEFAULT_TIMEOUT
111 except ImportError:
112     _GLOBAL_DEFAULT_TIMEOUT = object()
113
114
115 class GreenSocket(object):
116     """
117     Green version of socket.socket class, that is intended to be 100%
118     API-compatible.
119
120     It also recognizes the keyword parameter, 'set_nonblocking=True'.
121     Pass False to indicate that socket is already in non-blocking mode
122     to save syscalls.
123     """
124
125     # This placeholder is to prevent __getattr__ from creating an infinite call loop
126     fd = None
127
128     def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
129         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
130         if isinstance(family_or_realsock, six.integer_types):
131             fd = _original_socket(family_or_realsock, *args, **kwargs)
132             # Notify the hub that this is a newly-opened socket.
133             notify_opened(fd.fileno())
134         else:
135             fd = family_or_realsock
136
137         # import timeout from other socket, if it was there
138         try:
139             self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
140         except AttributeError:
141             self._timeout = socket.getdefaulttimeout()
142
143         if should_set_nonblocking:
144             set_nonblocking(fd)
145         self.fd = fd
146         # when client calls setblocking(0) or settimeout(0) the socket must
147         # act non-blocking
148         self.act_non_blocking = False
149
150         # Copy some attributes from underlying real socket.
151         # This is the easiest way that i found to fix
152         # https://bitbucket.org/eventlet/eventlet/issue/136
153         # Only `getsockopt` is required to fix that issue, others
154         # are just premature optimization to save __getattr__ call.
155         self.bind = fd.bind
156         self.close = fd.close
157         self.fileno = fd.fileno
158         self.getsockname = fd.getsockname
159         self.getsockopt = fd.getsockopt
160         self.listen = fd.listen
161         self.setsockopt = fd.setsockopt
162         self.shutdown = fd.shutdown
163         self._closed = False
164
165     @property
166     def _sock(self):
167         return self
168
169     if six.PY3:
170         def _get_io_refs(self):
171             return self.fd._io_refs
172
173         def _set_io_refs(self, value):
174             self.fd._io_refs = value
175
176         _io_refs = property(_get_io_refs, _set_io_refs)
177
178     # Forward unknown attributes to fd, cache the value for future use.
179     # I do not see any simple attribute which could be changed
180     # so caching everything in self is fine.
181     # If we find such attributes - only attributes having __get__ might be cached.
182     # For now - I do not want to complicate it.
183     def __getattr__(self, name):
184         if self.fd is None:
185             raise AttributeError(name)
186         attr = getattr(self.fd, name)
187         setattr(self, name, attr)
188         return attr
189
190     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
191         """ We need to trampoline via the event hub.
192             We catch any signal back from the hub indicating that the operation we
193             were waiting on was associated with a filehandle that's since been
194             invalidated.
195         """
196         if self._closed:
197             # If we did any logging, alerting to a second trampoline attempt on a closed
198             # socket here would be useful.
199             raise IOClosed()
200         try:
201             return trampoline(fd, read=read, write=write, timeout=timeout,
202                               timeout_exc=timeout_exc,
203                               mark_as_closed=self._mark_as_closed)
204         except IOClosed:
205             # This socket's been obsoleted. De-fang it.
206             self._mark_as_closed()
207             raise
208
209     def accept(self):
210         if self.act_non_blocking:
211             return self.fd.accept()
212         fd = self.fd
213         while True:
214             res = socket_accept(fd)
215             if res is not None:
216                 client, addr = res
217                 set_nonblocking(client)
218                 return type(self)(client), addr
219             self._trampoline(fd, read=True, timeout=self.gettimeout(),
220                              timeout_exc=socket.timeout("timed out"))
221
222     def _mark_as_closed(self):
223         """ Mark this socket as being closed """
224         self._closed = True
225
226     def __del__(self):
227         # This is in case self.close is not assigned yet (currently the constructor does it)
228         close = getattr(self, 'close', None)
229         if close is not None:
230             close()
231
232     def connect(self, address):
233         if self.act_non_blocking:
234             return self.fd.connect(address)
235         fd = self.fd
236         if self.gettimeout() is None:
237             while not socket_connect(fd, address):
238                 try:
239                     self._trampoline(fd, write=True)
240                 except IOClosed:
241                     raise socket.error(errno.EBADFD)
242                 socket_checkerr(fd)
243         else:
244             end = time.time() + self.gettimeout()
245             while True:
246                 if socket_connect(fd, address):
247                     return
248                 if time.time() >= end:
249                     raise socket.timeout("timed out")
250                 try:
251                     self._trampoline(fd, write=True, timeout=end - time.time(),
252                                      timeout_exc=socket.timeout("timed out"))
253                 except IOClosed:
254                     # ... we need some workable errno here.
255                     raise socket.error(errno.EBADFD)
256                 socket_checkerr(fd)
257
258     def connect_ex(self, address):
259         if self.act_non_blocking:
260             return self.fd.connect_ex(address)
261         fd = self.fd
262         if self.gettimeout() is None:
263             while not socket_connect(fd, address):
264                 try:
265                     self._trampoline(fd, write=True)
266                     socket_checkerr(fd)
267                 except socket.error as ex:
268                     return get_errno(ex)
269                 except IOClosed:
270                     return errno.EBADFD
271         else:
272             end = time.time() + self.gettimeout()
273             while True:
274                 try:
275                     if socket_connect(fd, address):
276                         return 0
277                     if time.time() >= end:
278                         raise socket.timeout(errno.EAGAIN)
279                     self._trampoline(fd, write=True, timeout=end - time.time(),
280                                      timeout_exc=socket.timeout(errno.EAGAIN))
281                     socket_checkerr(fd)
282                 except socket.error as ex:
283                     return get_errno(ex)
284                 except IOClosed:
285                     return errno.EBADFD
286
287     def dup(self, *args, **kw):
288         sock = self.fd.dup(*args, **kw)
289         newsock = type(self)(sock, set_nonblocking=False)
290         newsock.settimeout(self.gettimeout())
291         return newsock
292
293     if six.PY3:
294         def makefile(self, *args, **kwargs):
295             return _original_socket.makefile(self, *args, **kwargs)
296     else:
297         def makefile(self, *args, **kwargs):
298             dupped = self.dup()
299             res = _python2_fileobject(dupped, *args, **kwargs)
300             if hasattr(dupped, "_drop"):
301                 dupped._drop()
302             return res
303
304     def makeGreenFile(self, *args, **kw):
305         warnings.warn("makeGreenFile has been deprecated, please use "
306                       "makefile instead", DeprecationWarning, stacklevel=2)
307         return self.makefile(*args, **kw)
308
309     def _read_trampoline(self):
310         self._trampoline(
311             self.fd,
312             read=True,
313             timeout=self.gettimeout(),
314             timeout_exc=socket.timeout("timed out"))
315
316     def _recv_loop(self, recv_meth, *args):
317         fd = self.fd
318         if self.act_non_blocking:
319             return recv_meth(*args)
320
321         while True:
322             try:
323                 # recv: bufsize=0?
324                 # recv_into: buffer is empty?
325                 # This is needed because behind the scenes we use sockets in
326                 # nonblocking mode and builtin recv* methods. Attempting to read
327                 # 0 bytes from a nonblocking socket using a builtin recv* method
328                 # does not raise a timeout exception. Since we're simulating
329                 # a blocking socket here we need to produce a timeout exception
330                 # if needed, hence the call to trampoline.
331                 if not args[0]:
332                     self._read_trampoline()
333                 return recv_meth(*args)
334             except socket.error as e:
335                 if get_errno(e) in SOCKET_BLOCKING:
336                     pass
337                 elif get_errno(e) in SOCKET_CLOSED:
338                     return b''
339                 else:
340                     raise
341
342             try:
343                 self._read_trampoline()
344             except IOClosed as e:
345                 # Perhaps we should return '' instead?
346                 raise EOFError()
347
348     def recv(self, bufsize, flags=0):
349         return self._recv_loop(self.fd.recv, bufsize, flags)
350
351     def recvfrom(self, bufsize, flags=0):
352         return self._recv_loop(self.fd.recvfrom, bufsize, flags)
353
354     def recv_into(self, buffer, nbytes=0, flags=0):
355         return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags)
356
357     def recvfrom_into(self, buffer, nbytes=0, flags=0):
358         return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags)
359
360     def _send_loop(self, send_method, data, *args):
361         if self.act_non_blocking:
362             return send_method(data, *args)
363
364         while 1:
365             try:
366                 return send_method(data, *args)
367             except socket.error as e:
368                 eno = get_errno(e)
369                 if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
370                     raise
371
372             try:
373                 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
374                                  timeout_exc=socket.timeout("timed out"))
375             except IOClosed:
376                 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
377
378     def send(self, data, flags=0):
379         return self._send_loop(self.fd.send, data, flags)
380
381     def sendto(self, data, *args):
382         return self._send_loop(self.fd.sendto, data, *args)
383
384     def sendall(self, data, flags=0):
385         tail = self.send(data, flags)
386         len_data = len(data)
387         while tail < len_data:
388             tail += self.send(data[tail:], flags)
389
390     def setblocking(self, flag):
391         if flag:
392             self.act_non_blocking = False
393             self._timeout = None
394         else:
395             self.act_non_blocking = True
396             self._timeout = 0.0
397
398     def settimeout(self, howlong):
399         if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
400             self.setblocking(True)
401             return
402         try:
403             f = howlong.__float__
404         except AttributeError:
405             raise TypeError('a float is required')
406         howlong = f()
407         if howlong < 0.0:
408             raise ValueError('Timeout value out of range')
409         if howlong == 0.0:
410             self.act_non_blocking = True
411             self._timeout = 0.0
412         else:
413             self.act_non_blocking = False
414             self._timeout = howlong
415
416     def gettimeout(self):
417         return self._timeout
418
419     if "__pypy__" in sys.builtin_module_names:
420         def _reuse(self):
421             getattr(self.fd, '_sock', self.fd)._reuse()
422
423         def _drop(self):
424             getattr(self.fd, '_sock', self.fd)._drop()
425
426
427 def _operation_on_closed_file(*args, **kwargs):
428     raise ValueError("I/O operation on closed file")
429
430
431 greenpipe_doc = """
432     GreenPipe is a cooperative replacement for file class.
433     It will cooperate on pipes. It will block on regular file.
434     Differneces from file class:
435     - mode is r/w property. Should re r/o
436     - encoding property not implemented
437     - write/writelines will not raise TypeError exception when non-string data is written
438       it will write str(data) instead
439     - Universal new lines are not supported and newlines property not implementeded
440     - file argument can be descriptor, file name or file object.
441     """
442
443 # import SSL module here so we can refer to greenio.SSL.exceptionclass
444 try:
445     from OpenSSL import SSL
446 except ImportError:
447     # pyOpenSSL not installed, define exceptions anyway for convenience
448     class SSL(object):
449         class WantWriteError(Exception):
450             pass
451
452         class WantReadError(Exception):
453             pass
454
455         class ZeroReturnError(Exception):
456             pass
457
458         class SysCallError(Exception):
459             pass
460
461
462 def shutdown_safe(sock):
463     """ Shuts down the socket. This is a convenience method for
464     code that wants to gracefully handle regular sockets, SSL.Connection
465     sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
466     interchangeably.  Both types of ssl socket require a shutdown() before
467     close, but they have different arity on their shutdown method.
468
469     Regular sockets don't need a shutdown before close, but it doesn't hurt.
470     """
471     try:
472         try:
473             # socket, ssl.SSLSocket
474             return sock.shutdown(socket.SHUT_RDWR)
475         except TypeError:
476             # SSL.Connection
477             return sock.shutdown()
478     except socket.error as e:
479         # we don't care if the socket is already closed;
480         # this will often be the case in an http server context
481         if get_errno(e) not in (errno.ENOTCONN, errno.EBADF):
482             raise