Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / greenio_test.py
1 import array
2 import errno
3 import eventlet
4 import fcntl
5 import gc
6 from io import DEFAULT_BUFFER_SIZE
7 import os
8 import shutil
9 import socket as _orig_sock
10 import sys
11 import tempfile
12
13 from nose.tools import eq_
14
15 import eventlet
16 from eventlet import event, greenio, debug
17 from eventlet.hubs import get_hub
18 from eventlet.green import select, socket, time, ssl
19 from eventlet.support import capture_stderr, get_errno, six
20 import tests
21
22
23 def bufsized(sock, size=1):
24     """ Resize both send and receive buffers on a socket.
25     Useful for testing trampoline.  Returns the socket.
26
27     >>> import socket
28     >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
29     """
30     sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
31     sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
32     return sock
33
34
35 def expect_socket_timeout(function, *args):
36     try:
37         function(*args)
38         raise AssertionError("socket.timeout not raised")
39     except socket.timeout as e:
40         assert hasattr(e, 'args')
41         eq_(e.args[0], 'timed out')
42
43
44 def min_buf_size():
45     """Return the minimum buffer size that the platform supports."""
46     test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
47     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
48     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
49
50
51 def using_epoll_hub(_f):
52     try:
53         return 'epolls' in type(get_hub()).__module__
54     except Exception:
55         return False
56
57
58 def using_kqueue_hub(_f):
59     try:
60         return 'kqueue' in type(get_hub()).__module__
61     except Exception:
62         return False
63
64
65 class TestGreenSocket(tests.LimitedTestCase):
66     def assertWriteToClosedFileRaises(self, fd):
67         if sys.version_info[0] < 3:
68             # 2.x socket._fileobjects are odd: writes don't check
69             # whether the socket is closed or not, and you get an
70             # AttributeError during flush if it is closed
71             fd.write(b'a')
72             self.assertRaises(Exception, fd.flush)
73         else:
74             # 3.x io write to closed file-like pbject raises ValueError
75             self.assertRaises(ValueError, fd.write, b'a')
76
77     def test_connect_timeout(self):
78         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
79         s.settimeout(0.1)
80         gs = greenio.GreenSocket(s)
81
82         try:
83             expect_socket_timeout(gs.connect, ('192.0.2.1', 80))
84         except socket.error as e:
85             # unreachable is also a valid outcome
86             if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
87                 raise
88
89     def test_accept_timeout(self):
90         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
91         s.bind(('', 0))
92         s.listen(50)
93
94         s.settimeout(0.1)
95         gs = greenio.GreenSocket(s)
96         expect_socket_timeout(gs.accept)
97
98     def test_connect_ex_timeout(self):
99         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
100         s.settimeout(0.1)
101         gs = greenio.GreenSocket(s)
102         e = gs.connect_ex(('192.0.2.1', 80))
103         if e not in (errno.EHOSTUNREACH, errno.ENETUNREACH):
104             self.assertEqual(e, errno.EAGAIN)
105
106     def test_recv_timeout(self):
107         listener = greenio.GreenSocket(socket.socket())
108         listener.bind(('', 0))
109         listener.listen(50)
110
111         evt = event.Event()
112
113         def server():
114             # accept the connection in another greenlet
115             sock, addr = listener.accept()
116             evt.wait()
117
118         gt = eventlet.spawn(server)
119
120         addr = listener.getsockname()
121
122         client = greenio.GreenSocket(socket.socket())
123         client.settimeout(0.1)
124
125         client.connect(addr)
126
127         expect_socket_timeout(client.recv, 0)
128         expect_socket_timeout(client.recv, 8192)
129
130         evt.send()
131         gt.wait()
132
133     def test_recvfrom_timeout(self):
134         gs = greenio.GreenSocket(
135             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
136         gs.settimeout(.1)
137         gs.bind(('', 0))
138
139         expect_socket_timeout(gs.recvfrom, 0)
140         expect_socket_timeout(gs.recvfrom, 8192)
141
142     def test_recvfrom_into_timeout(self):
143         buf = array.array('B')
144
145         gs = greenio.GreenSocket(
146             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
147         gs.settimeout(.1)
148         gs.bind(('', 0))
149
150         expect_socket_timeout(gs.recvfrom_into, buf)
151
152     def test_recv_into_timeout(self):
153         buf = array.array('B')
154
155         listener = greenio.GreenSocket(socket.socket())
156         listener.bind(('', 0))
157         listener.listen(50)
158
159         evt = event.Event()
160
161         def server():
162             # accept the connection in another greenlet
163             sock, addr = listener.accept()
164             evt.wait()
165
166         gt = eventlet.spawn(server)
167
168         addr = listener.getsockname()
169
170         client = greenio.GreenSocket(socket.socket())
171         client.settimeout(0.1)
172
173         client.connect(addr)
174
175         expect_socket_timeout(client.recv_into, buf)
176
177         evt.send()
178         gt.wait()
179
180     def test_send_timeout(self):
181         self.reset_timeout(2)
182         listener = bufsized(eventlet.listen(('', 0)))
183
184         evt = event.Event()
185
186         def server():
187             # accept the connection in another greenlet
188             sock, addr = listener.accept()
189             sock = bufsized(sock)
190             evt.wait()
191
192         gt = eventlet.spawn(server)
193
194         addr = listener.getsockname()
195
196         client = bufsized(greenio.GreenSocket(socket.socket()))
197         client.connect(addr)
198
199         client.settimeout(0.00001)
200         msg = b"A" * 100000  # large enough number to overwhelm most buffers
201
202         # want to exceed the size of the OS buffer so it'll block in a
203         # single send
204         def send():
205             for x in range(10):
206                 client.send(msg)
207
208         expect_socket_timeout(send)
209
210         evt.send()
211         gt.wait()
212
213     def test_sendall_timeout(self):
214         listener = greenio.GreenSocket(socket.socket())
215         listener.bind(('', 0))
216         listener.listen(50)
217
218         evt = event.Event()
219
220         def server():
221             # accept the connection in another greenlet
222             sock, addr = listener.accept()
223             evt.wait()
224
225         gt = eventlet.spawn(server)
226
227         addr = listener.getsockname()
228
229         client = greenio.GreenSocket(socket.socket())
230         client.settimeout(0.1)
231         client.connect(addr)
232
233         # want to exceed the size of the OS buffer so it'll block
234         msg = b"A" * (8 << 20)
235         expect_socket_timeout(client.sendall, msg)
236
237         evt.send()
238         gt.wait()
239
240     def test_close_with_makefile(self):
241         def accept_close_early(listener):
242             # verify that the makefile and the socket are truly independent
243             # by closing the socket prior to using the made file
244             try:
245                 conn, addr = listener.accept()
246                 fd = conn.makefile('wb')
247                 conn.close()
248                 fd.write(b'hello\n')
249                 fd.close()
250                 self.assertWriteToClosedFileRaises(fd)
251                 self.assertRaises(socket.error, conn.send, b'b')
252             finally:
253                 listener.close()
254
255         def accept_close_late(listener):
256             # verify that the makefile and the socket are truly independent
257             # by closing the made file and then sending a character
258             try:
259                 conn, addr = listener.accept()
260                 fd = conn.makefile('wb')
261                 fd.write(b'hello')
262                 fd.close()
263                 conn.send(b'\n')
264                 conn.close()
265                 self.assertWriteToClosedFileRaises(fd)
266                 self.assertRaises(socket.error, conn.send, b'b')
267             finally:
268                 listener.close()
269
270         def did_it_work(server):
271             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
272             client.connect(('127.0.0.1', server.getsockname()[1]))
273             fd = client.makefile('rb')
274             client.close()
275             assert fd.readline() == b'hello\n'
276             assert fd.read() == b''
277             fd.close()
278
279         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
280         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
281         server.bind(('0.0.0.0', 0))
282         server.listen(50)
283         killer = eventlet.spawn(accept_close_early, server)
284         did_it_work(server)
285         killer.wait()
286
287         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
288         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
289         server.bind(('0.0.0.0', 0))
290         server.listen(50)
291         killer = eventlet.spawn(accept_close_late, server)
292         did_it_work(server)
293         killer.wait()
294
295     def test_del_closes_socket(self):
296         def accept_once(listener):
297             # delete/overwrite the original conn
298             # object, only keeping the file object around
299             # closing the file object should close everything
300             try:
301                 conn, addr = listener.accept()
302                 conn = conn.makefile('wb')
303                 conn.write(b'hello\n')
304                 conn.close()
305                 gc.collect()
306                 self.assertWriteToClosedFileRaises(conn)
307             finally:
308                 listener.close()
309
310         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
311         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
312         server.bind(('127.0.0.1', 0))
313         server.listen(50)
314         killer = eventlet.spawn(accept_once, server)
315         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316         client.connect(('127.0.0.1', server.getsockname()[1]))
317         fd = client.makefile('rb')
318         client.close()
319         assert fd.read() == b'hello\n'
320         assert fd.read() == b''
321
322         killer.wait()
323
324     def test_full_duplex(self):
325         large_data = b'*' * 10 * min_buf_size()
326         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
327         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
328         listener.bind(('127.0.0.1', 0))
329         listener.listen(50)
330         bufsized(listener)
331
332         def send_large(sock):
333             sock.sendall(large_data)
334
335         def read_large(sock):
336             result = sock.recv(len(large_data))
337             while len(result) < len(large_data):
338                 result += sock.recv(len(large_data))
339             self.assertEqual(result, large_data)
340
341         def server():
342             (sock, addr) = listener.accept()
343             sock = bufsized(sock)
344             send_large_coro = eventlet.spawn(send_large, sock)
345             eventlet.sleep(0)
346             result = sock.recv(10)
347             expected = b'hello world'
348             while len(result) < len(expected):
349                 result += sock.recv(10)
350             self.assertEqual(result, expected)
351             send_large_coro.wait()
352
353         server_evt = eventlet.spawn(server)
354         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
355         client.connect(('127.0.0.1', listener.getsockname()[1]))
356         bufsized(client)
357         large_evt = eventlet.spawn(read_large, client)
358         eventlet.sleep(0)
359         client.sendall(b'hello world')
360         server_evt.wait()
361         large_evt.wait()
362         client.close()
363
364     def test_sendall(self):
365         # test adapted from Marcus Cavanaugh's email
366         # it may legitimately take a while, but will eventually complete
367         self.timer.cancel()
368         second_bytes = 10
369
370         def test_sendall_impl(many_bytes):
371             bufsize = max(many_bytes // 15, 2)
372
373             def sender(listener):
374                 (sock, addr) = listener.accept()
375                 sock = bufsized(sock, size=bufsize)
376                 sock.sendall(b'x' * many_bytes)
377                 sock.sendall(b'y' * second_bytes)
378
379             listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
380             listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
381             listener.bind(("", 0))
382             listener.listen(50)
383             sender_coro = eventlet.spawn(sender, listener)
384             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
385             client.connect(('127.0.0.1', listener.getsockname()[1]))
386             bufsized(client, size=bufsize)
387             total = 0
388             while total < many_bytes:
389                 data = client.recv(min(many_bytes - total, many_bytes // 10))
390                 if not data:
391                     break
392                 total += len(data)
393
394             total2 = 0
395             while total < second_bytes:
396                 data = client.recv(second_bytes)
397                 if not data:
398                     break
399                 total2 += len(data)
400
401             sender_coro.wait()
402             client.close()
403
404         for how_many in (1000, 10000, 100000, 1000000):
405             test_sendall_impl(how_many)
406
407     def test_wrap_socket(self):
408         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
409         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
410         sock.bind(('127.0.0.1', 0))
411         sock.listen(50)
412         ssl.wrap_socket(sock)
413
414     def test_timeout_and_final_write(self):
415         # This test verifies that a write on a socket that we've
416         # stopped listening for doesn't result in an incorrect switch
417         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
418         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
419         server.bind(('127.0.0.1', 0))
420         server.listen(50)
421         bound_port = server.getsockname()[1]
422
423         def sender(evt):
424             s2, addr = server.accept()
425             wrap_wfile = s2.makefile('wb')
426
427             eventlet.sleep(0.02)
428             wrap_wfile.write(b'hi')
429             s2.close()
430             evt.send(b'sent via event')
431
432         evt = event.Event()
433         eventlet.spawn(sender, evt)
434         # lets the socket enter accept mode, which
435         # is necessary for connect to succeed on windows
436         eventlet.sleep(0)
437         try:
438             # try and get some data off of this pipe
439             # but bail before any is sent
440             eventlet.Timeout(0.01)
441             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
442             client.connect(('127.0.0.1', bound_port))
443             wrap_rfile = client.makefile()
444             wrap_rfile.read(1)
445             self.fail()
446         except eventlet.TimeoutError:
447             pass
448
449         result = evt.wait()
450         self.assertEqual(result, b'sent via event')
451         server.close()
452         client.close()
453
454     @tests.skip_with_pyevent
455     def test_raised_multiple_readers(self):
456         debug.hub_prevent_multiple_readers(True)
457
458         def handle(sock, addr):
459             sock.recv(1)
460             sock.sendall(b"a")
461             raise eventlet.StopServe()
462
463         listener = eventlet.listen(('127.0.0.1', 0))
464         eventlet.spawn(eventlet.serve, listener, handle)
465
466         def reader(s):
467             s.recv(1)
468
469         s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
470         a = eventlet.spawn(reader, s)
471         eventlet.sleep(0)
472         self.assertRaises(RuntimeError, s.recv, 1)
473         s.sendall(b'b')
474         a.wait()
475
476     @tests.skip_with_pyevent
477     @tests.skip_if(using_epoll_hub)
478     @tests.skip_if(using_kqueue_hub)
479     def test_closure(self):
480         def spam_to_me(address):
481             sock = eventlet.connect(address)
482             while True:
483                 try:
484                     sock.sendall(b'hello world')
485                     # Arbitrary delay to not use all available CPU, keeps the test
486                     # running quickly and reliably under a second
487                     time.sleep(0.001)
488                 except socket.error as e:
489                     if get_errno(e) == errno.EPIPE:
490                         return
491                     raise
492
493         server = eventlet.listen(('127.0.0.1', 0))
494         sender = eventlet.spawn(spam_to_me, server.getsockname())
495         client, address = server.accept()
496         server.close()
497
498         def reader():
499             try:
500                 while True:
501                     data = client.recv(1024)
502                     assert data
503                     # Arbitrary delay to not use all available CPU, keeps the test
504                     # running quickly and reliably under a second
505                     time.sleep(0.001)
506             except socket.error as e:
507                 # we get an EBADF because client is closed in the same process
508                 # (but a different greenthread)
509                 if get_errno(e) != errno.EBADF:
510                     raise
511
512         def closer():
513             client.close()
514
515         reader = eventlet.spawn(reader)
516         eventlet.spawn_n(closer)
517         reader.wait()
518         sender.wait()
519
520     def test_invalid_connection(self):
521         # find an unused port by creating a socket then closing it
522         listening_socket = eventlet.listen(('127.0.0.1', 0))
523         port = listening_socket.getsockname()[1]
524         listening_socket.close()
525         self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
526
527     def test_zero_timeout_and_back(self):
528         listen = eventlet.listen(('', 0))
529         # Keep reference to server side of socket
530         server = eventlet.spawn(listen.accept)
531         client = eventlet.connect(listen.getsockname())
532
533         client.settimeout(0.05)
534         # Now must raise socket.timeout
535         self.assertRaises(socket.timeout, client.recv, 1)
536
537         client.settimeout(0)
538         # Now must raise socket.error with EAGAIN
539         try:
540             client.recv(1)
541             assert False
542         except socket.error as e:
543             assert get_errno(e) == errno.EAGAIN
544
545         client.settimeout(0.05)
546         # Now socket.timeout again
547         self.assertRaises(socket.timeout, client.recv, 1)
548         server.wait()
549
550     def test_default_nonblocking(self):
551         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
552         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
553         assert flags & os.O_NONBLOCK
554
555         sock2 = socket.socket(sock1.fd)
556         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
557         assert flags & os.O_NONBLOCK
558
559     def test_dup_nonblocking(self):
560         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
561         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
562         assert flags & os.O_NONBLOCK
563
564         sock2 = sock1.dup()
565         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
566         assert flags & os.O_NONBLOCK
567
568     def test_skip_nonblocking(self):
569         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
570         fd = sock1.fd.fileno()
571         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
572         flags = fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
573         assert flags & os.O_NONBLOCK == 0
574
575         sock2 = socket.socket(sock1.fd, set_nonblocking=False)
576         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
577         assert flags & os.O_NONBLOCK == 0
578
579     def test_sockopt_interface(self):
580         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
581         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 0
582         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) == b'\000'
583         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
584
585     def test_socketpair_select(self):
586         # https://github.com/eventlet/eventlet/pull/25
587         s1, s2 = socket.socketpair()
588         assert select.select([], [s1], [], 0) == ([], [s1], [])
589         assert select.select([], [s1], [], 0) == ([], [s1], [])
590
591     def test_shutdown_safe(self):
592         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
593         sock.close()
594         # should not raise
595         greenio.shutdown_safe(sock)
596
597     def test_datagram_socket_operations_work(self):
598         receiver = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
599         receiver.bind(('127.0.0.1', 0))
600         address = receiver.getsockname()
601
602         sender = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
603
604         # Two ways sendto can be called
605         sender.sendto(b'first', address)
606         sender.sendto(b'second', 0, address)
607
608         sender_address = ('127.0.0.1', sender.getsockname()[1])
609         eq_(receiver.recvfrom(1024), (b'first', sender_address))
610         eq_(receiver.recvfrom(1024), (b'second', sender_address))
611
612
613 def test_get_fileno_of_a_socket_works():
614     class DummySocket(object):
615         def fileno(self):
616             return 123
617     assert select.get_fileno(DummySocket()) == 123
618
619
620 def test_get_fileno_of_an_int_works():
621     assert select.get_fileno(123) == 123
622
623
624 expected_get_fileno_type_error_message = (
625     'Expected int or long, got <%s \'str\'>' % ('type' if six.PY2 else 'class'))
626
627
628 def test_get_fileno_of_wrong_type_fails():
629     try:
630         select.get_fileno('foo')
631     except TypeError as ex:
632         assert str(ex) == expected_get_fileno_type_error_message
633     else:
634         assert False, 'Expected TypeError not raised'
635
636
637 def test_get_fileno_of_a_socket_with_fileno_returning_wrong_type_fails():
638     class DummySocket(object):
639         def fileno(self):
640             return 'foo'
641     try:
642         select.get_fileno(DummySocket())
643     except TypeError as ex:
644         assert str(ex) == expected_get_fileno_type_error_message
645     else:
646         assert False, 'Expected TypeError not raised'
647
648
649 class TestGreenPipe(tests.LimitedTestCase):
650     @tests.skip_on_windows
651     def setUp(self):
652         super(self.__class__, self).setUp()
653         self.tempdir = tempfile.mkdtemp('_green_pipe_test')
654
655     def tearDown(self):
656         shutil.rmtree(self.tempdir)
657         super(self.__class__, self).tearDown()
658
659     def test_pipe(self):
660         r, w = os.pipe()
661         rf = greenio.GreenPipe(r, 'rb')
662         wf = greenio.GreenPipe(w, 'wb', 0)
663
664         def sender(f, content):
665             for ch in map(six.int2byte, six.iterbytes(content)):
666                 eventlet.sleep(0.0001)
667                 f.write(ch)
668             f.close()
669
670         one_line = b"12345\n"
671         eventlet.spawn(sender, wf, one_line * 5)
672         for i in range(5):
673             line = rf.readline()
674             eventlet.sleep(0.01)
675             self.assertEqual(line, one_line)
676         self.assertEqual(rf.readline(), b'')
677
678     def test_pipe_read(self):
679         # ensure that 'readline' works properly on GreenPipes when data is not
680         # immediately available (fd is nonblocking, was raising EAGAIN)
681         # also ensures that readline() terminates on '\n' and '\r\n'
682         r, w = os.pipe()
683
684         r = greenio.GreenPipe(r, 'rb')
685         w = greenio.GreenPipe(w, 'wb')
686
687         def writer():
688             eventlet.sleep(.1)
689
690             w.write(b'line\n')
691             w.flush()
692
693             w.write(b'line\r\n')
694             w.flush()
695
696         gt = eventlet.spawn(writer)
697
698         eventlet.sleep(0)
699
700         line = r.readline()
701         self.assertEqual(line, b'line\n')
702
703         line = r.readline()
704         self.assertEqual(line, b'line\r\n')
705
706         gt.wait()
707
708     def test_pip_read_until_end(self):
709         # similar to test_pip_read above but reading until eof
710         r, w = os.pipe()
711
712         r = greenio.GreenPipe(r, 'rb')
713         w = greenio.GreenPipe(w, 'wb')
714
715         w.write(b'c' * DEFAULT_BUFFER_SIZE * 2)
716         w.close()
717
718         buf = r.read()  # no chunk size specified; read until end
719         self.assertEqual(len(buf), 2 * DEFAULT_BUFFER_SIZE)
720         self.assertEqual(buf[:3], b'ccc')
721
722     def test_pipe_read_unbuffered(self):
723         # Ensure that seting the buffer size works properly on GreenPipes,
724         # it used to be ignored on Python 2 and the test would hang on r.readline()
725         # below.
726         r, w = os.pipe()
727
728         r = greenio.GreenPipe(r, 'rb', 0)
729         w = greenio.GreenPipe(w, 'wb', 0)
730
731         w.write(b'line\n')
732
733         line = r.readline()
734         self.assertEqual(line, b'line\n')
735         r.close()
736         w.close()
737
738     def test_pipe_writes_large_messages(self):
739         r, w = os.pipe()
740
741         r = greenio.GreenPipe(r, 'rb')
742         w = greenio.GreenPipe(w, 'wb')
743
744         large_message = b"".join([1024 * six.int2byte(i) for i in range(65)])
745
746         def writer():
747             w.write(large_message)
748             w.close()
749
750         gt = eventlet.spawn(writer)
751
752         for i in range(65):
753             buf = r.read(1024)
754             expected = 1024 * six.int2byte(i)
755             self.assertEqual(
756                 buf, expected,
757                 "expected=%r..%r, found=%r..%r iter=%d"
758                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
759         gt.wait()
760
761     def test_seek_on_buffered_pipe(self):
762         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'wb+', 1024)
763         self.assertEqual(f.tell(), 0)
764         f.seek(0, 2)
765         self.assertEqual(f.tell(), 0)
766         f.write(b'1234567890')
767         f.seek(0, 2)
768         self.assertEqual(f.tell(), 10)
769         f.seek(0)
770         value = f.read(1)
771         self.assertEqual(value, b'1')
772         self.assertEqual(f.tell(), 1)
773         value = f.read(1)
774         self.assertEqual(value, b'2')
775         self.assertEqual(f.tell(), 2)
776         f.seek(0, 1)
777         self.assertEqual(f.readline(), b'34567890')
778         f.seek(-5, 1)
779         self.assertEqual(f.readline(), b'67890')
780         f.seek(0)
781         self.assertEqual(f.readline(), b'1234567890')
782         f.seek(0, 2)
783         self.assertEqual(f.readline(), b'')
784
785     def test_truncate(self):
786         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'wb+', 1024)
787         f.write(b'1234567890')
788         f.truncate(9)
789         self.assertEqual(f.tell(), 9)
790
791
792 class TestGreenIoLong(tests.LimitedTestCase):
793     TEST_TIMEOUT = 10  # the test here might take a while depending on the OS
794
795     @tests.skip_with_pyevent
796     def test_multiple_readers(self):
797         debug.hub_prevent_multiple_readers(False)
798         recvsize = 2 * min_buf_size()
799         sendsize = 10 * recvsize
800
801         # test that we can have multiple coroutines reading
802         # from the same fd.  We make no guarantees about which one gets which
803         # bytes, but they should both get at least some
804         def reader(sock, results):
805             while True:
806                 data = sock.recv(recvsize)
807                 if not data:
808                     break
809                 results.append(data)
810
811         results1 = []
812         results2 = []
813         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
814         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
815         listener.bind(('127.0.0.1', 0))
816         listener.listen(50)
817
818         def server():
819             (sock, addr) = listener.accept()
820             sock = bufsized(sock)
821             try:
822                 c1 = eventlet.spawn(reader, sock, results1)
823                 c2 = eventlet.spawn(reader, sock, results2)
824                 try:
825                     c1.wait()
826                     c2.wait()
827                 finally:
828                     c1.kill()
829                     c2.kill()
830             finally:
831                 sock.close()
832
833         server_coro = eventlet.spawn(server)
834         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
835         client.connect(('127.0.0.1', listener.getsockname()[1]))
836         bufsized(client, size=sendsize)
837
838         # Split into multiple chunks so that we can wait a little
839         # every iteration which allows both readers to queue and
840         # recv some data when we actually send it.
841         for i in range(20):
842             eventlet.sleep(0.001)
843             client.sendall(b'*' * (sendsize // 20))
844
845         client.close()
846         server_coro.wait()
847         listener.close()
848         assert len(results1) > 0
849         assert len(results2) > 0
850         debug.hub_prevent_multiple_readers()
851
852
853 def test_set_nonblocking():
854     sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
855     fileno = sock.fileno()
856     orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
857     assert orig_flags & os.O_NONBLOCK == 0
858     greenio.set_nonblocking(sock)
859     new_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
860     assert new_flags == (orig_flags | os.O_NONBLOCK)
861
862
863 def test_socket_del_fails_gracefully_when_not_fully_initialized():
864     # Regression introduced in da87716714689894f23d0db7b003f26d97031e83, reported in:
865     # * GH #137 https://github.com/eventlet/eventlet/issues/137
866     # * https://bugs.launchpad.net/oslo.messaging/+bug/1369999
867
868     class SocketSubclass(socket.socket):
869
870         def __init__(self):
871             pass
872
873     with capture_stderr() as err:
874         SocketSubclass()
875
876     assert err.getvalue() == ''
877
878
879 def test_double_close_219():
880     tests.run_isolated('greenio_double_close_219.py')
881
882
883 def test_partial_write_295():
884     # https://github.com/eventlet/eventlet/issues/295
885     # `socket.makefile('w').writelines()` must send all
886     # despite partial writes by underlying socket
887     listen_socket = eventlet.listen(('localhost', 0))
888     original_accept = listen_socket.accept
889
890     def talk(conn):
891         f = conn.makefile('wb')
892         line = b'*' * 2140
893         f.writelines([line] * 10000)
894         conn.close()
895
896     def accept():
897         connection, address = original_accept()
898         original_send = connection.send
899
900         def slow_send(b, *args):
901             b = b[:1031]
902             return original_send(b, *args)
903
904         connection.send = slow_send
905         eventlet.spawn(talk, connection)
906         return connection, address
907
908     listen_socket.accept = accept
909
910     eventlet.spawn(listen_socket.accept)
911     sock = eventlet.connect(listen_socket.getsockname())
912     with eventlet.Timeout(10):
913         bs = sock.makefile('rb').read()
914     assert len(bs) == 21400000
915     assert bs == (b'*' * 21400000)
916
917
918 def test_socket_file_read_non_int():
919     listen_socket = eventlet.listen(('localhost', 0))
920
921     def server():
922         conn, _ = listen_socket.accept()
923         conn.recv(1)
924         conn.sendall('response')
925         conn.close()
926
927     eventlet.spawn(server)
928     sock = eventlet.connect(listen_socket.getsockname())
929
930     fd = sock.makefile('rwb')
931     fd.write(b'?')
932     fd.flush()
933     with eventlet.Timeout(1):
934         try:
935             fd.read("This shouldn't work")
936             assert False
937         except TypeError:
938             pass
939
940
941 def test_pipe_context():
942     # ensure using a pipe as a context actually closes it.
943     r, w = os.pipe()
944     r = greenio.GreenPipe(r)
945     w = greenio.GreenPipe(w, 'w')
946
947     with r:
948         pass
949     assert r.closed and not w.closed
950
951     with w as f:
952         assert f == w
953     assert r.closed and w.closed