1 import _pyio as _original_pyio
3 import os as _original_os
4 import socket as _original_socket
6 BufferedRandom as _OriginalBufferedRandom,
7 BufferedReader as _OriginalBufferedReader,
8 BufferedWriter as _OriginalBufferedWriter,
10 TextIOWrapper as _OriginalTextIOWrapper,
11 IOBase as _OriginalIOBase,
13 from types import FunctionType
15 from eventlet.greenio.base import (
16 _operation_on_closed_file,
21 from eventlet.hubs import notify_close, notify_opened, IOClosed, trampoline
22 from eventlet.support import get_errno, six
# Names exported by a star-import of this module.
24 __all__ = ['_fileobject', 'GreenPipe']
26 # TODO get rid of this, it only seems like the original _fileobject
# `_fileobject` is simply an alias for the standard library's socket.SocketIO
# class (the `socket` module is imported above as `_original_socket`).
27 _fileobject = _original_socket.SocketIO
29 # A large part of the following code is copied from the original
30 # eventlet.greenio module.
# GreenFileIO: an IOBase subclass that works directly on an OS-level file
# descriptor (`self._fileno`) using os.read / os.write / os.lseek /
# os.ftruncate.  When one of those calls fails with an errno listed in
# SOCKET_BLOCKING, the code calls a trampoline helper instead of raising;
# any other OSError is re-raised as IOError with the same args.
#
# NOTE(review): the lines shown here are not contiguous -- several `def`,
# `try:`, `except ... as e:`, `while` and assignment lines are not visible.
# Comments below describe only what the visible lines do; anything inferred
# is marked as an assumption to confirm.
33 class GreenFileIO(_OriginalIOBase):
# __init__: `name` is either an int (treated as an already-open descriptor)
# or a path string.  `closefd` and `opener` are accepted but are not
# referenced in any of the lines shown.
34 def __init__(self, name, mode='r', closefd=True, opener=None):
35 if isinstance(name, int):
# NOTE(review): `fileno` is not bound in the visible lines of this branch --
# presumably `fileno = name`; confirm.
37 self._name = "<fd:%d>" % fileno
# Non-int names must be strings (six.string_types); note that `assert` is
# skipped when Python runs with -O.
39 assert isinstance(name, six.string_types)
# The path is opened with the built-in open(); the descriptor is duplicated
# with os.dup so this object holds its own descriptor after the `with` block
# closes `fd`.
40 with open(name, mode) as fd:
42 fileno = _original_os.dup(fd.fileno())
# Seekability probe: `self._seekable` is a cache where None means "not yet
# determined".  The probe is an lseek of 0 bytes from the current position;
# errno ESPIPE marks the descriptor as not seekable.
# NOTE(review): `e` and the `errno` module must be bound by lines not shown.
56 if self._seekable is None:
58 _original_os.lseek(self._fileno, 0, _original_os.SEEK_CUR)
60 if get_errno(e) == errno.ESPIPE:
61 self._seekable = False
# True when the mode string contains 'r' or '+'.
70 return 'r' in self._mode or '+' in self._mode
# True when the mode string contains 'w' or '+'.
# NOTE(review): modes 'a' and 'x' also open a file for writing with the
# built-in open(), but are not matched here -- confirm this is intended.
73 return 'w' in self._mode or '+' in self._mode
# read: returns whatever os.read yields for up to `size` bytes.  On an
# OSError whose errno is not in SOCKET_BLOCKING the error is re-raised as
# IOError; otherwise self._trampoline(self, read=True) is called --
# presumably to wait until the descriptor is readable and then retry (the
# enclosing loop is not visible; confirm).  Handling of the default
# size=-1 is not visible either.
78 def read(self, size=-1):
84 return _original_os.read(self._fileno, size)
86 if get_errno(e) not in SOCKET_BLOCKING:
87 raise IOError(*e.args)
88 self._trampoline(self, read=True)
# Chunked read in DEFAULT_BUFFER_SIZE pieces with the same error handling as
# read() above.  NOTE(review): presumably the body of a read-everything
# method whose `def` line and accumulation logic are not shown; confirm.
94 chunk = _original_os.read(self._fileno, DEFAULT_BUFFER_SIZE)
99 if get_errno(e) not in SOCKET_BLOCKING:
100 raise IOError(*e.args)
101 self._trampoline(self, read=True)
# readinto: reads via self.read() and copies the bytes obtained into the
# front of the caller-supplied buffer `b`.
# NOTE(review): `up_to` is not bound in the visible lines (presumably
# len(b)) and the return statement is not shown; confirm.
103 def readinto(self, b):
105 data = self.read(up_to)
106 bytes_read = len(data)
107 b[:bytes_read] = data
# Terminal check delegated to os.isatty on this object's descriptor; the
# visible `raise` converts a caught error into IOError with the same args.
112 return _original_os.isatty(self.fileno())
114 raise IOError(*e.args)
# _trampoline: thin wrapper around the module-level `trampoline` imported
# from eventlet.hubs.  All arguments are forwarded unchanged, and
# self._mark_as_closed is supplied as the `mark_as_closed` callback.
116 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
118 # Don't trampoline if we're already closed.
121 return trampoline(fd, read=read, write=write, timeout=timeout,
122 timeout_exc=timeout_exc,
123 mark_as_closed=self._mark_as_closed)
125 # Our fileno has been obsoleted. Defang ourselves to
126 # prevent spurious closes.
127 self._mark_as_closed()
# NOTE(review): the docstring below says "socket", but this class wraps a
# file descriptor; the method body is not visible.
130 def _mark_as_closed(self):
131 """ Mark this socket as being closed """
# write: wraps `data` in a memoryview so that `view[offset:]` slices the
# remaining bytes without copying, and loops until `offset` reaches
# `datalen`.
# NOTE(review): the initialisation of `offset` / `datalen`, the update of
# `offset` from `written`, and the return value are not visible.
134 def write(self, data):
135 view = memoryview(data)
138 while offset < datalen:
140 written = _original_os.write(self._fileno, view[offset:])
142 if get_errno(e) not in SOCKET_BLOCKING:
143 raise IOError(*e.args)
# NOTE(review): this calls the module-level trampoline directly, whereas the
# read paths use self._trampoline, which also passes the mark_as_closed
# callback -- confirm whether the difference is intentional.
144 trampoline(self, write=True)
# Close path: the descriptor is closed with os.close and notify_close is
# called with the same descriptor number.
# NOTE(review): the enclosing `def` and any guard condition are not visible.
152 _original_os.close(self._fileno)
153 notify_close(self._fileno)
# Each method name listed below is then rebound on the instance to
# _operation_on_closed_file.
155 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
156 'readline', 'readlines', 'seek', 'tell', 'truncate',
157 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
158 setattr(self, method, _operation_on_closed_file)
# truncate: resizes the file to `size` bytes with os.ftruncate, re-raising a
# caught error as IOError, then seeks to `size`.
# NOTE(review): handling of the default size=-1 and the return of `rv` are
# not visible.
160 def truncate(self, size=-1):
164 rv = _original_os.ftruncate(self._fileno, size)
166 raise IOError(*e.args)
168 self.seek(size) # move position&clear buffer
# seek: returns the result of os.lseek(fd, offset, whence); `whence`
# defaults to os.SEEK_SET.  A caught error is re-raised as IOError.
171 def seek(self, offset, whence=_original_os.SEEK_SET):
173 return _original_os.lseek(self._fileno, offset, whence)
175 raise IOError(*e.args)
# Context-manager exit hook; its body is not visible in the lines shown.
180 def __exit__(self, *args):
# `_open_environment` starts as a copy of this module's globals and is then
# updated so that the names BufferedRandom, BufferedWriter, BufferedReader
# and TextIOWrapper map to the original classes imported at the top of the
# file.
# NOTE(review): the remaining entries and the closing of this update(...)
# call are not visible in the lines shown.
184 _open_environment = dict(globals())
185 _open_environment.update(dict(
186 BufferedRandom=_OriginalBufferedRandom,
187 BufferedWriter=_OriginalBufferedWriter,
188 BufferedReader=_OriginalBufferedReader,
189 TextIOWrapper=_OriginalTextIOWrapper,
# `_open` is a new function object created from the code object of
# `_pyio.open` (obtained through six.get_function_code).
# NOTE(review): the remaining FunctionType arguments are not visible --
# presumably `_open_environment` is passed as the function's globals; confirm.
194 _open = FunctionType(
195 six.get_function_code(_original_pyio.open),
# GreenPipe: opens `name` through `_open`, forwarding every parameter
# positionally in the order of this signature.
200 def GreenPipe(name, mode="r", buffering=-1, encoding=None, errors=None,
201 newline=None, closefd=True, opener=None):
# If `name` exposes a fileno() method its descriptor is read; objects
# without that attribute raise AttributeError, which is caught.
203 fileno = name.fileno()
204 except AttributeError:
# The descriptor obtained above is duplicated with os.dup.
# NOTE(review): the `try:` / `else:` scaffolding and whatever happens to
# `name` after the dup (presumably it is replaced by the duplicated
# descriptor) are not visible in the lines shown; confirm.
207 fileno = _original_os.dup(fileno)
211 return _open(name, mode, buffering, encoding, errors, newline, closefd, opener)
# Attach the documentation text held in `greenpipe_doc` to GreenPipe.
# NOTE(review): the import of `greenpipe_doc` is not visible in the lines
# shown -- presumably it comes from eventlet.greenio.base; confirm.
213 GreenPipe.__doc__ = greenpipe_doc