Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / greenio_test.py
1 import array
2 import errno
3 import eventlet
4 import fcntl
5 import gc
6 import os
7 import shutil
8 import socket as _orig_sock
9 import sys
10 import tempfile
11
12 from nose.tools import eq_
13
14 from eventlet import event, greenio, debug
15 from eventlet.hubs import get_hub
16 from eventlet.green import select, socket, time, ssl
17 from eventlet.support import capture_stderr, get_errno, six
18 from tests import (
19     LimitedTestCase, main,
20     skip_with_pyevent, skipped, skip_if, skip_on_windows,
21 )
22
23
24 if six.PY3:
25     buffer = memoryview
26
27
28 def bufsized(sock, size=1):
29     """ Resize both send and receive buffers on a socket.
30     Useful for testing trampoline.  Returns the socket.
31
32     >>> import socket
33     >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
34     """
35     sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
36     sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
37     return sock
38
39
40 def min_buf_size():
41     """Return the minimum buffer size that the platform supports."""
42     test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
43     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
44     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
45
46
47 def using_epoll_hub(_f):
48     try:
49         return 'epolls' in type(get_hub()).__module__
50     except Exception:
51         return False
52
53
54 def using_kqueue_hub(_f):
55     try:
56         return 'kqueue' in type(get_hub()).__module__
57     except Exception:
58         return False
59
60
61 class TestGreenSocket(LimitedTestCase):
62     def assertWriteToClosedFileRaises(self, fd):
63         if sys.version_info[0] < 3:
64             # 2.x socket._fileobjects are odd: writes don't check
65             # whether the socket is closed or not, and you get an
66             # AttributeError during flush if it is closed
67             fd.write(b'a')
68             self.assertRaises(Exception, fd.flush)
69         else:
70             # 3.x io write to closed file-like pbject raises ValueError
71             self.assertRaises(ValueError, fd.write, b'a')
72
73     def test_connect_timeout(self):
74         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
75         s.settimeout(0.1)
76         gs = greenio.GreenSocket(s)
77         try:
78             gs.connect(('192.0.2.1', 80))
79             self.fail("socket.timeout not raised")
80         except socket.timeout as e:
81             assert hasattr(e, 'args')
82             self.assertEqual(e.args[0], 'timed out')
83         except socket.error as e:
84             # unreachable is also a valid outcome
85             if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
86                 raise
87
88     def test_accept_timeout(self):
89         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
90         s.bind(('', 0))
91         s.listen(50)
92
93         s.settimeout(0.1)
94         gs = greenio.GreenSocket(s)
95         try:
96             gs.accept()
97             self.fail("socket.timeout not raised")
98         except socket.timeout as e:
99             assert hasattr(e, 'args')
100             self.assertEqual(e.args[0], 'timed out')
101
102     def test_connect_ex_timeout(self):
103         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104         s.settimeout(0.1)
105         gs = greenio.GreenSocket(s)
106         e = gs.connect_ex(('192.0.2.1', 80))
107         if e not in (errno.EHOSTUNREACH, errno.ENETUNREACH):
108             self.assertEqual(e, errno.EAGAIN)
109
110     def test_recv_timeout(self):
111         listener = greenio.GreenSocket(socket.socket())
112         listener.bind(('', 0))
113         listener.listen(50)
114
115         evt = event.Event()
116
117         def server():
118             # accept the connection in another greenlet
119             sock, addr = listener.accept()
120             evt.wait()
121
122         gt = eventlet.spawn(server)
123
124         addr = listener.getsockname()
125
126         client = greenio.GreenSocket(socket.socket())
127         client.settimeout(0.1)
128
129         client.connect(addr)
130
131         try:
132             client.recv(8192)
133             self.fail("socket.timeout not raised")
134         except socket.timeout as e:
135             assert hasattr(e, 'args')
136             self.assertEqual(e.args[0], 'timed out')
137
138         evt.send()
139         gt.wait()
140
141     def test_recvfrom_timeout(self):
142         gs = greenio.GreenSocket(
143             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
144         gs.settimeout(.1)
145         gs.bind(('', 0))
146
147         try:
148             gs.recvfrom(8192)
149             self.fail("socket.timeout not raised")
150         except socket.timeout as e:
151             assert hasattr(e, 'args')
152             self.assertEqual(e.args[0], 'timed out')
153
154     def test_recvfrom_into_timeout(self):
155         buf = array.array('B')
156
157         gs = greenio.GreenSocket(
158             socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
159         gs.settimeout(.1)
160         gs.bind(('', 0))
161
162         try:
163             gs.recvfrom_into(buf)
164             self.fail("socket.timeout not raised")
165         except socket.timeout as e:
166             assert hasattr(e, 'args')
167             self.assertEqual(e.args[0], 'timed out')
168
169     def test_recv_into_timeout(self):
170         buf = array.array('B')
171
172         listener = greenio.GreenSocket(socket.socket())
173         listener.bind(('', 0))
174         listener.listen(50)
175
176         evt = event.Event()
177
178         def server():
179             # accept the connection in another greenlet
180             sock, addr = listener.accept()
181             evt.wait()
182
183         gt = eventlet.spawn(server)
184
185         addr = listener.getsockname()
186
187         client = greenio.GreenSocket(socket.socket())
188         client.settimeout(0.1)
189
190         client.connect(addr)
191
192         try:
193             client.recv_into(buf)
194             self.fail("socket.timeout not raised")
195         except socket.timeout as e:
196             assert hasattr(e, 'args')
197             self.assertEqual(e.args[0], 'timed out')
198
199         evt.send()
200         gt.wait()
201
202     def test_send_timeout(self):
203         self.reset_timeout(2)
204         listener = bufsized(eventlet.listen(('', 0)))
205
206         evt = event.Event()
207
208         def server():
209             # accept the connection in another greenlet
210             sock, addr = listener.accept()
211             sock = bufsized(sock)
212             evt.wait()
213
214         gt = eventlet.spawn(server)
215
216         addr = listener.getsockname()
217
218         client = bufsized(greenio.GreenSocket(socket.socket()))
219         client.connect(addr)
220         try:
221             client.settimeout(0.00001)
222             msg = b"A" * 100000  # large enough number to overwhelm most buffers
223
224             total_sent = 0
225             # want to exceed the size of the OS buffer so it'll block in a
226             # single send
227             for x in range(10):
228                 total_sent += client.send(msg)
229             self.fail("socket.timeout not raised")
230         except socket.timeout as e:
231             assert hasattr(e, 'args')
232             self.assertEqual(e.args[0], 'timed out')
233
234         evt.send()
235         gt.wait()
236
237     def test_sendall_timeout(self):
238         listener = greenio.GreenSocket(socket.socket())
239         listener.bind(('', 0))
240         listener.listen(50)
241
242         evt = event.Event()
243
244         def server():
245             # accept the connection in another greenlet
246             sock, addr = listener.accept()
247             evt.wait()
248
249         gt = eventlet.spawn(server)
250
251         addr = listener.getsockname()
252
253         client = greenio.GreenSocket(socket.socket())
254         client.settimeout(0.1)
255         client.connect(addr)
256
257         try:
258             msg = b"A" * (8 << 20)
259
260             # want to exceed the size of the OS buffer so it'll block
261             client.sendall(msg)
262             self.fail("socket.timeout not raised")
263         except socket.timeout as e:
264             assert hasattr(e, 'args')
265             self.assertEqual(e.args[0], 'timed out')
266
267         evt.send()
268         gt.wait()
269
270     def test_close_with_makefile(self):
271         def accept_close_early(listener):
272             # verify that the makefile and the socket are truly independent
273             # by closing the socket prior to using the made file
274             try:
275                 conn, addr = listener.accept()
276                 fd = conn.makefile('wb')
277                 conn.close()
278                 fd.write(b'hello\n')
279                 fd.close()
280                 self.assertWriteToClosedFileRaises(fd)
281                 self.assertRaises(socket.error, conn.send, b'b')
282             finally:
283                 listener.close()
284
285         def accept_close_late(listener):
286             # verify that the makefile and the socket are truly independent
287             # by closing the made file and then sending a character
288             try:
289                 conn, addr = listener.accept()
290                 fd = conn.makefile('wb')
291                 fd.write(b'hello')
292                 fd.close()
293                 conn.send(b'\n')
294                 conn.close()
295                 self.assertWriteToClosedFileRaises(fd)
296                 self.assertRaises(socket.error, conn.send, b'b')
297             finally:
298                 listener.close()
299
300         def did_it_work(server):
301             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
302             client.connect(('127.0.0.1', server.getsockname()[1]))
303             fd = client.makefile('rb')
304             client.close()
305             assert fd.readline() == b'hello\n'
306             assert fd.read() == b''
307             fd.close()
308
309         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
310         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
311         server.bind(('0.0.0.0', 0))
312         server.listen(50)
313         killer = eventlet.spawn(accept_close_early, server)
314         did_it_work(server)
315         killer.wait()
316
317         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
318         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
319         server.bind(('0.0.0.0', 0))
320         server.listen(50)
321         killer = eventlet.spawn(accept_close_late, server)
322         did_it_work(server)
323         killer.wait()
324
325     def test_del_closes_socket(self):
326         def accept_once(listener):
327             # delete/overwrite the original conn
328             # object, only keeping the file object around
329             # closing the file object should close everything
330             try:
331                 conn, addr = listener.accept()
332                 conn = conn.makefile('wb')
333                 conn.write(b'hello\n')
334                 conn.close()
335                 gc.collect()
336                 self.assertWriteToClosedFileRaises(conn)
337             finally:
338                 listener.close()
339
340         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
341         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
342         server.bind(('127.0.0.1', 0))
343         server.listen(50)
344         killer = eventlet.spawn(accept_once, server)
345         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
346         client.connect(('127.0.0.1', server.getsockname()[1]))
347         fd = client.makefile('rb')
348         client.close()
349         assert fd.read() == b'hello\n'
350         assert fd.read() == b''
351
352         killer.wait()
353
354     def test_full_duplex(self):
355         large_data = b'*' * 10 * min_buf_size()
356         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
357         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
358         listener.bind(('127.0.0.1', 0))
359         listener.listen(50)
360         bufsized(listener)
361
362         def send_large(sock):
363             sock.sendall(large_data)
364
365         def read_large(sock):
366             result = sock.recv(len(large_data))
367             while len(result) < len(large_data):
368                 result += sock.recv(len(large_data))
369             self.assertEqual(result, large_data)
370
371         def server():
372             (sock, addr) = listener.accept()
373             sock = bufsized(sock)
374             send_large_coro = eventlet.spawn(send_large, sock)
375             eventlet.sleep(0)
376             result = sock.recv(10)
377             expected = b'hello world'
378             while len(result) < len(expected):
379                 result += sock.recv(10)
380             self.assertEqual(result, expected)
381             send_large_coro.wait()
382
383         server_evt = eventlet.spawn(server)
384         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
385         client.connect(('127.0.0.1', listener.getsockname()[1]))
386         bufsized(client)
387         large_evt = eventlet.spawn(read_large, client)
388         eventlet.sleep(0)
389         client.sendall(b'hello world')
390         server_evt.wait()
391         large_evt.wait()
392         client.close()
393
394     def test_sendall(self):
395         # test adapted from Marcus Cavanaugh's email
396         # it may legitimately take a while, but will eventually complete
397         self.timer.cancel()
398         second_bytes = 10
399
400         def test_sendall_impl(many_bytes):
401             bufsize = max(many_bytes // 15, 2)
402
403             def sender(listener):
404                 (sock, addr) = listener.accept()
405                 sock = bufsized(sock, size=bufsize)
406                 sock.sendall(b'x' * many_bytes)
407                 sock.sendall(b'y' * second_bytes)
408
409             listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
410             listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
411             listener.bind(("", 0))
412             listener.listen(50)
413             sender_coro = eventlet.spawn(sender, listener)
414             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
415             client.connect(('127.0.0.1', listener.getsockname()[1]))
416             bufsized(client, size=bufsize)
417             total = 0
418             while total < many_bytes:
419                 data = client.recv(min(many_bytes - total, many_bytes // 10))
420                 if not data:
421                     break
422                 total += len(data)
423
424             total2 = 0
425             while total < second_bytes:
426                 data = client.recv(second_bytes)
427                 if not data:
428                     break
429                 total2 += len(data)
430
431             sender_coro.wait()
432             client.close()
433
434         for how_many in (1000, 10000, 100000, 1000000):
435             test_sendall_impl(how_many)
436
437     def test_wrap_socket(self):
438         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
439         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
440         sock.bind(('127.0.0.1', 0))
441         sock.listen(50)
442         ssl.wrap_socket(sock)
443
444     def test_timeout_and_final_write(self):
445         # This test verifies that a write on a socket that we've
446         # stopped listening for doesn't result in an incorrect switch
447         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
448         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
449         server.bind(('127.0.0.1', 0))
450         server.listen(50)
451         bound_port = server.getsockname()[1]
452
453         def sender(evt):
454             s2, addr = server.accept()
455             wrap_wfile = s2.makefile('wb')
456
457             eventlet.sleep(0.02)
458             wrap_wfile.write(b'hi')
459             s2.close()
460             evt.send(b'sent via event')
461
462         evt = event.Event()
463         eventlet.spawn(sender, evt)
464         # lets the socket enter accept mode, which
465         # is necessary for connect to succeed on windows
466         eventlet.sleep(0)
467         try:
468             # try and get some data off of this pipe
469             # but bail before any is sent
470             eventlet.Timeout(0.01)
471             client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
472             client.connect(('127.0.0.1', bound_port))
473             wrap_rfile = client.makefile()
474             wrap_rfile.read(1)
475             self.fail()
476         except eventlet.TimeoutError:
477             pass
478
479         result = evt.wait()
480         self.assertEqual(result, b'sent via event')
481         server.close()
482         client.close()
483
484     @skip_with_pyevent
485     def test_raised_multiple_readers(self):
486         debug.hub_prevent_multiple_readers(True)
487
488         def handle(sock, addr):
489             sock.recv(1)
490             sock.sendall(b"a")
491             raise eventlet.StopServe()
492
493         listener = eventlet.listen(('127.0.0.1', 0))
494         eventlet.spawn(eventlet.serve, listener, handle)
495
496         def reader(s):
497             s.recv(1)
498
499         s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
500         a = eventlet.spawn(reader, s)
501         eventlet.sleep(0)
502         self.assertRaises(RuntimeError, s.recv, 1)
503         s.sendall(b'b')
504         a.wait()
505
506     @skip_with_pyevent
507     @skip_if(using_epoll_hub)
508     @skip_if(using_kqueue_hub)
509     def test_closure(self):
510         def spam_to_me(address):
511             sock = eventlet.connect(address)
512             while True:
513                 try:
514                     sock.sendall(b'hello world')
515                 except socket.error as e:
516                     if get_errno(e) == errno.EPIPE:
517                         return
518                     raise
519
520         server = eventlet.listen(('127.0.0.1', 0))
521         sender = eventlet.spawn(spam_to_me, server.getsockname())
522         client, address = server.accept()
523         server.close()
524
525         def reader():
526             try:
527                 while True:
528                     data = client.recv(1024)
529                     assert data
530             except socket.error as e:
531                 # we get an EBADF because client is closed in the same process
532                 # (but a different greenthread)
533                 if get_errno(e) != errno.EBADF:
534                     raise
535
536         def closer():
537             client.close()
538
539         reader = eventlet.spawn(reader)
540         eventlet.spawn_n(closer)
541         reader.wait()
542         sender.wait()
543
544     def test_invalid_connection(self):
545         # find an unused port by creating a socket then closing it
546         listening_socket = eventlet.listen(('127.0.0.1', 0))
547         port = listening_socket.getsockname()[1]
548         listening_socket.close()
549         self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
550
551     def test_zero_timeout_and_back(self):
552         listen = eventlet.listen(('', 0))
553         # Keep reference to server side of socket
554         server = eventlet.spawn(listen.accept)
555         client = eventlet.connect(listen.getsockname())
556
557         client.settimeout(0.05)
558         # Now must raise socket.timeout
559         self.assertRaises(socket.timeout, client.recv, 1)
560
561         client.settimeout(0)
562         # Now must raise socket.error with EAGAIN
563         try:
564             client.recv(1)
565             assert False
566         except socket.error as e:
567             assert get_errno(e) == errno.EAGAIN
568
569         client.settimeout(0.05)
570         # Now socket.timeout again
571         self.assertRaises(socket.timeout, client.recv, 1)
572         server.wait()
573
574     def test_default_nonblocking(self):
575         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
576         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
577         assert flags & os.O_NONBLOCK
578
579         sock2 = socket.socket(sock1.fd)
580         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
581         assert flags & os.O_NONBLOCK
582
583     def test_dup_nonblocking(self):
584         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
585         flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
586         assert flags & os.O_NONBLOCK
587
588         sock2 = sock1.dup()
589         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
590         assert flags & os.O_NONBLOCK
591
592     def test_skip_nonblocking(self):
593         sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
594         fd = sock1.fd.fileno()
595         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
596         flags = fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
597         assert flags & os.O_NONBLOCK == 0
598
599         sock2 = socket.socket(sock1.fd, set_nonblocking=False)
600         flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
601         assert flags & os.O_NONBLOCK == 0
602
603     def test_sockopt_interface(self):
604         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
605         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 0
606         assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) == b'\000'
607         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
608
609     def test_socketpair_select(self):
610         # https://github.com/eventlet/eventlet/pull/25
611         s1, s2 = socket.socketpair()
612         assert select.select([], [s1], [], 0) == ([], [s1], [])
613         assert select.select([], [s1], [], 0) == ([], [s1], [])
614
615
616 def test_get_fileno_of_a_socket_works():
617     class DummySocket(object):
618         def fileno(self):
619             return 123
620     assert select.get_fileno(DummySocket()) == 123
621
622
623 def test_get_fileno_of_an_int_works():
624     assert select.get_fileno(123) == 123
625
626
627 def test_get_fileno_of_wrong_type_fails():
628     try:
629         select.get_fileno('foo')
630     except TypeError as ex:
631         assert str(ex) == 'Expected int or long, got <type \'str\'>'
632     else:
633         assert False, 'Expected TypeError not raised'
634
635
636 def test_get_fileno_of_a_socket_with_fileno_returning_wrong_type_fails():
637     class DummySocket(object):
638         def fileno(self):
639             return 'foo'
640     try:
641         select.get_fileno(DummySocket())
642     except TypeError as ex:
643         assert str(ex) == 'Expected int or long, got <type \'str\'>'
644     else:
645         assert False, 'Expected TypeError not raised'
646
647
648 class TestGreenPipe(LimitedTestCase):
649     @skip_on_windows
650     def setUp(self):
651         super(self.__class__, self).setUp()
652         self.tempdir = tempfile.mkdtemp('_green_pipe_test')
653
654     def tearDown(self):
655         shutil.rmtree(self.tempdir)
656         super(self.__class__, self).tearDown()
657
658     def test_pipe(self):
659         r, w = os.pipe()
660         rf = greenio.GreenPipe(r, 'r')
661         wf = greenio.GreenPipe(w, 'w', 0)
662
663         def sender(f, content):
664             for ch in map(six.int2byte, six.iterbytes(content)):
665                 eventlet.sleep(0.0001)
666                 f.write(ch)
667             f.close()
668
669         one_line = b"12345\n"
670         eventlet.spawn(sender, wf, one_line * 5)
671         for i in range(5):
672             line = rf.readline()
673             eventlet.sleep(0.01)
674             self.assertEqual(line, one_line)
675         self.assertEqual(rf.readline(), b'')
676
677     def test_pipe_read(self):
678         # ensure that 'readline' works properly on GreenPipes when data is not
679         # immediately available (fd is nonblocking, was raising EAGAIN)
680         # also ensures that readline() terminates on '\n' and '\r\n'
681         r, w = os.pipe()
682
683         r = greenio.GreenPipe(r)
684         w = greenio.GreenPipe(w, 'w')
685
686         def writer():
687             eventlet.sleep(.1)
688
689             w.write(b'line\n')
690             w.flush()
691
692             w.write(b'line\r\n')
693             w.flush()
694
695         gt = eventlet.spawn(writer)
696
697         eventlet.sleep(0)
698
699         line = r.readline()
700         self.assertEqual(line, b'line\n')
701
702         line = r.readline()
703         self.assertEqual(line, b'line\r\n')
704
705         gt.wait()
706
707     def test_pipe_writes_large_messages(self):
708         r, w = os.pipe()
709
710         r = greenio.GreenPipe(r)
711         w = greenio.GreenPipe(w, 'w')
712
713         large_message = b"".join([1024 * six.int2byte(i) for i in range(65)])
714
715         def writer():
716             w.write(large_message)
717             w.close()
718
719         gt = eventlet.spawn(writer)
720
721         for i in range(65):
722             buf = r.read(1024)
723             expected = 1024 * chr(i)
724             self.assertEqual(
725                 buf, expected,
726                 "expected=%r..%r, found=%r..%r iter=%d"
727                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
728         gt.wait()
729
730     def test_seek_on_buffered_pipe(self):
731         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
732         self.assertEqual(f.tell(), 0)
733         f.seek(0, 2)
734         self.assertEqual(f.tell(), 0)
735         f.write(b'1234567890')
736         f.seek(0, 2)
737         self.assertEqual(f.tell(), 10)
738         f.seek(0)
739         value = f.read(1)
740         self.assertEqual(value, '1')
741         self.assertEqual(f.tell(), 1)
742         value = f.read(1)
743         self.assertEqual(value, '2')
744         self.assertEqual(f.tell(), 2)
745         f.seek(0, 1)
746         self.assertEqual(f.readline(), '34567890')
747         f.seek(-5, 1)
748         self.assertEqual(f.readline(), '67890')
749         f.seek(0)
750         self.assertEqual(f.readline(), '1234567890')
751         f.seek(0, 2)
752         self.assertEqual(f.readline(), '')
753
754     def test_truncate(self):
755         f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
756         f.write(b'1234567890')
757         f.truncate(9)
758         self.assertEqual(f.tell(), 9)
759
760
761 class TestGreenIoLong(LimitedTestCase):
762     TEST_TIMEOUT = 10  # the test here might take a while depending on the OS
763
764     @skip_with_pyevent
765     def test_multiple_readers(self, clibufsize=False):
766         debug.hub_prevent_multiple_readers(False)
767         recvsize = 2 * min_buf_size()
768         sendsize = 10 * recvsize
769
770         # test that we can have multiple coroutines reading
771         # from the same fd.  We make no guarantees about which one gets which
772         # bytes, but they should both get at least some
773         def reader(sock, results):
774             while True:
775                 data = sock.recv(recvsize)
776                 if not data:
777                     break
778                 results.append(data)
779
780         results1 = []
781         results2 = []
782         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
783         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
784         listener.bind(('127.0.0.1', 0))
785         listener.listen(50)
786
787         def server():
788             (sock, addr) = listener.accept()
789             sock = bufsized(sock)
790             try:
791                 c1 = eventlet.spawn(reader, sock, results1)
792                 c2 = eventlet.spawn(reader, sock, results2)
793                 try:
794                     c1.wait()
795                     c2.wait()
796                 finally:
797                     c1.kill()
798                     c2.kill()
799             finally:
800                 sock.close()
801
802         server_coro = eventlet.spawn(server)
803         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
804         client.connect(('127.0.0.1', listener.getsockname()[1]))
805         if clibufsize:
806             bufsized(client, size=sendsize)
807         else:
808             bufsized(client)
809         client.sendall(b'*' * sendsize)
810         client.close()
811         server_coro.wait()
812         listener.close()
813         assert len(results1) > 0
814         assert len(results2) > 0
815         debug.hub_prevent_multiple_readers()
816
817     @skipped  # by rdw because it fails but it's not clear how to make it pass
818     @skip_with_pyevent
819     def test_multiple_readers2(self):
820         self.test_multiple_readers(clibufsize=True)
821
822
823 class TestGreenIoStarvation(LimitedTestCase):
824     # fixme: this doesn't succeed, because of eventlet's predetermined
825     # ordering.  two processes, one with server, one with client eventlets
826     # might be more reliable?
827
828     TEST_TIMEOUT = 300  # the test here might take a while depending on the OS
829
830     @skipped  # by rdw, because it fails but it's not clear how to make it pass
831     @skip_with_pyevent
832     def test_server_starvation(self, sendloops=15):
833         recvsize = 2 * min_buf_size()
834         sendsize = 10000 * recvsize
835
836         results = [[] for i in range(5)]
837
838         listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
839         listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
840         listener.bind(('127.0.0.1', 0))
841         port = listener.getsockname()[1]
842         listener.listen(50)
843
844         base_time = time.time()
845
846         def server(my_results):
847             sock, addr = listener.accept()
848
849             datasize = 0
850
851             t1 = None
852             t2 = None
853             try:
854                 while True:
855                     data = sock.recv(recvsize)
856                     if not t1:
857                         t1 = time.time() - base_time
858                     if not data:
859                         t2 = time.time() - base_time
860                         my_results.append(datasize)
861                         my_results.append((t1, t2))
862                         break
863                     datasize += len(data)
864             finally:
865                 sock.close()
866
867         def client():
868             pid = os.fork()
869             if pid:
870                 return pid
871
872             client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
873             client.connect(('127.0.0.1', port))
874
875             bufsized(client, size=sendsize)
876
877             for i in range(sendloops):
878                 client.sendall(b'*' * sendsize)
879             client.close()
880             os._exit(0)
881
882         clients = []
883         servers = []
884         for r in results:
885             servers.append(eventlet.spawn(server, r))
886         for r in results:
887             clients.append(client())
888
889         for s in servers:
890             s.wait()
891         for c in clients:
892             os.waitpid(c, 0)
893
894         listener.close()
895
896         # now test that all of the server receive intervals overlap, and
897         # that there were no errors.
898         for r in results:
899             assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
900             assert r[0] == sendsize * sendloops
901             assert len(r[1]) == 2
902             assert r[1][0] is not None
903             assert r[1][1] is not None
904
905         starttimes = sorted(r[1][0] for r in results)
906         endtimes = sorted(r[1][1] for r in results)
907         runlengths = sorted(r[1][1] - r[1][0] for r in results)
908
909         # assert that the last task started before the first task ended
910         # (our no-starvation condition)
911         assert starttimes[-1] < endtimes[0], \
912             "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
913
914         maxstartdiff = starttimes[-1] - starttimes[0]
915
916         assert maxstartdiff * 2 < runlengths[0], \
917             "Largest difference in starting times more than twice the shortest running time!"
918         assert runlengths[0] * 2 > runlengths[-1], \
919             "Longest runtime more than twice as long as shortest!"
920
921
922 def test_set_nonblocking():
923     sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
924     fileno = sock.fileno()
925     orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
926     assert orig_flags & os.O_NONBLOCK == 0
927     greenio.set_nonblocking(sock)
928     new_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
929     assert new_flags == (orig_flags | os.O_NONBLOCK)
930
931
932 def test_socket_del_fails_gracefully_when_not_fully_initialized():
933     # Regression introduced in da87716714689894f23d0db7b003f26d97031e83, reported in:
934     # * GH #137 https://github.com/eventlet/eventlet/issues/137
935     # * https://bugs.launchpad.net/oslo.messaging/+bug/1369999
936
937     class SocketSubclass(socket.socket):
938
939         def __init__(self):
940             pass
941
942     with capture_stderr() as err:
943         SocketSubclass()
944
945     assert err.getvalue() == ''
946
947
948 if __name__ == '__main__':
949     main()