4 from eventlet.greenio.base import (
5 _operation_on_closed_file,
11 from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
12 from eventlet.support import get_errno, six
14 __all__ = ['_fileobject', 'GreenPipe']
16 _fileobject = socket._fileobject
19 class GreenPipe(_fileobject):
21 __doc__ = greenpipe_doc
23 def __init__(self, f, mode='r', bufsize=-1):
24 if not isinstance(f, six.string_types + (int, file)):
25 raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
27 if isinstance(f, six.string_types):
30 if isinstance(f, int):
32 self._name = "<fd:%d>" % fileno
34 fileno = os.dup(f.fileno())
37 raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
41 super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
50 return "<%s %s %r, mode %r at 0x%x>" % (
51 self.closed and 'closed' or 'open',
52 self.__class__.__name__,
55 (id(self) < 0) and (sys.maxint + id(self)) or id(self))
58 super(GreenPipe, self).close()
60 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
61 'readline', 'readlines', 'seek', 'tell', 'truncate',
62 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
63 setattr(self, method, _operation_on_closed_file)
68 def __exit__(self, *args):
71 def _get_readahead_len(self):
72 return len(self._rbuf.getvalue())
74 def _clear_readahead_buf(self):
75 len = self._get_readahead_len()
82 return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
84 raise IOError(*e.args)
86 def seek(self, offset, whence=0):
88 if whence == 1 and offset == 0: # tell synonym
90 if whence == 1: # adjust offset by what is read ahead
91 offset -= self._get_readahead_len()
93 rv = os.lseek(self.fileno(), offset, whence)
95 raise IOError(*e.args)
97 self._clear_readahead_buf()
100 if getattr(file, "truncate", None): # not all OSes implement truncate
101 def truncate(self, size=-1):
106 rv = os.ftruncate(self.fileno(), size)
108 raise IOError(*e.args)
110 self.seek(size) # move position&clear buffer
115 return os.isatty(self.fileno())
117 raise IOError(*e.args)
120 class _SocketDuckForFd(object):
121 """Class implementing all socket method used by _fileobject
122 in cooperative manner using low level os I/O calls.
126 def __init__(self, fileno):
127 self._fileno = fileno
128 notify_opened(fileno)
131 def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
133 # Don't trampoline if we're already closed.
136 return trampoline(fd, read=read, write=write, timeout=timeout,
137 timeout_exc=timeout_exc,
138 mark_as_closed=self._mark_as_closed)
140 # Our fileno has been obsoleted. Defang ourselves to
141 # prevent spurious closes.
142 self._mark_as_closed()
145 def _mark_as_closed(self):
146 current = self._closed
157 def recv(self, buflen):
160 data = os.read(self._fileno, buflen)
163 if get_errno(e) not in SOCKET_BLOCKING:
164 raise IOError(*e.args)
165 self._trampoline(self, read=True)
167 def recv_into(self, buf, nbytes=0, flags=0):
170 data = self.recv(nbytes)
174 def send(self, data):
177 return os.write(self._fileno, data)
179 if get_errno(e) not in SOCKET_BLOCKING:
180 raise IOError(*e.args)
182 trampoline(self, write=True)
184 def sendall(self, data):
187 fileno = self._fileno
189 total_sent = os_write(fileno, data)
191 if get_errno(e) != errno.EAGAIN:
192 raise IOError(*e.args)
194 while total_sent < len_data:
195 self._trampoline(self, write=True)
197 total_sent += os_write(fileno, data[total_sent:])
199 if get_errno(e) != errno. EAGAIN:
200 raise IOError(*e.args)
206 was_closed = self._mark_as_closed()
209 notify_close(self._fileno)
211 os.close(self._fileno)
213 # os.close may fail if __init__ didn't complete
214 # (i.e file dscriptor passed to popen was invalid
218 return "%s:%d" % (self.__class__.__name__, self._fileno)
225 if self._refcount == 0: