8a94b7bddd80d1b1d45460ddac34e0f15192260d
[packages/trusty/python-eventlet.git] / python-eventlet / tests / greenio_test.py
1 import array
2 import errno
3 import eventlet
4 import fcntl
5 import gc
6 import os
7 import shutil
8 import socket as _orig_sock
9 import sys
10 import tempfile
11
12 from nose.tools import eq_
13
14 from eventlet import event, greenio, debug
15 from eventlet.hubs import get_hub
16 from eventlet.green import select, socket, time, ssl
17 from eventlet.support import capture_stderr, get_errno, six
18 import tests
19
20
21 if six.PY3:
22     buffer = memoryview
23
24
25 def bufsized(sock, size=1):
26     """ Resize both send and receive buffers on a socket.
27     Useful for testing trampoline.  Returns the socket.
28
29     >>> import socket
30     >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
31     """
32     sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
33     sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
34     return sock
35
36
37 def min_buf_size():
38     """Return the minimum buffer size that the platform supports."""
39     test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
40     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
41     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
42
43
44 def using_epoll_hub(_f):
45     try:
46         return 'epolls' in type(get_hub()).__module__
47     except Exception:
48         return False
49
50
51 def using_kqueue_hub(_f):
52     try:
53         return 'kqueue' in type(get_hub()).__module__
54     except Exception:
55         return False
56
57
58 class TestGreenSocket(tests.LimitedTestCase):
59     def assertWriteToClosedFileRaises(self, fd):
60         if sys.version_info[0] < 3:
61             # 2.x socket._fileobjects are odd: writes don't check
62             # whether the socket is closed or not, and you get an
63             # AttributeError during flush if it is closed
64             fd.write(b'a')
65             self.assertRaises(Exception, fd.flush)
66         else:
67             # 3.x io write to closed file-like pbject raises ValueError
68             self.assertRaises(ValueError, fd.write, b'a')
69
70     def test_connect_timeout(self):
71         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
72         s.settimeout(0.1)
73         gs = greenio.GreenSocket(s)
74         try:
75             gs.connect(('192.0.2.1', 80))
76             self.fail("socket.timeout not raised")
77         except socket.timeout as e:
78             assert hasattr(e, 'args')
79             self.assertEqual(e.args[0], 'timed out')
80         except socket.error as e:
81             # unreachable is also a valid outcome
82             if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
83                 raise
84
85     def test_accept_timeout(self):
86         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
87         s.bind(('', 0))
88         s.listen(50)
89
90         s.settimeout(0.1)
91         gs = greenio.GreenSocket(s)
92         try:
93             gs.accept()
94             self.fail("socket.timeout not raised")
95         except socket.timeout as e:
96             assert hasattr(e, 'args')
97             self.assertEqual(e.args[0], 'timed out')
98
99     def test_connect_ex_timeout(self):
100         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101         s.settimeout(0.1)
102         gs = greenio.GreenSocket(s)
103         e = gs.connect_ex(('192.0.2.1', 80))
104         if e not in (errno.EHOSTUNREACH, errno.ENETUNREACH):
105             self.assertEqual(e, errno.EAGAIN)
106
107     def test_recv_timeout(self):
108         listener = greenio.GreenSocket(socket.socket())
109         listener.bind(('', 0))
110         listener.listen(50)
111
112         evt = event.Event()
113
114         def server():
115             # accept the connection in another greenlet
116             sock, addr = listener.accept()
117             evt.wait()
118
119         gt = eventlet.spawn(server)
120
121         addr = listener.getsockname()
122
123         client = greenio.GreenSocket(socket.socket())
124         client.settimeout(0.1)
125
126         client.connect(addr)
127
128         try:
129             client.recv(8192)
130             self.fail("socket.timeout not raised")
131         except socket.timeout as e:
132             assert hasattr(e, 'args')
133             self.assertEqual(e.args[0], 'timed out')
134
135         evt.send()
136         gt.wait()
137
138     def test_recvfrom_timeout(self):
139         gs = greenio.GreenSocket(
140             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
141         gs.settimeout(.1)
142         gs.bind(('', 0))
143
144         try:
145             gs.recvfrom(8192)
146             self.fail("socket.timeout not raised")
147         except socket.timeout as e:
148             assert hasattr(e, 'args')
149             self.assertEqual(e.args[0], 'timed out')
150
151     def test_recvfrom_into_timeout(self):
152         buf = array.array('B')
153
154         gs = greenio.GreenSocket(
155             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
156         gs.settimeout(.1)
157         gs.bind(('', 0))
158
159         try:
160             gs.recvfrom_into(buf)
161             self.fail("socket.timeout not raised")
162         except socket.timeout as e:
163             assert hasattr(e, 'args')
164             self.assertEqual(e.args[0], 'timed out')
165
166     def test_recv_into_timeout(self):
167         buf = array.array('B')
168
169         listener = greenio.GreenSocket(socket.socket())
170         listener.bind(('', 0))
171         listener.listen(50)
172
173         evt = event.Event()
174
175         def server():
176             # accept the connection in another greenlet
177             sock, addr = listener.accept()
178             evt.wait()
179
180         gt = eventlet.spawn(server)
181
182         addr = listener.getsockname()
183
184         client = greenio.GreenSocket(socket.socket())
185         client.settimeout(0.1)
186
187         client.connect(addr)
188
189         try:
190             client.recv_into(buf)
191             self.fail("socket.timeout not raised")
192         except socket.timeout as e:
193             assert hasattr(e, 'args')
194             self.assertEqual(e.args[0], 'timed out')
195
196         evt.send()
197         gt.wait()
198
199     def test_send_timeout(self):
200         self.reset_timeout(2)
201         listener = bufsized(eventlet.listen(('', 0)))
202
203         evt = event.Event()
204
205         def server():
206             # accept the connection in another greenlet
207             sock, addr = listener.accept()
208             sock = bufsized(sock)
209             evt.wait()
210
211         gt = eventlet.spawn(server)
212
213         addr = listener.getsockname()
214
215         client = bufsized(greenio.GreenSocket(socket.socket()))
216         client.connect(addr)
217         try:
218             client.settimeout(0.00001)
219             msg = b"A" * 100000  # large enough number to overwhelm most buffers
220
221             total_sent = 0
222             # want to exceed the size of the OS buffer so it'll block in a
223             # single send
224             for x in range(10):
225                 total_sent += client.send(msg)
226             self.fail("socket.timeout not raised")
227         except socket.timeout as e:
228             assert hasattr(e, 'args')
229             self.assertEqual(e.args[0], 'timed out')
230
231         evt.send()
232         gt.wait()
233
234     def test_sendall_timeout(self):
235         listener = greenio.GreenSocket(socket.socket())
236         listener.bind(('', 0))
237         listener.listen(50)
238
239         evt = event.Event()
240
241         def server():
242             # accept the connection in another greenlet
243             sock, addr = listener.accept()
244             evt.wait()
245
246         gt = eventlet.spawn(server)
247
248         addr = listener.getsockname()
249
250         client = greenio.GreenSocket(socket.socket())
251         client.settimeout(0.1)
252         client.connect(addr)
253
254         try:
255             msg = b"A" * (8 << 20)
256
257             # want to exceed the size of the OS buffer so it'll block
258             client.sendall(msg)
259             self.fail("socket.timeout not raised")
260         except socket.timeout as e:
261             assert hasattr(e, 'args')
262             self.assertEqual(e.args[0], 'timed out')
263
264         evt.send()
265         gt.wait()
266
267     def test_close_with_makefile(self):
268         def accept_close_early(listener):
269             # verify that the makefile and the socket are truly independent
270             # by closing the socket prior to using the made file
271             try:
272                 conn, addr = listener.accept()
273                 fd = conn.makefile('wb')
274                 conn.close()
275                 fd.write(b'hello\n')
276                 fd.close()
277                 self.assertWriteToClosedFileRaises(fd)
278                 self.assertRaises(socket.error, conn.send, b'b')
279             finally:
280                 listener.close()
281
282         def accept_close_late(listener):
283             # verify that the makefile and the socket are truly independent
284             # by closing the made file and then sending a character
285             try:
286                 conn, addr = listener.accept()
287                 fd = conn.makefile('wb')
288                 fd.write(b'hello')
289                 fd.close()
290                 conn.send(b'\n')
291                 conn.close()
292                 self.assertWriteToClosedFileRaises(fd)
293                 self.assertRaises(socket.error, conn.send, b'b')
294             finally:
295                 listener.close()
296
297         def did_it_work(server):
298             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
299             client.connect(('127.0.0.1', server.getsockname()[1]))
300             fd = client.makefile('rb')
301             client.close()
302             assert fd.readline() == b'hello\n'
303             assert fd.read() == b''
304             fd.close()
305
306         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
307         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
308         server.bind(('0.0.0.0', 0))
309         server.listen(50)
310         killer = eventlet.spawn(accept_close_early, server)
311         did_it_work(server)
312         killer.wait()
313
314         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
315         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
316         server.bind(('0.0.0.0', 0))
317         server.listen(50)
318         killer = eventlet.spawn(accept_close_late, server)
319         did_it_work(server)
320         killer.wait()
321
322     def test_del_closes_socket(self):
323         def accept_once(listener):
324             # delete/overwrite the original conn
325             # object, only keeping the file object around
326             # closing the file object should close everything
327             try:
328                 conn, addr = listener.accept()
329                 conn = conn.makefile('wb')
330                 conn.write(b'hello\n')
331                 conn.close()
332                 gc.collect()
333                 self.assertWriteToClosedFileRaises(conn)
334             finally:
335                 listener.close()
336
337         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
338         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
339         server.bind(('127.0.0.1', 0))
340         server.listen(50)
341         killer = eventlet.spawn(accept_once, server)
342         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
343         client.connect(('127.0.0.1', server.getsockname()[1]))
344         fd = client.makefile('rb')
345         client.close()
346         assert fd.read() == b'hello\n'
347         assert fd.read() == b''
348
349         killer.wait()
350
351     def test_full_duplex(self):
352         large_data = b'*' * 10 * min_buf_size()
353         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
354         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
355         listener.bind(('127.0.0.1', 0))
356         listener.listen(50)
357         bufsized(listener)
358
359         def send_large(sock):
360             sock.sendall(large_data)
361
362         def read_large(sock):
363             result = sock.recv(len(large_data))
364             while len(result) < len(large_data):
365                 result += sock.recv(len(large_data))
366             self.assertEqual(result, large_data)
367
368         def server():
369             (sock, addr) = listener.accept()
370             sock = bufsized(sock)
371             send_large_coro = eventlet.spawn(send_large, sock)
372             eventlet.sleep(0)
373             result = sock.recv(10)
374             expected = b'hello world'
375             while len(result) < len(expected):
376                 result += sock.recv(10)
377             self.assertEqual(result, expected)
378             send_large_coro.wait()
379
380         server_evt = eventlet.spawn(server)
381         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
382         client.connect(('127.0.0.1', listener.getsockname()[1]))
383         bufsized(client)
384         large_evt = eventlet.spawn(read_large, client)
385         eventlet.sleep(0)
386         client.sendall(b'hello world')
387         server_evt.wait()
388         large_evt.wait()
389         client.close()
390
391     def test_sendall(self):
392         # test adapted from Marcus Cavanaugh's email
393         # it may legitimately take a while, but will eventually complete
394         self.timer.cancel()
395         second_bytes = 10
396
397         def test_sendall_impl(many_bytes):
398             bufsize = max(many_bytes // 15, 2)
399
400             def sender(listener):
401                 (sock, addr) = listener.accept()
402                 sock = bufsized(sock, size=bufsize)
403                 sock.sendall(b'x' * many_bytes)
404                 sock.sendall(b'y' * second_bytes)
405
406             listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
407             listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
408             listener.bind(("", 0))
409             listener.listen(50)
410             sender_coro = eventlet.spawn(sender, listener)
411             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
412             client.connect(('127.0.0.1', listener.getsockname()[1]))
413             bufsized(client, size=bufsize)
414             total = 0
415             while total < many_bytes:
416                 data = client.recv(min(many_bytes - total, many_bytes // 10))
417                 if not data:
418                     break
419                 total += len(data)
420
421             total2 = 0
422             while total < second_bytes:
423                 data = client.recv(second_bytes)
424                 if not data:
425                     break
426                 total2 += len(data)
427
428             sender_coro.wait()
429             client.close()
430
431         for how_many in (1000, 10000, 100000, 1000000):
432             test_sendall_impl(how_many)
433
434     def test_wrap_socket(self):
435         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
436         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
437         sock.bind(('127.0.0.1', 0))
438         sock.listen(50)
439         ssl.wrap_socket(sock)
440
441     def test_timeout_and_final_write(self):
442         # This test verifies that a write on a socket that we've
443         # stopped listening for doesn't result in an incorrect switch
444         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
445         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
446         server.bind(('127.0.0.1', 0))
447         server.listen(50)
448         bound_port = server.getsockname()[1]
449
450         def sender(evt):
451             s2, addr = server.accept()
452             wrap_wfile = s2.makefile('wb')
453
454             eventlet.sleep(0.02)
455             wrap_wfile.write(b'hi')
456             s2.close()
457             evt.send(b'sent via event')
458
459         evt = event.Event()
460         eventlet.spawn(sender, evt)
461         # lets the socket enter accept mode, which
462         # is necessary for connect to succeed on windows
463         eventlet.sleep(0)
464         try:
465             # try and get some data off of this pipe
466             # but bail before any is sent
467             eventlet.Timeout(0.01)
468             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
469             client.connect(('127.0.0.1', bound_port))
470             wrap_rfile = client.makefile()
471             wrap_rfile.read(1)
472             self.fail()
473         except eventlet.TimeoutError:
474             pass
475
476         result = evt.wait()
477         self.assertEqual(result, b'sent via event')
478         server.close()
479         client.close()
480
481     @tests.skip_with_pyevent
482     def test_raised_multiple_readers(self):
483         debug.hub_prevent_multiple_readers(True)
484
485         def handle(sock, addr):
486             sock.recv(1)
487             sock.sendall(b"a")
488             raise eventlet.StopServe()
489
490         listener = eventlet.listen(('127.0.0.1', 0))
491         eventlet.spawn(eventlet.serve, listener, handle)
492
493         def reader(s):
494             s.recv(1)
495
496         s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
497         a = eventlet.spawn(reader, s)
498         eventlet.sleep(0)
499         self.assertRaises(RuntimeError, s.recv, 1)
500         s.sendall(b'b')
501         a.wait()
502
503     @tests.skip_with_pyevent
504     @tests.skip_if(using_epoll_hub)
505     @tests.skip_if(using_kqueue_hub)
506     def test_closure(self):
507         def spam_to_me(address):
508             sock = eventlet.connect(address)
509             while True:
510                 try:
511                     sock.sendall(b'hello world')
512                 except socket.error as e:
513                     if get_errno(e) == errno.EPIPE:
514                         return
515                     raise
516
517         server = eventlet.listen(('127.0.0.1', 0))
518         sender = eventlet.spawn(spam_to_me, server.getsockname())
519         client, address = server.accept()
520         server.close()
521
522         def reader():
523             try:
524                 while True:
525                     data = client.recv(1024)
526                     assert data
527             except socket.error as e:
528                 # we get an EBADF because client is closed in the same process
529                 # (but a different greenthread)
530                 if get_errno(e) != errno.EBADF:
531                     raise
532
533         def closer():
534             client.close()
535
536         reader = eventlet.spawn(reader)
537         eventlet.spawn_n(closer)
538         reader.wait()
539         sender.wait()
540
541     def test_invalid_connection(self):
542         # find an unused port by creating a socket then closing it
543         listening_socket = eventlet.listen(('127.0.0.1', 0))
544         port = listening_socket.getsockname()[1]
545         listening_socket.close()
546         self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
547
548     def test_zero_timeout_and_back(self):
549         listen = eventlet.listen(('', 0))
550         # Keep reference to server side of socket
551         server = eventlet.spawn(listen.accept)
552         client = eventlet.connect(listen.getsockname())
553
554         client.settimeout(0.05)
555         # Now must raise socket.timeout
556         self.assertRaises(socket.timeout, client.recv, 1)
557
558         client.settimeout(0)
559         # Now must raise socket.error with EAGAIN
560         try:
561             client.recv(1)
562             assert False
563         except socket.error as e:
564             assert get_errno(e) == errno.EAGAIN
565
566         client.settimeout(0.05)
567         # Now socket.timeout again
568         self.assertRaises(socket.timeout, client.recv, 1)
569         server.wait()
570
571     def test_default_nonblocking(self):
572         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
573         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
574         assert flags & os.O_NONBLOCK
575
576         sock2 = socket.socket(sock1.fd)
577         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
578         assert flags & os.O_NONBLOCK
579
580     def test_dup_nonblocking(self):
581         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
582         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
583         assert flags & os.O_NONBLOCK
584
585         sock2 = sock1.dup()
586         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
587         assert flags & os.O_NONBLOCK
588
589     def test_skip_nonblocking(self):
590         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
591         fd = sock1.fd.fileno()
592         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
593         flags = fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
594         assert flags & os.O_NONBLOCK == 0
595
596         sock2 = socket.socket(sock1.fd, set_nonblocking=False)
597         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
598         assert flags & os.O_NONBLOCK == 0
599
600     def test_sockopt_interface(self):
601         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
602         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 0
603         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) == b'\000'
604         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
605
606     def test_socketpair_select(self):
607         # https://github.com/eventlet/eventlet/pull/25
608         s1, s2 = socket.socketpair()
609         assert select.select([], [s1], [], 0) == ([], [s1], [])
610         assert select.select([], [s1], [], 0) == ([], [s1], [])
611
612     def test_shutdown_safe(self):
613         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
614         sock.close()
615         # should not raise
616         greenio.shutdown_safe(sock)
617
618
619 def test_get_fileno_of_a_socket_works():
620     class DummySocket(object):
621         def fileno(self):
622             return 123
623     assert select.get_fileno(DummySocket()) == 123
624
625
626 def test_get_fileno_of_an_int_works():
627     assert select.get_fileno(123) == 123
628
629
630 expected_get_fileno_type_error_message = (
631     'Expected int or long, got <%s \'str\'>' % ('type' if six.PY2 else 'class'))
632
633
634 def test_get_fileno_of_wrong_type_fails():
635     try:
636         select.get_fileno('foo')
637     except TypeError as ex:
638         assert str(ex) == expected_get_fileno_type_error_message
639     else:
640         assert False, 'Expected TypeError not raised'
641
642
643 def test_get_fileno_of_a_socket_with_fileno_returning_wrong_type_fails():
644     class DummySocket(object):
645         def fileno(self):
646             return 'foo'
647     try:
648         select.get_fileno(DummySocket())
649     except TypeError as ex:
650         assert str(ex) == expected_get_fileno_type_error_message
651     else:
652         assert False, 'Expected TypeError not raised'
653
654
655 class TestGreenPipe(tests.LimitedTestCase):
656     @tests.skip_on_windows
657     def setUp(self):
658         super(self.__class__, self).setUp()
659         self.tempdir = tempfile.mkdtemp('_green_pipe_test')
660
661     def tearDown(self):
662         shutil.rmtree(self.tempdir)
663         super(self.__class__, self).tearDown()
664
665     def test_pipe(self):
666         r, w = os.pipe()
667         rf = greenio.GreenPipe(r, 'rb')
668         wf = greenio.GreenPipe(w, 'wb', 0)
669
670         def sender(f, content):
671             for ch in map(six.int2byte, six.iterbytes(content)):
672                 eventlet.sleep(0.0001)
673                 f.write(ch)
674             f.close()
675
676         one_line = b"12345\n"
677         eventlet.spawn(sender, wf, one_line * 5)
678         for i in range(5):
679             line = rf.readline()
680             eventlet.sleep(0.01)
681             self.assertEqual(line, one_line)
682         self.assertEqual(rf.readline(), b'')
683
684     def test_pipe_read(self):
685         # ensure that 'readline' works properly on GreenPipes when data is not
686         # immediately available (fd is nonblocking, was raising EAGAIN)
687         # also ensures that readline() terminates on '\n' and '\r\n'
688         r, w = os.pipe()
689
690         r = greenio.GreenPipe(r, 'rb')
691         w = greenio.GreenPipe(w, 'wb')
692
693         def writer():
694             eventlet.sleep(.1)
695
696             w.write(b'line\n')
697             w.flush()
698
699             w.write(b'line\r\n')
700             w.flush()
701
702         gt = eventlet.spawn(writer)
703
704         eventlet.sleep(0)
705
706         line = r.readline()
707         self.assertEqual(line, b'line\n')
708
709         line = r.readline()
710         self.assertEqual(line, b'line\r\n')
711
712         gt.wait()
713
714     def test_pipe_writes_large_messages(self):
715         r, w = os.pipe()
716
717         r = greenio.GreenPipe(r, 'rb')
718         w = greenio.GreenPipe(w, 'wb')
719
720         large_message = b"".join([1024 * six.int2byte(i) for i in range(65)])
721
722         def writer():
723             w.write(large_message)
724             w.close()
725
726         gt = eventlet.spawn(writer)
727
728         for i in range(65):
729             buf = r.read(1024)
730             expected = 1024 * six.int2byte(i)
731             self.assertEqual(
732                 buf, expected,
733                 "expected=%r..%r, found=%r..%r iter=%d"
734                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
735         gt.wait()
736
737     def test_seek_on_buffered_pipe(self):
738         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'wb+', 1024)
739         self.assertEqual(f.tell(), 0)
740         f.seek(0, 2)
741         self.assertEqual(f.tell(), 0)
742         f.write(b'1234567890')
743         f.seek(0, 2)
744         self.assertEqual(f.tell(), 10)
745         f.seek(0)
746         value = f.read(1)
747         self.assertEqual(value, b'1')
748         self.assertEqual(f.tell(), 1)
749         value = f.read(1)
750         self.assertEqual(value, b'2')
751         self.assertEqual(f.tell(), 2)
752         f.seek(0, 1)
753         self.assertEqual(f.readline(), b'34567890')
754         f.seek(-5, 1)
755         self.assertEqual(f.readline(), b'67890')
756         f.seek(0)
757         self.assertEqual(f.readline(), b'1234567890')
758         f.seek(0, 2)
759         self.assertEqual(f.readline(), b'')
760
761     def test_truncate(self):
762         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'wb+', 1024)
763         f.write(b'1234567890')
764         f.truncate(9)
765         self.assertEqual(f.tell(), 9)
766
767
768 class TestGreenIoLong(tests.LimitedTestCase):
769     TEST_TIMEOUT = 10  # the test here might take a while depending on the OS
770
771     @tests.skip_with_pyevent
772     def test_multiple_readers(self, clibufsize=False):
773         debug.hub_prevent_multiple_readers(False)
774         recvsize = 2 * min_buf_size()
775         sendsize = 10 * recvsize
776
777         # test that we can have multiple coroutines reading
778         # from the same fd.  We make no guarantees about which one gets which
779         # bytes, but they should both get at least some
780         def reader(sock, results):
781             while True:
782                 data = sock.recv(recvsize)
783                 if not data:
784                     break
785                 results.append(data)
786
787         results1 = []
788         results2 = []
789         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
790         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
791         listener.bind(('127.0.0.1', 0))
792         listener.listen(50)
793
794         def server():
795             (sock, addr) = listener.accept()
796             sock = bufsized(sock)
797             try:
798                 c1 = eventlet.spawn(reader, sock, results1)
799                 c2 = eventlet.spawn(reader, sock, results2)
800                 try:
801                     c1.wait()
802                     c2.wait()
803                 finally:
804                     c1.kill()
805                     c2.kill()
806             finally:
807                 sock.close()
808
809         server_coro = eventlet.spawn(server)
810         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
811         client.connect(('127.0.0.1', listener.getsockname()[1]))
812         if clibufsize:
813             bufsized(client, size=sendsize)
814         else:
815             bufsized(client)
816         client.sendall(b'*' * sendsize)
817         client.close()
818         server_coro.wait()
819         listener.close()
820         assert len(results1) > 0
821         assert len(results2) > 0
822         debug.hub_prevent_multiple_readers()
823
824     @tests.skipped  # by rdw because it fails but it's not clear how to make it pass
825     @tests.skip_with_pyevent
826     def test_multiple_readers2(self):
827         self.test_multiple_readers(clibufsize=True)
828
829
830 class TestGreenIoStarvation(tests.LimitedTestCase):
831     # fixme: this doesn't succeed, because of eventlet's predetermined
832     # ordering.  two processes, one with server, one with client eventlets
833     # might be more reliable?
834
835     TEST_TIMEOUT = 300  # the test here might take a while depending on the OS
836
837     @tests.skipped  # by rdw, because it fails but it's not clear how to make it pass
838     @tests.skip_with_pyevent
839     def test_server_starvation(self, sendloops=15):
840         recvsize = 2 * min_buf_size()
841         sendsize = 10000 * recvsize
842
843         results = [[] for i in range(5)]
844
845         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
846         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
847         listener.bind(('127.0.0.1', 0))
848         port = listener.getsockname()[1]
849         listener.listen(50)
850
851         base_time = time.time()
852
853         def server(my_results):
854             sock, addr = listener.accept()
855
856             datasize = 0
857
858             t1 = None
859             t2 = None
860             try:
861                 while True:
862                     data = sock.recv(recvsize)
863                     if not t1:
864                         t1 = time.time() - base_time
865                     if not data:
866                         t2 = time.time() - base_time
867                         my_results.append(datasize)
868                         my_results.append((t1, t2))
869                         break
870                     datasize += len(data)
871             finally:
872                 sock.close()
873
874         def client():
875             pid = os.fork()
876             if pid:
877                 return pid
878
879             client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
880             client.connect(('127.0.0.1', port))
881
882             bufsized(client, size=sendsize)
883
884             for i in range(sendloops):
885                 client.sendall(b'*' * sendsize)
886             client.close()
887             os._exit(0)
888
889         clients = []
890         servers = []
891         for r in results:
892             servers.append(eventlet.spawn(server, r))
893         for r in results:
894             clients.append(client())
895
896         for s in servers:
897             s.wait()
898         for c in clients:
899             os.waitpid(c, 0)
900
901         listener.close()
902
903         # now test that all of the server receive intervals overlap, and
904         # that there were no errors.
905         for r in results:
906             assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
907             assert r[0] == sendsize * sendloops
908             assert len(r[1]) == 2
909             assert r[1][0] is not None
910             assert r[1][1] is not None
911
912         starttimes = sorted(r[1][0] for r in results)
913         endtimes = sorted(r[1][1] for r in results)
914         runlengths = sorted(r[1][1] - r[1][0] for r in results)
915
916         # assert that the last task started before the first task ended
917         # (our no-starvation condition)
918         assert starttimes[-1] < endtimes[0], \
919             "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
920
921         maxstartdiff = starttimes[-1] - starttimes[0]
922
923         assert maxstartdiff * 2 < runlengths[0], \
924             "Largest difference in starting times more than twice the shortest running time!"
925         assert runlengths[0] * 2 > runlengths[-1], \
926             "Longest runtime more than twice as long as shortest!"
927
928
929 def test_set_nonblocking():
930     sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
931     fileno = sock.fileno()
932     orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
933     assert orig_flags & os.O_NONBLOCK == 0
934     greenio.set_nonblocking(sock)
935     new_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
936     assert new_flags == (orig_flags | os.O_NONBLOCK)
937
938
939 def test_socket_del_fails_gracefully_when_not_fully_initialized():
940     # Regression introduced in da87716714689894f23d0db7b003f26d97031e83, reported in:
941     # * GH #137 https://github.com/eventlet/eventlet/issues/137
942     # * https://bugs.launchpad.net/oslo.messaging/+bug/1369999
943
944     class SocketSubclass(socket.socket):
945
946         def __init__(self):
947             pass
948
949     with capture_stderr() as err:
950         SocketSubclass()
951
952     assert err.getvalue() == ''
953
954
955 def test_double_close_219():
956     tests.run_isolated('greenio_double_close_219.py')