Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / greenio_test.py
1 import array
2 import errno
3 import eventlet
4 import fcntl
5 import gc
6 import os
7 import shutil
8 import socket as _orig_sock
9 import sys
10 import tempfile
11
12 from eventlet import event, greenio, debug
13 from eventlet.hubs import get_hub
14 from eventlet.green import select, socket, time, ssl
15 from eventlet.support import get_errno, six
16 from tests import (
17     LimitedTestCase, main,
18     skip_with_pyevent, skipped, skip_if, skip_on_windows,
19 )
20
21
22 if six.PY3:
23     buffer = memoryview
24
25
26 def bufsized(sock, size=1):
27     """ Resize both send and receive buffers on a socket.
28     Useful for testing trampoline.  Returns the socket.
29
30     >>> import socket
31     >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
32     """
33     sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
34     sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
35     return sock
36
37
38 def min_buf_size():
39     """Return the minimum buffer size that the platform supports."""
40     test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
42     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
43
44
45 def using_epoll_hub(_f):
46     try:
47         return 'epolls' in type(get_hub()).__module__
48     except Exception:
49         return False
50
51
52 def using_kqueue_hub(_f):
53     try:
54         return 'kqueue' in type(get_hub()).__module__
55     except Exception:
56         return False
57
58
59 class TestGreenSocket(LimitedTestCase):
60     def assertWriteToClosedFileRaises(self, fd):
61         if sys.version_info[0] < 3:
62             # 2.x socket._fileobjects are odd: writes don't check
63             # whether the socket is closed or not, and you get an
64             # AttributeError during flush if it is closed
65             fd.write(b'a')
66             self.assertRaises(Exception, fd.flush)
67         else:
68             # 3.x io write to closed file-like pbject raises ValueError
69             self.assertRaises(ValueError, fd.write, b'a')
70
71     def test_connect_timeout(self):
72         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
73         s.settimeout(0.1)
74         gs = greenio.GreenSocket(s)
75         try:
76             gs.connect(('192.0.2.1', 80))
77             self.fail("socket.timeout not raised")
78         except socket.timeout as e:
79             assert hasattr(e, 'args')
80             self.assertEqual(e.args[0], 'timed out')
81         except socket.error as e:
82             # unreachable is also a valid outcome
83             if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
84                 raise
85
86     def test_accept_timeout(self):
87         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
88         s.bind(('', 0))
89         s.listen(50)
90
91         s.settimeout(0.1)
92         gs = greenio.GreenSocket(s)
93         try:
94             gs.accept()
95             self.fail("socket.timeout not raised")
96         except socket.timeout as e:
97             assert hasattr(e, 'args')
98             self.assertEqual(e.args[0], 'timed out')
99
100     def test_connect_ex_timeout(self):
101         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
102         s.settimeout(0.1)
103         gs = greenio.GreenSocket(s)
104         e = gs.connect_ex(('192.0.2.1', 80))
105         if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH):
106             self.assertEqual(e, errno.EAGAIN)
107
108     def test_recv_timeout(self):
109         listener = greenio.GreenSocket(socket.socket())
110         listener.bind(('', 0))
111         listener.listen(50)
112
113         evt = event.Event()
114
115         def server():
116             # accept the connection in another greenlet
117             sock, addr = listener.accept()
118             evt.wait()
119
120         gt = eventlet.spawn(server)
121
122         addr = listener.getsockname()
123
124         client = greenio.GreenSocket(socket.socket())
125         client.settimeout(0.1)
126
127         client.connect(addr)
128
129         try:
130             client.recv(8192)
131             self.fail("socket.timeout not raised")
132         except socket.timeout as e:
133             assert hasattr(e, 'args')
134             self.assertEqual(e.args[0], 'timed out')
135
136         evt.send()
137         gt.wait()
138
139     def test_recvfrom_timeout(self):
140         gs = greenio.GreenSocket(
141             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
142         gs.settimeout(.1)
143         gs.bind(('', 0))
144
145         try:
146             gs.recvfrom(8192)
147             self.fail("socket.timeout not raised")
148         except socket.timeout as e:
149             assert hasattr(e, 'args')
150             self.assertEqual(e.args[0], 'timed out')
151
152     def test_recvfrom_into_timeout(self):
153         buf = array.array('B')
154
155         gs = greenio.GreenSocket(
156             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
157         gs.settimeout(.1)
158         gs.bind(('', 0))
159
160         try:
161             gs.recvfrom_into(buf)
162             self.fail("socket.timeout not raised")
163         except socket.timeout as e:
164             assert hasattr(e, 'args')
165             self.assertEqual(e.args[0], 'timed out')
166
167     def test_recv_into_timeout(self):
168         buf = array.array('B')
169
170         listener = greenio.GreenSocket(socket.socket())
171         listener.bind(('', 0))
172         listener.listen(50)
173
174         evt = event.Event()
175
176         def server():
177             # accept the connection in another greenlet
178             sock, addr = listener.accept()
179             evt.wait()
180
181         gt = eventlet.spawn(server)
182
183         addr = listener.getsockname()
184
185         client = greenio.GreenSocket(socket.socket())
186         client.settimeout(0.1)
187
188         client.connect(addr)
189
190         try:
191             client.recv_into(buf)
192             self.fail("socket.timeout not raised")
193         except socket.timeout as e:
194             assert hasattr(e, 'args')
195             self.assertEqual(e.args[0], 'timed out')
196
197         evt.send()
198         gt.wait()
199
200     def test_send_timeout(self):
201         self.reset_timeout(2)
202         listener = bufsized(eventlet.listen(('', 0)))
203
204         evt = event.Event()
205
206         def server():
207             # accept the connection in another greenlet
208             sock, addr = listener.accept()
209             sock = bufsized(sock)
210             evt.wait()
211
212         gt = eventlet.spawn(server)
213
214         addr = listener.getsockname()
215
216         client = bufsized(greenio.GreenSocket(socket.socket()))
217         client.connect(addr)
218         try:
219             client.settimeout(0.00001)
220             msg = b"A" * 100000  # large enough number to overwhelm most buffers
221
222             total_sent = 0
223             # want to exceed the size of the OS buffer so it'll block in a
224             # single send
225             for x in range(10):
226                 total_sent += client.send(msg)
227             self.fail("socket.timeout not raised")
228         except socket.timeout as e:
229             assert hasattr(e, 'args')
230             self.assertEqual(e.args[0], 'timed out')
231
232         evt.send()
233         gt.wait()
234
235     def test_sendall_timeout(self):
236         listener = greenio.GreenSocket(socket.socket())
237         listener.bind(('', 0))
238         listener.listen(50)
239
240         evt = event.Event()
241
242         def server():
243             # accept the connection in another greenlet
244             sock, addr = listener.accept()
245             evt.wait()
246
247         gt = eventlet.spawn(server)
248
249         addr = listener.getsockname()
250
251         client = greenio.GreenSocket(socket.socket())
252         client.settimeout(0.1)
253         client.connect(addr)
254
255         try:
256             msg = b"A" * (8 << 20)
257
258             # want to exceed the size of the OS buffer so it'll block
259             client.sendall(msg)
260             self.fail("socket.timeout not raised")
261         except socket.timeout as e:
262             assert hasattr(e, 'args')
263             self.assertEqual(e.args[0], 'timed out')
264
265         evt.send()
266         gt.wait()
267
268     def test_close_with_makefile(self):
269         def accept_close_early(listener):
270             # verify that the makefile and the socket are truly independent
271             # by closing the socket prior to using the made file
272             try:
273                 conn, addr = listener.accept()
274                 fd = conn.makefile('w')
275                 conn.close()
276                 fd.write(b'hello\n')
277                 fd.close()
278                 self.assertWriteToClosedFileRaises(fd)
279                 self.assertRaises(socket.error, conn.send, b'b')
280             finally:
281                 listener.close()
282
283         def accept_close_late(listener):
284             # verify that the makefile and the socket are truly independent
285             # by closing the made file and then sending a character
286             try:
287                 conn, addr = listener.accept()
288                 fd = conn.makefile('w')
289                 fd.write(b'hello')
290                 fd.close()
291                 conn.send(b'\n')
292                 conn.close()
293                 self.assertWriteToClosedFileRaises(fd)
294                 self.assertRaises(socket.error, conn.send, b'b')
295             finally:
296                 listener.close()
297
298         def did_it_work(server):
299             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
300             client.connect(('127.0.0.1', server.getsockname()[1]))
301             fd = client.makefile()
302             client.close()
303             assert fd.readline() == b'hello\n'
304             assert fd.read() == b''
305             fd.close()
306
307         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
308         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
309         server.bind(('0.0.0.0', 0))
310         server.listen(50)
311         killer = eventlet.spawn(accept_close_early, server)
312         did_it_work(server)
313         killer.wait()
314
315         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
317         server.bind(('0.0.0.0', 0))
318         server.listen(50)
319         killer = eventlet.spawn(accept_close_late, server)
320         did_it_work(server)
321         killer.wait()
322
323     def test_del_closes_socket(self):
324         def accept_once(listener):
325             # delete/overwrite the original conn
326             # object, only keeping the file object around
327             # closing the file object should close everything
328             try:
329                 conn, addr = listener.accept()
330                 conn = conn.makefile('w')
331                 conn.write(b'hello\n')
332                 conn.close()
333                 gc.collect()
334                 self.assertWriteToClosedFileRaises(conn)
335             finally:
336                 listener.close()
337
338         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
339         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
340         server.bind(('127.0.0.1', 0))
341         server.listen(50)
342         killer = eventlet.spawn(accept_once, server)
343         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
344         client.connect(('127.0.0.1', server.getsockname()[1]))
345         fd = client.makefile()
346         client.close()
347         assert fd.read() == b'hello\n'
348         assert fd.read() == b''
349
350         killer.wait()
351
352     def test_full_duplex(self):
353         large_data = b'*' * 10 * min_buf_size()
354         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
355         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
356         listener.bind(('127.0.0.1', 0))
357         listener.listen(50)
358         bufsized(listener)
359
360         def send_large(sock):
361             sock.sendall(large_data)
362
363         def read_large(sock):
364             result = sock.recv(len(large_data))
365             while len(result) < len(large_data):
366                 result += sock.recv(len(large_data))
367             self.assertEqual(result, large_data)
368
369         def server():
370             (sock, addr) = listener.accept()
371             sock = bufsized(sock)
372             send_large_coro = eventlet.spawn(send_large, sock)
373             eventlet.sleep(0)
374             result = sock.recv(10)
375             expected = b'hello world'
376             while len(result) < len(expected):
377                 result += sock.recv(10)
378             self.assertEqual(result, expected)
379             send_large_coro.wait()
380
381         server_evt = eventlet.spawn(server)
382         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
383         client.connect(('127.0.0.1', listener.getsockname()[1]))
384         bufsized(client)
385         large_evt = eventlet.spawn(read_large, client)
386         eventlet.sleep(0)
387         client.sendall(b'hello world')
388         server_evt.wait()
389         large_evt.wait()
390         client.close()
391
392     def test_sendall(self):
393         # test adapted from Marcus Cavanaugh's email
394         # it may legitimately take a while, but will eventually complete
395         self.timer.cancel()
396         second_bytes = 10
397
398         def test_sendall_impl(many_bytes):
399             bufsize = max(many_bytes // 15, 2)
400
401             def sender(listener):
402                 (sock, addr) = listener.accept()
403                 sock = bufsized(sock, size=bufsize)
404                 sock.sendall(b'x' * many_bytes)
405                 sock.sendall(b'y' * second_bytes)
406
407             listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
408             listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
409             listener.bind(("", 0))
410             listener.listen(50)
411             sender_coro = eventlet.spawn(sender, listener)
412             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
413             client.connect(('127.0.0.1', listener.getsockname()[1]))
414             bufsized(client, size=bufsize)
415             total = 0
416             while total < many_bytes:
417                 data = client.recv(min(many_bytes - total, many_bytes // 10))
418                 if not data:
419                     break
420                 total += len(data)
421
422             total2 = 0
423             while total < second_bytes:
424                 data = client.recv(second_bytes)
425                 if not data:
426                     break
427                 total2 += len(data)
428
429             sender_coro.wait()
430             client.close()
431
432         for how_many in (1000, 10000, 100000, 1000000):
433             test_sendall_impl(how_many)
434
435     def test_wrap_socket(self):
436         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
437         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
438         sock.bind(('127.0.0.1', 0))
439         sock.listen(50)
440         ssl.wrap_socket(sock)
441
442     def test_timeout_and_final_write(self):
443         # This test verifies that a write on a socket that we've
444         # stopped listening for doesn't result in an incorrect switch
445         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
446         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
447         server.bind(('127.0.0.1', 0))
448         server.listen(50)
449         bound_port = server.getsockname()[1]
450
451         def sender(evt):
452             s2, addr = server.accept()
453             wrap_wfile = s2.makefile('w')
454
455             eventlet.sleep(0.02)
456             wrap_wfile.write(b'hi')
457             s2.close()
458             evt.send(b'sent via event')
459
460         evt = event.Event()
461         eventlet.spawn(sender, evt)
462         # lets the socket enter accept mode, which
463         # is necessary for connect to succeed on windows
464         eventlet.sleep(0)
465         try:
466             # try and get some data off of this pipe
467             # but bail before any is sent
468             eventlet.Timeout(0.01)
469             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
470             client.connect(('127.0.0.1', bound_port))
471             wrap_rfile = client.makefile()
472             wrap_rfile.read(1)
473             self.fail()
474         except eventlet.TimeoutError:
475             pass
476
477         result = evt.wait()
478         self.assertEqual(result, b'sent via event')
479         server.close()
480         client.close()
481
482     @skip_with_pyevent
483     def test_raised_multiple_readers(self):
484         debug.hub_prevent_multiple_readers(True)
485
486         def handle(sock, addr):
487             sock.recv(1)
488             sock.sendall(b"a")
489             raise eventlet.StopServe()
490
491         listener = eventlet.listen(('127.0.0.1', 0))
492         eventlet.spawn(eventlet.serve, listener, handle)
493
494         def reader(s):
495             s.recv(1)
496
497         s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
498         a = eventlet.spawn(reader, s)
499         eventlet.sleep(0)
500         self.assertRaises(RuntimeError, s.recv, 1)
501         s.sendall(b'b')
502         a.wait()
503
504     @skip_with_pyevent
505     @skip_if(using_epoll_hub)
506     @skip_if(using_kqueue_hub)
507     def test_closure(self):
508         def spam_to_me(address):
509             sock = eventlet.connect(address)
510             while True:
511                 try:
512                     sock.sendall(b'hello world')
513                 except socket.error as e:
514                     if get_errno(e) == errno.EPIPE:
515                         return
516                     raise
517
518         server = eventlet.listen(('127.0.0.1', 0))
519         sender = eventlet.spawn(spam_to_me, server.getsockname())
520         client, address = server.accept()
521         server.close()
522
523         def reader():
524             try:
525                 while True:
526                     data = client.recv(1024)
527                     assert data
528             except socket.error as e:
529                 # we get an EBADF because client is closed in the same process
530                 # (but a different greenthread)
531                 if get_errno(e) != errno.EBADF:
532                     raise
533
534         def closer():
535             client.close()
536
537         reader = eventlet.spawn(reader)
538         eventlet.spawn_n(closer)
539         reader.wait()
540         sender.wait()
541
542     def test_invalid_connection(self):
543         # find an unused port by creating a socket then closing it
544         listening_socket = eventlet.listen(('127.0.0.1', 0))
545         port = listening_socket.getsockname()[1]
546         listening_socket.close()
547         self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
548
549     def test_zero_timeout_and_back(self):
550         listen = eventlet.listen(('', 0))
551         # Keep reference to server side of socket
552         server = eventlet.spawn(listen.accept)
553         client = eventlet.connect(listen.getsockname())
554
555         client.settimeout(0.05)
556         # Now must raise socket.timeout
557         self.assertRaises(socket.timeout, client.recv, 1)
558
559         client.settimeout(0)
560         # Now must raise socket.error with EAGAIN
561         try:
562             client.recv(1)
563             assert False
564         except socket.error as e:
565             assert get_errno(e) == errno.EAGAIN
566
567         client.settimeout(0.05)
568         # Now socket.timeout again
569         self.assertRaises(socket.timeout, client.recv, 1)
570         server.wait()
571
572     def test_default_nonblocking(self):
573         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
574         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
575         assert flags & os.O_NONBLOCK
576
577         sock2 = socket.socket(sock1.fd)
578         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
579         assert flags & os.O_NONBLOCK
580
581     def test_dup_nonblocking(self):
582         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
583         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
584         assert flags & os.O_NONBLOCK
585
586         sock2 = sock1.dup()
587         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
588         assert flags & os.O_NONBLOCK
589
590     def test_skip_nonblocking(self):
591         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
592         fd = sock1.fd.fileno()
593         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
594         flags = fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
595         assert flags & os.O_NONBLOCK == 0
596
597         sock2 = socket.socket(sock1.fd, set_nonblocking=False)
598         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
599         assert flags & os.O_NONBLOCK == 0
600
601     def test_sockopt_interface(self):
602         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
603         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 0
604         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) == '\000'
605         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
606
607     def test_socketpair_select(self):
608         # https://github.com/eventlet/eventlet/pull/25
609         s1, s2 = socket.socketpair()
610         assert select.select([], [s1], [], 0) == ([], [s1], [])
611         assert select.select([], [s1], [], 0) == ([], [s1], [])
612
613
614 class TestGreenPipe(LimitedTestCase):
615     @skip_on_windows
616     def setUp(self):
617         super(self.__class__, self).setUp()
618         self.tempdir = tempfile.mkdtemp('_green_pipe_test')
619
620     def tearDown(self):
621         shutil.rmtree(self.tempdir)
622         super(self.__class__, self).tearDown()
623
624     def test_pipe(self):
625         r, w = os.pipe()
626         rf = greenio.GreenPipe(r, 'r')
627         wf = greenio.GreenPipe(w, 'w', 0)
628
629         def sender(f, content):
630             for ch in content:
631                 eventlet.sleep(0.0001)
632                 f.write(ch)
633             f.close()
634
635         one_line = b"12345\n"
636         eventlet.spawn(sender, wf, one_line * 5)
637         for i in range(5):
638             line = rf.readline()
639             eventlet.sleep(0.01)
640             self.assertEqual(line, one_line)
641         self.assertEqual(rf.readline(), '')
642
643     def test_pipe_read(self):
644         # ensure that 'readline' works properly on GreenPipes when data is not
645         # immediately available (fd is nonblocking, was raising EAGAIN)
646         # also ensures that readline() terminates on '\n' and '\r\n'
647         r, w = os.pipe()
648
649         r = greenio.GreenPipe(r)
650         w = greenio.GreenPipe(w, 'w')
651
652         def writer():
653             eventlet.sleep(.1)
654
655             w.write(b'line\n')
656             w.flush()
657
658             w.write(b'line\r\n')
659             w.flush()
660
661         gt = eventlet.spawn(writer)
662
663         eventlet.sleep(0)
664
665         line = r.readline()
666         self.assertEqual(line, 'line\n')
667
668         line = r.readline()
669         self.assertEqual(line, 'line\r\n')
670
671         gt.wait()
672
673     def test_pipe_writes_large_messages(self):
674         r, w = os.pipe()
675
676         r = greenio.GreenPipe(r)
677         w = greenio.GreenPipe(w, 'w')
678
679         large_message = b"".join([1024 * chr(i) for i in range(65)])
680
681         def writer():
682             w.write(large_message)
683             w.close()
684
685         gt = eventlet.spawn(writer)
686
687         for i in range(65):
688             buf = r.read(1024)
689             expected = 1024 * chr(i)
690             self.assertEqual(
691                 buf, expected,
692                 "expected=%r..%r, found=%r..%r iter=%d"
693                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
694         gt.wait()
695
696     def test_seek_on_buffered_pipe(self):
697         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
698         self.assertEqual(f.tell(), 0)
699         f.seek(0, 2)
700         self.assertEqual(f.tell(), 0)
701         f.write(b'1234567890')
702         f.seek(0, 2)
703         self.assertEqual(f.tell(), 10)
704         f.seek(0)
705         value = f.read(1)
706         self.assertEqual(value, '1')
707         self.assertEqual(f.tell(), 1)
708         value = f.read(1)
709         self.assertEqual(value, '2')
710         self.assertEqual(f.tell(), 2)
711         f.seek(0, 1)
712         self.assertEqual(f.readline(), '34567890')
713         f.seek(-5, 1)
714         self.assertEqual(f.readline(), '67890')
715         f.seek(0)
716         self.assertEqual(f.readline(), '1234567890')
717         f.seek(0, 2)
718         self.assertEqual(f.readline(), '')
719
720     def test_truncate(self):
721         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
722         f.write(b'1234567890')
723         f.truncate(9)
724         self.assertEqual(f.tell(), 9)
725
726
727 class TestGreenIoLong(LimitedTestCase):
728     TEST_TIMEOUT = 10  # the test here might take a while depending on the OS
729
730     @skip_with_pyevent
731     def test_multiple_readers(self, clibufsize=False):
732         debug.hub_prevent_multiple_readers(False)
733         recvsize = 2 * min_buf_size()
734         sendsize = 10 * recvsize
735
736         # test that we can have multiple coroutines reading
737         # from the same fd.  We make no guarantees about which one gets which
738         # bytes, but they should both get at least some
739         def reader(sock, results):
740             while True:
741                 data = sock.recv(recvsize)
742                 if not data:
743                     break
744                 results.append(data)
745
746         results1 = []
747         results2 = []
748         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
749         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
750         listener.bind(('127.0.0.1', 0))
751         listener.listen(50)
752
753         def server():
754             (sock, addr) = listener.accept()
755             sock = bufsized(sock)
756             try:
757                 c1 = eventlet.spawn(reader, sock, results1)
758                 c2 = eventlet.spawn(reader, sock, results2)
759                 try:
760                     c1.wait()
761                     c2.wait()
762                 finally:
763                     c1.kill()
764                     c2.kill()
765             finally:
766                 sock.close()
767
768         server_coro = eventlet.spawn(server)
769         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770         client.connect(('127.0.0.1', listener.getsockname()[1]))
771         if clibufsize:
772             bufsized(client, size=sendsize)
773         else:
774             bufsized(client)
775         client.sendall(b'*' * sendsize)
776         client.close()
777         server_coro.wait()
778         listener.close()
779         assert len(results1) > 0
780         assert len(results2) > 0
781         debug.hub_prevent_multiple_readers()
782
783     @skipped  # by rdw because it fails but it's not clear how to make it pass
784     @skip_with_pyevent
785     def test_multiple_readers2(self):
786         self.test_multiple_readers(clibufsize=True)
787
788
789 class TestGreenIoStarvation(LimitedTestCase):
790     # fixme: this doesn't succeed, because of eventlet's predetermined
791     # ordering.  two processes, one with server, one with client eventlets
792     # might be more reliable?
793
794     TEST_TIMEOUT = 300  # the test here might take a while depending on the OS
795
796     @skipped  # by rdw, because it fails but it's not clear how to make it pass
797     @skip_with_pyevent
798     def test_server_starvation(self, sendloops=15):
799         recvsize = 2 * min_buf_size()
800         sendsize = 10000 * recvsize
801
802         results = [[] for i in range(5)]
803
804         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
805         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
806         listener.bind(('127.0.0.1', 0))
807         port = listener.getsockname()[1]
808         listener.listen(50)
809
810         base_time = time.time()
811
812         def server(my_results):
813             sock, addr = listener.accept()
814
815             datasize = 0
816
817             t1 = None
818             t2 = None
819             try:
820                 while True:
821                     data = sock.recv(recvsize)
822                     if not t1:
823                         t1 = time.time() - base_time
824                     if not data:
825                         t2 = time.time() - base_time
826                         my_results.append(datasize)
827                         my_results.append((t1, t2))
828                         break
829                     datasize += len(data)
830             finally:
831                 sock.close()
832
833         def client():
834             pid = os.fork()
835             if pid:
836                 return pid
837
838             client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
839             client.connect(('127.0.0.1', port))
840
841             bufsized(client, size=sendsize)
842
843             for i in range(sendloops):
844                 client.sendall(b'*' * sendsize)
845             client.close()
846             os._exit(0)
847
848         clients = []
849         servers = []
850         for r in results:
851             servers.append(eventlet.spawn(server, r))
852         for r in results:
853             clients.append(client())
854
855         for s in servers:
856             s.wait()
857         for c in clients:
858             os.waitpid(c, 0)
859
860         listener.close()
861
862         # now test that all of the server receive intervals overlap, and
863         # that there were no errors.
864         for r in results:
865             assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
866             assert r[0] == sendsize * sendloops
867             assert len(r[1]) == 2
868             assert r[1][0] is not None
869             assert r[1][1] is not None
870
871         starttimes = sorted(r[1][0] for r in results)
872         endtimes = sorted(r[1][1] for r in results)
873         runlengths = sorted(r[1][1] - r[1][0] for r in results)
874
875         # assert that the last task started before the first task ended
876         # (our no-starvation condition)
877         assert starttimes[-1] < endtimes[0], \
878             "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
879
880         maxstartdiff = starttimes[-1] - starttimes[0]
881
882         assert maxstartdiff * 2 < runlengths[0], \
883             "Largest difference in starting times more than twice the shortest running time!"
884         assert runlengths[0] * 2 > runlengths[-1], \
885             "Longest runtime more than twice as long as shortest!"
886
887
888 def test_set_nonblocking():
889     sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
890     fileno = sock.fileno()
891     orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
892     assert orig_flags & os.O_NONBLOCK == 0
893     greenio.set_nonblocking(sock)
894     new_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
895     assert new_flags == (orig_flags | os.O_NONBLOCK)
896
897
898 if __name__ == '__main__':
899     main()